While working on my current project, I ran into a tricky transactional case: it has to work reliably, but by its nature it is not atomic. The application is a small billing service that scales horizontally and uses a single distributed data store (Cassandra).
Among other things, the application works with a balance entity:
{ "accountId": "admin", "currencyBalances": { "RUB": 200.22, "USD": 10 }, "createdAt": 0, "updatedAt": 1400000000 }
and with deposit and withdrawal operations that are applied to that balance:
{ "accountId": "admin", "currency": "RUB", "amount": -100.0 }
When an operation is created, it must appear in two places: as a record of the operation itself and as a change to the balance. The store does not let me do this atomically, so I need an algorithm that restores the system's integrity if it fails between the two writes to the database.
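To make the failure window concrete, here is a minimal sketch. The names (Balance, Operation, create_operation, CrashBetweenWrites) and the in-memory "tables" are hypothetical stand-ins for the real Cassandra tables, and Decimal is used instead of floats so that 200.22 - 100.0 stays exact.

```python
from dataclasses import dataclass, field
from decimal import Decimal


@dataclass
class Balance:
    account_id: str
    currency_balances: dict[str, Decimal] = field(default_factory=dict)


@dataclass
class Operation:
    account_id: str
    currency: str
    amount: Decimal


class CrashBetweenWrites(Exception):
    """Hypothetical: simulates a node dying after the first write but before the second."""


def create_operation(operations: list[Operation], balances: dict[str, Balance],
                     op: Operation, crash_after_first_write: bool = False) -> None:
    # Write 1: persist the operation record itself.
    operations.append(op)
    if crash_after_first_write:
        raise CrashBetweenWrites()
    # Write 2: reflect the operation in the balance. This is a different key,
    # so the store cannot make it atomic together with write 1.
    balance = balances[op.account_id]
    current = balance.currency_balances.get(op.currency, Decimal("0"))
    balance.currency_balances[op.currency] = current + op.amount
```

If the crash flag is set, the operation record exists but the balance never changes, which is exactly the inconsistent state the rest of the question is about.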
On its own the task is trivial (write a log entry saying that integrity must be established for operation X -> create the record for X -> establish integrity; if an application instance fails, the log of uncommitted operations is read and replayed -> delete the log entry), but I am constrained by some properties of the application and the services around it:
- The store supports transactions only for operations on a single key, so I cannot make two writes atomically. Because of this, I cannot update the balance and write an entry to another table in one step: the system is allowed to crash between these two writes, and the store may also be unavailable during either of them.
- All entities are stored as a log of the changes made to them (event sourcing). A balance might look like this:
  [
    { "entityType": "doe.john.Balance", "entityId": "admin", "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000000, "currency": "RUB", "amount": 500, "operationId": "408279ae-ee9a-46f3-a9d2-88b0e13d476e" }, "version": 1 },
    { "entityType": "doe.john.Balance", "entityId": "admin", "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000001, "currency": "RUB", "amount": 100, "operationId": "87fb18b0-3b01-477f-b959-21b22d54af1f" }, "version": 2 },
    { "entityType": "doe.john.Balance", "entityId": "admin", "eventType": "doe.john.BalanceChangedEvent", "event": { "timestamp": 1400000002, "currency": "EUR", "amount": 10, "operationId": "464608fd-d737-454a-8278-9c56fa52f9d6" }, "version": 3 }
  ]
  which, applied in order, ultimately becomes:
  { "accountId": "admin", "currencyBalances": { "RUB": 600, "EUR": 10 }, "updatedAt": 1400000002, "createdAt": 1400000000 }
  This means I cannot recompute a user's balance from scratch: I can only append new changes, never modify old ones (modifying them would be comparable to deleting related entities in an RDBMS). In the same way, I cannot check whether an operation has already been applied to the balance: I can only read the `operationId` of a particular change (changes are stored as JSON strings, and the store supports lookups only by exact value). Nor can I list every `operationId` directly in the aggregated balance entity, because that list is potentially unbounded. On top of these drawbacks, this format means changes are appended under optimistic locking (the next `version` is computed and used in `INSERT ... IF NOT EXISTS`), so a concurrent update by another process is trivially detected.
- Distributed locks are possible in theory, but they could kill performance entirely, so I would like to avoid them for as long as possible. Putting flags on a record (e.g. "this record is currently being edited") is pointless given the store's architecture.
- The application scales horizontally, and the exact number of nodes at any given moment is unknown. Several nodes may execute the same logged operation at once; before this scenario, that was accepted as a necessary evil, and all operations were simply made idempotent.
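A minimal sketch of the event-sourced balance and the optimistic append described in the list above. EventStore, fold and try_append are hypothetical names; insert_if_not_exists only mimics the semantics of a Cassandra conditional INSERT ... IF NOT EXISTS keyed by (entityId, version), it is not a driver API.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class BalanceChangedEvent:
    timestamp: int
    currency: str
    amount: Decimal
    operation_id: str


class EventStore:
    """Hypothetical in-memory stand-in for the event table keyed by (entity_id, version)."""

    def __init__(self) -> None:
        self._rows: dict[tuple[str, int], BalanceChangedEvent] = {}

    def insert_if_not_exists(self, entity_id: str, version: int, event: BalanceChangedEvent) -> bool:
        # Succeeds only if nobody has written this exact key yet.
        key = (entity_id, version)
        if key in self._rows:
            return False
        self._rows[key] = event
        return True

    def load(self, entity_id: str) -> list[tuple[int, BalanceChangedEvent]]:
        rows = [(v, e) for (eid, v), e in self._rows.items() if eid == entity_id]
        return sorted(rows, key=lambda row: row[0])


def fold(rows: list[tuple[int, BalanceChangedEvent]]) -> tuple[int, dict[str, Decimal]]:
    """Replays events in version order; returns (last version, balance per currency)."""
    version = 0
    balances: dict[str, Decimal] = {}
    for version, event in rows:
        balances[event.currency] = balances.get(event.currency, Decimal("0")) + event.amount
    return version, balances


def try_append(store: EventStore, entity_id: str, read_version: int, event: BalanceChangedEvent) -> bool:
    """Optimistic append: claims read_version + 1 and fails if another writer got there first."""
    return store.insert_if_not_exists(entity_id, read_version + 1, event)
```

Folding the three events from the example yields version 3 with RUB 600 and EUR 10. If two writers both read version 3, both try to claim version 4 and only the first succeeds, which is the optimistic-locking behaviour the question relies on.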
So I need to make two writes (from the application's point of view, perform two changes), and I have trouble determining whether the second write/change has already happened: I can read the change at a specific id/version, but I cannot check whether it exists by `operationId`. In any case, I need to implement some kind of journaling algorithm that records the upcoming operation beforehand and can then restore the system's integrity no matter how many times it is invoked.
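A minimal sketch (hypothetical names, in-memory data) of why the trivial write-ahead log is not enough here: if a node writes the change but dies before deleting the log entry, recovery replays the operation, and since it cannot look the change up by operationId, it appends it a second time.

```python
from decimal import Decimal


def naive_replay(pending_log: set[str], operations: dict[str, Decimal],
                 changes: list[tuple[str, Decimal]]) -> None:
    """Replays every operation still listed in the write-ahead log.

    It cannot ask the store "is there already a change with this operationId?",
    so it blindly appends the change again before clearing the log entry.
    """
    for op_id in sorted(pending_log):  # sorted() copies, so discarding below is safe
        changes.append((op_id, operations[op_id]))
        pending_log.discard(op_id)


def balance_of(changes: list[tuple[str, Decimal]]) -> Decimal:
    return sum((amount for _, amount in changes), Decimal("0"))


if __name__ == "__main__":
    operations = {"op-1": Decimal("-100")}
    pending_log = {"op-1"}                 # log entry written first
    changes = [("op-1", Decimal("-100"))]  # the change itself reached the store...
    # ...but the node died before deleting the log entry, so recovery replays it:
    naive_replay(pending_log, operations, changes)
    print(balance_of(changes))             # -200: the withdrawal was applied twice
```

This is the non-idempotent replay that any journaling algorithm for this store has to rule out.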
So far I have come up with only one algorithm:
1. The process reads the current version of the balance.
2. The process reads the current operation record. If it says the operation has been fully applied, the process considers integrity established and stops.
3. The process checks the log table for a record stating in which balance version (change) the operation is expected to be reflected.
   - If such a record exists, the process checks whether the corresponding balance version exists:
     - If that version (change) exists and really corresponds to applying the current operation, the process goes straight to step 5.
     - If that version exists but records the application of a different operation, the process overwrites the log entry with the new version.
     - If that version does not exist yet (which is only possible if it immediately follows the current one), the process does nothing at this stage.
   - If no such record exists, the process creates it.
4. The process tries to create the change and apply it to the balance. If this fails, the balance version has changed, and the process has to start over from the beginning.
5. The process updates the operation, marking it as fully applied.
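A single-process sketch of steps 1-5, for illustration only. Store, reconcile and the three dicts are hypothetical stand-ins: balance_events for the balance's event log (version -> operationId), journal for the log table, applied for the "fully applied" flag on the operation record. It assumes the "new version" is always current + 1, and it says nothing about whether step 3 is safe under real concurrency.

```python
from dataclasses import dataclass, field


@dataclass
class Store:
    # Each dict entry stands for one storage key; every single write is atomic on its own.
    balance_events: dict[int, str] = field(default_factory=dict)  # version -> operationId
    journal: dict[str, int] = field(default_factory=dict)         # operationId -> expected version
    applied: dict[str, bool] = field(default_factory=dict)        # operationId -> fully applied?

    def current_version(self) -> int:
        return max(self.balance_events, default=0)

    def insert_if_not_exists(self, version: int, op_id: str) -> bool:
        if version in self.balance_events:
            return False
        self.balance_events[version] = op_id
        return True


def reconcile(store: Store, op_id: str, max_attempts: int = 10) -> bool:
    for _ in range(max_attempts):
        current = store.current_version()                # step 1
        if store.applied.get(op_id):                     # step 2
            return True
        expected = store.journal.get(op_id)              # step 3
        if expected is not None and expected in store.balance_events:
            if store.balance_events[expected] == op_id:
                store.applied[op_id] = True              # already in the balance: jump to step 5
                return True
            store.journal[op_id] = current + 1           # slot taken by another operation
        elif expected is None:
            store.journal[op_id] = current + 1           # no log record yet: create it
        # otherwise the expected version is not written yet: do nothing here
        if not store.insert_if_not_exists(current + 1, op_id):  # step 4
            continue                                     # balance moved on: start over
        store.applied[op_id] = True                      # step 5
        return True
    return False
```

Calling reconcile repeatedly for the same operation leaves a single change in the balance, and a crash between steps 4 and 5 is repaired by the "jump to step 5" branch.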
As you can see, the algorithm is pretty wild, and I am not at all sure that step 3 is safe. I am sure I am reinventing the wheel here and there are much better ways to handle this situation; if anyone knows them, I would love to hear about them.
UPD
Thanks to the magical abilities of a colleague (which I completely lack), I managed to find a sensible solution and reduce the whole task to a simple state machine that seems to work. In addition to the situation described above, the following conditions are added:
- An operation can have the status PENDING (set on creation), PROCESSING, or PROCESSED.
- The balance keeps a set of operations that are in progress; in a normally functioning system this set is assumed not to grow without bound, because each operation completes within a limited time.
The task is then performed as follows (∋ shows whether the operation is in the balance's set of pending operations):
n | Operation status | ∋ | Next step
--|------------------|---|----------
1 | ? | ? | Read the balance
2 | ? | - | Read the operation
3 | PENDING | - | Add the operation to the pending set
4 | PENDING | + | Change the operation status to PROCESSING
5 | PROCESSING | + | Apply the operation, removing it from the pending set in the same write
6 | PROCESSING | - | Change the operation status to PROCESSED
7 | PROCESSED | - | Yay!

What remains is a formal proof that this automaton works (or does not), and that is a problem for me, since I do not yet know how to do one. The order of transitions 0-1 and 1-2 is necessary to prevent a second transition 2-3 after transition 5-6. Each entity is updated atomically: if the entity has moved to some other state in the meantime, its version will have changed, which resets the algorithm to the beginning.
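A minimal single-process sketch of this automaton, assuming a hypothetical in-memory Store whose only primitive is a per-key compare-and-set on a version number (standing in for Cassandra's single-key lightweight transactions). Each call to step reads the balance first and then the operation, performs at most one transition, and, if its conditional write loses a race, simply starts over on the next call. It illustrates the table; it is not a proof of correctness.

```python
from dataclasses import dataclass, field, replace
from decimal import Decimal
from enum import Enum


class Status(Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    PROCESSED = "PROCESSED"


@dataclass(frozen=True)
class Operation:
    op_id: str
    currency: str
    amount: Decimal
    status: Status = Status.PENDING
    version: int = 0


@dataclass(frozen=True)
class Balance:
    amounts: dict = field(default_factory=dict)  # currency -> Decimal
    pending: frozenset = frozenset()             # ids of operations in progress
    version: int = 0


class Store:
    """Hypothetical store: atomic compare-and-set on a single key, nothing more."""

    def __init__(self, balance: Balance, operations: list[Operation]) -> None:
        self.balance = balance
        self.operations = {op.op_id: op for op in operations}

    def cas_balance(self, expected_version: int, new: Balance) -> bool:
        if self.balance.version != expected_version:
            return False
        self.balance = replace(new, version=expected_version + 1)
        return True

    def cas_operation(self, expected_version: int, new: Operation) -> bool:
        if self.operations[new.op_id].version != expected_version:
            return False
        self.operations[new.op_id] = replace(new, version=expected_version + 1)
        return True


def step(store: Store, op_id: str) -> bool:
    """Performs at most one transition; returns True once the operation is PROCESSED."""
    balance = store.balance             # read the balance first...
    op = store.operations[op_id]        # ...then the operation
    in_pending = op_id in balance.pending
    if op.status is Status.PROCESSED:
        return True
    if op.status is Status.PENDING and not in_pending:
        store.cas_balance(balance.version, replace(balance, pending=balance.pending | {op_id}))
    elif op.status is Status.PENDING:
        store.cas_operation(op.version, replace(op, status=Status.PROCESSING))
    elif in_pending:  # PROCESSING and still pending: apply and remove in one balance write
        amounts = dict(balance.amounts)
        amounts[op.currency] = amounts.get(op.currency, Decimal("0")) + op.amount
        store.cas_balance(balance.version,
                          replace(balance, amounts=amounts, pending=balance.pending - {op_id}))
    else:             # PROCESSING and no longer pending: already applied
        store.cas_operation(op.version, replace(op, status=Status.PROCESSED))
    return False


def run(store: Store, op_id: str, max_steps: int = 20) -> bool:
    for _ in range(max_steps):
        if step(store, op_id):
            return True
    return False
```

A failed compare-and-set is not an error here: the next step re-reads both entities, which is how "reset the algorithm to the beginning" plays out. In particular, a stale process that read the balance before the operation was applied cannot add it to the pending set again, because the balance version it holds is out of date.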