Rama: How to achieve a transactional update across partitions
While implementing my pet project on the mind-bending Rama platform, I ran into the classic need for an atomic update of a set of states, possibly living at different partitions / machines. Let’s see what our options are for implementing that.
Note: You should be familiar with the basics of Rama, such as the concepts of a PState and a depot, and how all the data is partitioned across the Rama cluster.
After this post was published, the Rama docs gained the page ACID semantics, which goes into great detail about atomicity, consistency, isolation, and durability in Rama. I highly recommend reading that page.
The case
I’m building a CRUD application for "Component" entities. A Component may have another Component as its :parent. When that happens, I want transactional semantics to ensure data integrity. Namely:
I want to check that the parent component exists (which happens at the parent’s partition)
I want to add the new component to the parent’s children set (still the parent’s partition)
I obviously want to persist the child itself (at the child’s partition)
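To ground the later code sketches, here is roughly what the data involved could look like in Rama’s Clojure API. This is my own illustration, not a design from the post: the PState names ($$components, $$children) and schemas are assumptions, and children are stored as a map of child id → true rather than a set purely so that single entries can be added and removed with plain keypath transforms.

```clojure
;; A sketch only: PState names and schemas are my own assumptions, not from the post
(declare-pstate topology
                $$components ; component id -> the Component itself
                {String (fixed-keys-schema {:id     String
                                            :name   String
                                            :parent String})}) ; :parent holds the parent's id, if any
(declare-pstate topology
                $$children   ; parent id -> {child id -> true}
                {String (map-schema String Boolean {:subindex? true})})
```

The `topology` here stands for whichever stream or microbatch topology the module declares, as shown in the sketches further down.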
The first two steps related to the parent happen on its partition, which may differ from the child’s. Multiple things could fail here, for instance:
The parent could have been deleted after I checked it but before (or even after) I persist the child
The creation of the child could fail if it does not satisfy business rules, and the parent might thus end up pointing to a non-existent child
I do not want either a parent or a child pointing to something that does not exist.
Background
Rama’s ETL topologies are triggered by depot appends. Rama guarantees that appends to a particular partition of a particular depot will be processed in the order of arrival. Appends to different partitions or to different depots on the same partition (~ machine) do not have any such guarantees.
A single depot append may result in a tree of events and is only fully processed after they have all finished successfully. What is an event? It is all the code in between two async boundaries: “An async boundary is where code moves to another task, or where a remote operation is performed. Async boundaries in Rama can be one of partitioners, localSelect calls on mirror PStates, or calls to external queues or databases.”
All writes within a single event are atomic, i.e. all writes to any number of PStates (remember, all at the same partition) become visible at the same time. That is the guarantee for stream topologies; in microbatching, every PState update across all partitions is atomic.
This means that if a stream topology writes to multiple partitions, these writes become visible at different times, and not all at once. So what do we do if we want transactional semantics across multiple partitions?
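To make the event boundaries concrete before looking at the solutions, here is a hedged fragment of a stream topology in the Clojure API, reusing the names from the sketch above (the surrounding module declaration is omitted). Everything between two partitioners is one event, and each event’s writes become visible on their own:

```clojure
(<<sources topology
  (source> *component-depot :> {:keys [*id *parent] :as *component})
  ;; event 1: runs on the partition chosen by the depot's partitioner;
  ;; all of its PState writes become visible together
  (local-transform> [(keypath *id) (termval *component)] $$components)
  ;; async boundary: move the computation to the parent's partition
  (|hash *parent)
  ;; event 2: a separate event; its write becomes visible independently of event 1
  (local-transform> [(keypath *parent *id) (termval true)] $$children))
```

If processing fails between the two events, or a reader queries the PStates in between, the child can exist without its parent knowing about it. That is exactly the window the rest of the post is about closing.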
Solution 1: Microbatching
As Nathan pointed out, every PState update across all partitions is transactional in microbatching, which solves my need perfectly.
On microbatch topologies
A microbatch iteration can take anywhere from a couple hundred milliseconds to many seconds, depending on the complexity of processing and the amount of incoming data.
(I assume this can be tuned with options such as depot.microbatch.max.records.)
They [microbatch topologies] have significant additional capabilities for expressing computations [such as batch blocks], different performance characteristics, and simple exactly-once fault-tolerance semantics. [I.e. an all-or-nothing atomicity.]
This means all changes to all PStates on a task in a microbatch become visible at the exact same time (though changes on different tasks [~ machines] may become visible at slightly different times).
“If you do depot appends as part of your microbatch topology [..], those currently do not have exactly-once semantics in the face of failures and retries. However, this is on our roadmap.” [As of March 2024.]
Unless you require millisecond-level update latency for your PStates, you should generally prefer microbatch topologies. They have higher throughput and simpler fault-tolerance semantics than stream topologies.
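Here is a hedged sketch of what Solution 1 could look like as a microbatch topology, under the same assumptions as before (module name, depot, PState and field names are mine; handling of root components without a parent is elided):

```clojure
(defmodule ComponentsModule [setup topologies]
  (declare-depot setup *component-depot (hash-by :parent))
  (let [mb (microbatch-topology topologies "components")]
    (declare-pstate mb $$components
                    {String (fixed-keys-schema {:id String :name String :parent String})})
    (declare-pstate mb $$children
                    {String (map-schema String Boolean {:subindex? true})})
    (<<sources mb
      (source> *component-depot :> %microbatch)
      (%microbatch :> {:keys [*id *parent] :as *component})
      ;; the depot is partitioned by :parent, so we start on the parent's partition
      (local-select> (keypath *parent) $$components :> *parent-component)
      (<<if *parent-component
        ;; register the child under its parent...
        (local-transform> [(keypath *parent *id) (termval true)] $$children)
        ;; ...then hop to the child's partition and persist it
        (|hash *id)
        (local-transform> [(keypath *id) (termval *component)] $$components)))))
```

Either the whole microbatch iteration succeeds or none of its writes happen, which is the all-or-nothing atomicity I am after (with the caveat from the quote above that visibility on different tasks may differ by a small moment).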
Solution 2: Streaming
Let’s (artificially) assume that component creation needs very short response times and thus we need to use a streaming topology. Here is a possible solution (where [xxx] denotes a partition; a code sketch follows the steps):
[parent] Check that the parent exists, and add the child’s id to the parent→future-children PState.
When you ask a parent about its children, this PState is ignored; but if the parent is being deleted, it also schedules the deletion of its future children by appending them to the appropriate depot.
Here it could be beneficial to have creates and deletes in the same depot, so that we do not risk the delete being processed before the create finishes (and thus failing to delete the not-yet-existing child). This is reportedly a best practice anyway.
[child] Check and persist the child
[parent] Based on the situation:
If the child creation succeeded and the parent still exists, then move the child to the parent→children PState.
Here we have a tiny moment where a child exists but its parent doesn’t show it yet; that’s OK, this bit of eventual consistency is not a problem for me.
If the parent has been deleted in the meantime, then a removal of the child is already scheduled. The child may appear to some clients for a brief moment.
If the child creation failed, remove the child from the parent→future-children PState.
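Below is a hedged sketch of how this streaming flow could be wired up, under the same assumptions as the earlier sketches. The parent→future-children and parent→children PStates are modelled as $$future-children and $$children, valid-component? is a hypothetical business-rule check, and both root-component handling and the deletion path (the parent scheduling removal of its future children via the shared depot) are elided:

```clojure
(defmodule ComponentsModule [setup topologies]
  ;; creates and deletes share this depot, partitioned by the parent id,
  ;; so they are processed in order per parent (the delete flow is elided here)
  (declare-depot setup *component-depot (hash-by :parent))
  (let [s (stream-topology topologies "components")]
    (declare-pstate s $$components
                    {String (fixed-keys-schema {:id String :name String :parent String})})
    (declare-pstate s $$children        {String (map-schema String Boolean {:subindex? true})})
    (declare-pstate s $$future-children {String (map-schema String Boolean {:subindex? true})})
    (<<sources s
      (source> *component-depot :> {:keys [*id *parent] :as *component})
      ;; [parent] 1. check the parent and reserve the child as a "future" child
      (local-select> (keypath *parent) $$components :> *parent-component)
      (<<if *parent-component
        (local-transform> [(keypath *parent *id) (termval true)] $$future-children)
        ;; [child] 2. validate and persist the child on its own partition
        (|hash *id)
        (valid-component? *component :> *ok?) ; hypothetical business-rule check
        (<<if *ok?
          (local-transform> [(keypath *id) (termval *component)] $$components))
        ;; [parent] 3. back on the parent's partition: drop the reservation and,
        ;; if the child was persisted, record it as a real child
        (|hash *parent)
        (local-transform> [(keypath *parent *id) (termval NONE)] $$future-children)
        (<<if *ok?
          (local-transform> [(keypath *parent *id) (termval true)] $$children))))))
```

Because each segment between partitioners is its own event, the writes of steps 1, 2 and 3 become visible at three different moments. That is exactly why the intermediate $$future-children bookkeeping, and the small windows of eventual consistency described above, are needed.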
Open questions
How can I test different "interleavings" of events, to make sure I never get into a state that would violate data integrity? RPL has a fascinating blog post about testing concurrent systems, but it is not clear to me whether/how I could leverage that for my tests.
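I do not have an answer for forcing specific interleavings, but for ordinary end-to-end checks Rama’s in-process cluster makes the happy path easy to exercise. A hedged sketch, assuming the ComponentsModule from the streaming sketch above (plus handling of root components, which the sketches elide); it only tests one sequential order and does not answer the interleaving question:

```clojure
(require '[com.rpl.rama :as r]
         '[com.rpl.rama.path :refer [keypath]]
         '[com.rpl.rama.test :as rtest])

(with-open [ipc (rtest/create-ipc)]
  (rtest/launch-module! ipc ComponentsModule {:tasks 4 :threads 2})
  (let [module-name (r/get-module-name ComponentsModule)
        depot       (r/foreign-depot ipc module-name "*component-depot")
        children    (r/foreign-pstate ipc module-name "$$children")]
    ;; create a root component, then a child pointing at it
    (r/foreign-append! depot {:id "parent-1" :name "Parent"})
    (r/foreign-append! depot {:id "child-1" :parent "parent-1" :name "Child"})
    ;; with a stream topology, foreign-append! by default waits until processing
    ;; finishes, so the PState writes are visible once the append returns
    (assert (true? (r/foreign-select-one (keypath "parent-1" "child-1") children)))))
```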