While working on my current project I ran into a complicated transactional case: an update that must go through reliably but is not atomic by itself. The application is a small billing service that scales horizontally and uses a single distributed data store (Cassandra).

Among other things, the application operates on a balance entity:

{ "accountId": "admin", "currencyBalances": { "RUB": 200.22, "USD": 10 }, "createdAt": 0, "updatedAt": 1400000000 } 

and on deposit or withdrawal operations that are applied to the balance:

 { "accountId": "admin", "currency": "RUB", "amount": -100.0 } 

When an operation is created, it has to appear in two places: as a record of the operation itself and as a change to the balance. The storage does not let me do this atomically, so I need an algorithm that can restore the consistency of a system that crashes between the two database writes.
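To make the failure window concrete, here is what the two writes look like if issued naively. This is only a sketch: it assumes the DataStax Java driver, and the table and column names are invented for illustration, not taken from the actual schema.

    import java.math.BigDecimal;

    import com.datastax.driver.core.Session;

    // Naive variant of the two writes: each statement is fine on its own,
    // but nothing ties them together. Table/column names are made up.
    public class NaiveOperationWriter {

        private final Session session;

        public NaiveOperationWriter(Session session) {
            this.session = session;
        }

        public void apply(String accountId, String operationId,
                          String currency, BigDecimal amount, long nextVersion) {
            // Write 1: the operation record itself.
            session.execute(
                "INSERT INTO operations (operation_id, account_id, currency, amount)"
                    + " VALUES (?, ?, ?, ?)",
                operationId, accountId, currency, amount);

            // <-- A crash or a storage outage right here leaves an operation
            //     record with no matching balance change.

            // Write 2: the balance change.
            session.execute(
                "INSERT INTO balance_events (account_id, version, currency, amount, operation_id)"
                    + " VALUES (?, ?, ?, ?, ?)",
                accountId, nextVersion, currency, amount, operationId);
        }
    }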

The task itself is trivial (write a log entry saying that consistency must be established for operation X -> create the record for X -> establish consistency; if an application instance crashes, the log of uncommitted operations is read and replayed -> the log entry is deleted), but I am constrained by the application and the services around it:

  • The storage has a transactional mechanism only for operations on a single key, so I cannot make two writes atomically. Because of this I cannot update the balance and write to a second table in one step: the system may crash between the two writes, and the storage may be unavailable during either of them.
  • All entities are stored as a log of the events that mutate them (event sourcing). A balance might look like this:

     [ { "entityType": "doe.john.Balance", "entityId": "admin" "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000000, "currency": "RUB", "amount": 500, "operationId": "408279ae-ee9a-46f3-a9d2-88b0e13d476e" }, "version": 1 }, { "entityType": "doe.john.Balance", "entityId": "admin" "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000001, "currency": "RUB", "amount": 100, "operationId": "87fb18b0-3b01-477f-b959-21b22d54af1f" }, "version": 2 }, { "entityType": "doe.john.Balance", "entityId": "admin" "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000002, "currency": "EUR", "amount": 10, "operationId": "464608fd-d737-454a-8278-9c56fa52f9d6" }, "version": 3 } ] 

    which, by applying the events in sequence, ultimately folds into

     { "accountId": "admin", "currencyBalances": { "RUB": 600, "EUR": 10 }, "updatedAt": 1400000002, "createdAt": 1400000000 } 

    This takes away the option of recalculating the user's balance from scratch: I can only append new events, I cannot change old ones (this is similar to deleting related entities in an RDBMS). Likewise, I cannot check whether an operation has already been applied to the balance: I can read the operationId of a specific event, but I cannot search by it (events are stored as JSON strings, and the storage supports lookup only by exact values). I also cannot keep every operationId inside the folded balance entity, because that list is potentially endless. On top of all that, this format implies appending under optimistic locking (the next version is computed and used in INSERT ... IF NOT EXISTS), so an update by another process is trivially detected: the conditional insert simply fails (a sketch of this conditional append follows this list).

  • Distributed locks are theoretically possible, but they could kill performance outright, so I would like to avoid them for as long as possible. Putting flags on a record (e.g. "this record is currently being edited") is meaningless given the storage architecture.
  • The application scales horizontally, and the exact number of nodes at any given moment is unknown. Several nodes may execute the same logged operation at once; before this scenario came up, that was accepted as a necessary evil, and all operations were simply made idempotent.
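To make the optimistic-locking point from the event-sourcing bullet concrete, here is a minimal sketch of the versioned append. The events table layout is my assumption for illustration; the essential part is the conditional insert on (entity id, version) and the wasApplied() check.

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Session;

    // Versioned append under optimistic locking; the table layout is
    // assumed, not taken from the real project.
    public class EventStore {

        private final Session session;

        public EventStore(Session session) {
            this.session = session;
        }

        // Tries to write the event as exactly expectedVersion. If another
        // process already took that version, the conditional insert is
        // rejected and wasApplied() returns false: the caller must re-read
        // the entity and retry.
        public boolean append(String entityId, long expectedVersion, String eventJson) {
            ResultSet rs = session.execute(
                "INSERT INTO events (entity_id, version, event) VALUES (?, ?, ?) IF NOT EXISTS",
                entityId, expectedVersion, eventJson);
            return rs.wasApplied();
        }
    }

A failed append is exactly the "version has moved" signal the algorithms below rely on.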

So I need to make two writes (from the application's point of view, to append two events), while I have trouble determining whether the second write / event happened (I can fetch an event by a specific id / version, but I cannot find it by operationId). One way or another, I need some kind of journaling algorithm that records the upcoming operation in advance and can then restore the consistency of the system regardless of how many times it is invoked.

So far I have come up with only one algorithm (a code sketch follows the list):

  1. The current version of the balance is fetched.
  2. The process reads the current operation record. If it says the operation has been fully applied, the process considers consistency achieved and terminates.
  3. The process checks the journal table for a record of the expected version (event) under which the operation should be reflected in the balance.
    • If such a record exists, the process checks whether the corresponding balance version exists:
      • If that version (event) exists and it really corresponds to the application of the current operation, the process jumps straight to step 5.
      • If that version exists but records the application of a different operation, the process overwrites the journal entry with a new expected version.
      • If that version does not exist yet (which can only be the case if it immediately follows the current one), the process does nothing at this stage.
    • If no such record exists, the process creates it.
  4. The process tries to create the event and apply it to the balance. If the append fails, the balance version has changed, and the process has to start over from the beginning.
  5. The process updates the operation, marking it as fully applied.
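For readability, the same algorithm as compact Java-flavoured pseudocode. Every type (Balance, Operation, JournalEntry, Event) and repository call here is a hypothetical stand-in, and every write is assumed to be a single-key conditional insert that fails when the version has moved:

    // Sketch only: all types and repository calls are stand-ins.
    public void restoreIntegrity(String accountId, String operationId) {
        while (true) {
            Balance balance = balances.read(accountId);            // step 1
            Operation op = operations.read(operationId);           // step 2
            if (op.isFullyApplied()) {
                return;                                            // consistency already achieved
            }
            long expected = balance.version() + 1;                 // version the new event should get
            JournalEntry entry = journal.read(operationId);        // step 3
            if (entry != null) {
                Event event = balances.readEvent(accountId, entry.expectedVersion());
                if (event != null && event.operationId().equals(operationId)) {
                    operations.markFullyApplied(op);               // step 5: it was applied earlier
                    return;
                }
                if (event != null) {
                    journal.overwrite(operationId, expected);      // that version went to another operation
                }
                // event == null: the expected version is still free, nothing to do here
            } else {
                journal.create(operationId, expected);
            }
            if (balances.appendEvent(accountId, expected, op)) {   // step 4: conditional append
                operations.markFullyApplied(op);                   // step 5
                return;
            }
            // Append failed: the balance version changed, start over.
        }
    }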

As you can see, the algorithm is absolutely wild, and I am not at all sure about the safety of step 3. I am sure this is all reinventing the wheel and that there are far better ways to handle this situation; if anyone knows them, I would love to hear about it.

UPD

Thanks to the magical abilities of a colleague (which I completely lack), I managed to find a rational solution and turn the whole task into a simple state machine that seems to work. On top of the situation described above, the following conditions are added:

  1. An operation can have the status PENDING (set at creation), PROCESSING, or PROCESSED.
  2. The balance keeps a set of operations currently being processed; the assumption is that in a normally working system this set will not grow indefinitely, thanks to time limits.

The task is performed as follows (∋ means the operation is present in the balance's set of pending operations):

 n | Operation status | ∋ | Next step
 1 | ?                | ? | Read the balance
 2 | ?                | - | Read the operation
 3 | PENDING          | - | Add the operation to the pending set
 4 | PENDING          | + | Change the operation status to PROCESSING
 5 | PROCESSING       | + | Apply the operation, removing it from the pending set at the same time
 6 | PROCESSING       | - | Change the operation status to PROCESSED
 7 | PROCESSED        | - | Yay!

What remains is a formal proof that this automaton works (or does not), and that is a problem for me: I do not yet know how to do that. The ordering of transitions 0-1 and 1-2 is necessary to prevent a second 2-3 transition after the 5-6 transition. Each entity update is atomic; if the entity turns out to be in some other state, its version will have changed, which resets the algorithm back to the beginning.
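A minimal sketch of how this automaton could be driven, assuming hypothetical repository types where every mutation is a single-key compare-and-set; a lost race only changes the entity version, which sends the loop back to re-read:

    // Sketch of the automaton from the table above; all entity and
    // repository types are hypothetical stand-ins.
    public class OperationProcessor {

        public enum Status { PENDING, PROCESSING, PROCESSED }

        private final BalanceRepository balances;     // hypothetical
        private final OperationRepository operations; // hypothetical

        public OperationProcessor(BalanceRepository balances, OperationRepository operations) {
            this.balances = balances;
            this.operations = operations;
        }

        public void process(String accountId, String operationId) {
            while (true) {
                Balance balance = balances.read(accountId);        // row 1: read the balance
                Operation op = operations.read(operationId);       // row 2: read the operation
                boolean inPending = balance.pendingOperations().contains(operationId);

                switch (op.status()) {
                    case PENDING:
                        if (inPending) {
                            operations.casStatus(op, Status.PROCESSING);    // row 4
                        } else {
                            balances.casAddPending(balance, operationId);   // row 3
                        }
                        break;
                    case PROCESSING:
                        if (inPending) {
                            balances.casApplyAndRemovePending(balance, op); // row 5
                        } else {
                            operations.casStatus(op, Status.PROCESSED);     // row 6
                        }
                        break;
                    case PROCESSED:
                        return;                                             // row 7: yay!
                }
                // Whether the write above succeeded or lost a race, the next
                // iteration re-reads both entities and takes the next valid step.
            }
        }
    }

Any number of nodes can run this loop for the same operation: a competitor's progress only changes what the next iteration reads.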

  • I read it three times and understood nothing. What is the logical data schema? Are these {currency, amount} pairs stored in many copies? If they differ only in currency, there may be hundreds (or thousands) of them, and it is not clear what the problem is. (And please write more plainly, this is not a speech at the Academy of Sciences.) - avp
  • @avp I have rewritten the whole thing - etki

1 answer

Maybe a batch transaction will help you.

  • If I understand correctly, I am afraid it will not help: a batch statement guarantees that all queries are executed with a single timestamp and are retried until all relevant nodes respond correctly, but it will not save me from the situation where one entity has been updated and the other has not, and I again end up with the operation applied multiple times. stackoverflow.com/a/30709482/2908793 - etki
  • The documentation says the transaction is completely atomic. There are both logged and unlogged batches. Also, a TIMESTAMP can be specified separately for each statement inside a batch. Here is an example:

    BEGIN BATCH
      INSERT INTO purchases (user, balance) VALUES ('user1', -8) USING TIMESTAMP 19998889022757000;
      INSERT INTO purchases (user, expense_id, amount, description, paid) VALUES ('user1', 1, 8, 'burrito', false);
    APPLY BATCH;

    - user1919757
  • It cannot be atomic by design, and timestamps do not save me: imagine I issue two identical BATCH statements from opposite ends of the cluster. At that moment the opposite ends of the cluster contain different states, which violates integrity and breaks all my algorithms, which rely on any new state being applied atomically and never rolled back. - etki