Rama: How to achieve a transactional update across partitions

While implementing my pet project on the mind-bending Rama platform, I ran into the classical need for an atomic update of a set of states, possibly at different partitions / machines. Let’s see what are our options for implementing that.

Note: You should be familiar with the basics of Rama, such as the concepts of a PState and a depot, and how all the data is partitioned across the Rama cluster.

After this post has been published, Rama docs got the page ACID semantics which goes into great detail about atomicity, concurrency, isolation, and durability in Rama. I highly recommend reading that page.

The case

I’m building a CRUD application for "Component" entities. A Component may have another Component as its :parent. When that happens, I want transactional semantics to ensure data integrity. Namely:

  1. I want to check that the parent component exists (which happens at the parent’s partition)

  2. I want to add the new component to the parent’s children set (still the parent’s partition)

  3. I want obviously to persist the child (at the child’s partition)

The first two steps related to the parent happen on its partition, which may differ from the child’s. Multiple things could fail here, for instance:

  • The parent could have been deleted after I checked it but before (or even after) I persist the child

  • The creation of the child could fail, if it fails to satisfy business rules - and the parent might thus end up pointing to a non-existent child

I do not want either a parent or a child pointing to something that does not exist.

Background

Rama’s ETL topologies are triggered by depot appends. Rama guarantees that appends to a particular partition of a particular depot will be processed in the order of arrival. Appends to different partitions or to different depots on the same partition (~ machine) do not have any such guarantees.

A single depot append may result in a tree of events and is only fully processed after they all finished successfully. What is an event? It is all the code in between two async boundaries: “An async boundary is where code moves to another task, or where a remote operation is performed. Async boundaries in Rama can be is one of partitioners, localSelect calls on mirror PStates, or calls to external queues or databases.”

All writes within a single event are atomic - i.e. all writes to any number of PStates (remember, all at the same partition) are made visible at the same time. At least for stream topologies - in microbatching, every PState update across all partitions is atomic.

This means that if a stream topology writes to multiple partitions, these writes become visible at different times, and not all at once. So what do we do if we want transactional semantics across multiple partitions?

Solution 1: Microbatching

As Nathan pointed out, every PState update across all partitions is transactional in microbatching, which solves my need perfectly.

On microbatch topologies

A microbatch iteration can take anywhere from a couple hundred milliseconds to many seconds, depending on the complexity of processing and the amount of incoming data.

(I assume this can be tuned with options such as depot.microbatch.max.records.)

They [microbatch topologies] have significant additional capabilities for expressing computations [such as batch blocks], different performance characteristics, and simple exactly-once fault-tolerance semantics. [I.e. an all-or-nothing atomicity.]

This means all changes to all PStates on a task in a microbatch become visible at the exact same time (though changes on different tasks [~ machines] may become visible at slightly different times).

“If you do depot appends as part of your microbatch topology [..], those currently do not have exactly-once semantics in the face of failures and retries. However, this is on our roadmap.” [As of March 2024.]

Unless you require millisecond-level update latency for your PStates, you should generally prefer microbatch topologies. They have higher throughput and simpler fault-tolerance semantics than stream topologies.

Solution 2: Streaming

Let’s (artificially) assume that component creation needs very short response times and thus we need to use a streaming topology. Here is a possible solution (where [xxx] denotes a partition):

  1. [parent] Check that the parent exists, and add the child’s id to the parent→future-children PState

    1. When you ask a parent about its children, this PState is ignored but if the parent is being deleted, it also schedules the deletion of its future children, by appending them to the appropriate depot

      1. Here it could be beneficial to have creates and deletes in the same depot, so that we do not risk the delete being processed before the create finishes (and thus failing to delete the not-yet-existing child) This is reportedly a best practice anyway.

  2. [child] Check and persist the child

  3. [parent] Based on the situation:

    1. If the child creation succeeded and the parent still exists then move the child to the parent→children PState

      1. Here we have a tiny moment where a child exists but its parent doesn’t show it yet, but that’s OK, the eventual consistency here is not a problem for me

      2. If the parent has been deleted in the meantime, then a removal of the child is already scheduled. The child may appear to some clients for a brief moment.

    2. If the child creation failed, remove the child from the parent→future-children PState

Stream topologies must be retriable

Any part of a distributed computation such as a stream topology may fail. Rama solves that by retrying such a topology from scratch. Therefore the topology must be idempotent, i.e. it must be safe to run it multiple times, and it must be able to pick up from where it failed. This implies that we must modify data in the right order. In my case, when deleting a parent, I may only delete the $$parent→children entry after the successful deletion of all the children. (And make sure that an attempt to delete a deleted child does nothing.)

Open questions

How to test different "interleavings" of events, to make sure I never get into a state that would violate data integrity? RPL has a fascinating blog post about testing concurrent systems but it is not clear to me whether/how I could leverage that for my tests.

Summary

If you want an atomic update across multiple partitions / machines, your best solution is using a microbatching ETL topology. If you need few-ms response time and thus require a streaming topology, then you need to plan your updates carefully and accept some amount of eventual consistency.


Tags: rama


Copyright © 2024 Jakub Holý
Powered by Cryogen
Theme by KingMob