Hands on Rama, day 2: Rewrite CAS, finish basic C(R)UD

My adventures with Rama continue from day 1. On the second day, I rewrote the compare-and-set logic to succeed atomically only if all field edits were ok (instead of on a per-field basis), and I finished my basic, idempotent C(R)UD, with full test coverage. I have re-learned that no special forms may be used in dataflow code. Instead, I may write top-level functions or create anonymous rama fns via <<ramafn. I have also been reminded that code called from dataflow must not throw exceptions.

I actually took a long break between my first and second day of coding, because the first one taught me that I really needed to understand Rama in much more detail. So I have read essentially all of the Java-focused docs, as well as the Clojure ns docstrings, especially those for paths. Since I am me, I forgot a lot and will need to re-read it, but anyway writing Rama is now much simpler.

Overall, it was pretty smooth sailing, even though I had some rough spots, as you can read below.

Code highlights

I wanted a local-select returning a select-keys of an entity, possibly just {}, or - when the entity in question didn’t exist - nil. But select-keys never returns nil, and wrapping it with not-empty turns {} into nil as well. I tried to modify the path to use must, but then nothing is returned: a select is a Rama fragment and thus can emit 0, 1, or many times, and if must is not satisfied, it doesn’t emit at all. So I ended up writing a custom some-select-keys fn (see the point about no special forms above).
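
A minimal sketch of what such a helper could look like, in plain Clojure. Only the name some-select-keys comes from the text above; the body is an assumption for illustration, not the code from the repository:

```clojure
;; Hypothetical implementation: nil in -> nil out, otherwise plain select-keys.
(defn some-select-keys
  "Like select-keys, but returns nil (not {}) when the map itself is nil."
  [m ks]
  (when (some? m)
    (select-keys m ks)))

(some-select-keys {:a 1 :b 2} [:a]) ; => {:a 1}
(some-select-keys {:b 2} [:a])      ; => {}  (entity exists, no matching keys)
(some-select-keys nil [:a])         ; => nil (entity does not exist)
```

Being a named top-level function, it avoids the anonymous-lambda restriction in dataflow code.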

The emit syntax can also do destructuring on both maps and sequences, like here:

(source> *component-edits :> {:keys [*_id *edits]}) ; for :_id, :edits
(edits->before+after *edits :> [*before *after]) ; on a sequence

Leverage identity to transform a dataflow var with a fn into the same or new var:

(identity (Math/floor (+ *one 0.1)) :> *one)

Notice that this is not necessary for Clojure’s own fns, as Rama bundles a custom build of Clojure, extended to support dataflow:

(= *before *existing :> *unchanged-since-read?)

Paths are powerful. For the partial update implementation, I wanted to replace a subset of a map with new values, and submap allows me to do just that:

;; Here, *after is a subset of a component, but with new values:
(local-transform> [(keypath *_id) (submap (keys *after)) (termval *after)] $$component-by-id)
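
For intuition, the effect of that path on a single component can be approximated in plain Clojure. This is only a sketch, assuming submap navigates to the selected keys and termval replaces that submap with the new value; replace-submap and the sample data are hypothetical and this is not Rama code:

```clojure
;; Hypothetical plain-Clojure analogue of [(submap ks) (termval new-submap)]
;; applied to one map: drop the selected keys, then merge in the new submap.
(defn replace-submap
  [m ks new-submap]
  (merge (apply dissoc m ks) new-submap))

(replace-submap {:_id 1 :name "old" :type "app" :owner "me"}
                [:name :type]
                {:name "new" :type "service"})
;; => {:_id 1, :owner "me", :name "new", :type "service"}
```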

Other experiences

Good: Leveraging rtest/create-test-pstate and dataflow ?<- for quick feedback when experimenting with dataflow code, without the need to run the whole module.

Bad: How to get "select-keys <relevant>" from a PState? I know view but (local-select> [(keypath (uuid 1)) (view #(select-keys % (keys *before)))] ps :> *existing) fails with a mysterious exception. Is there some Specter thingy I should be using instead? The compile-time stacktrace is of no use.

  • A: As discussed in the docs and on Slack, no special forms can be used in dataflow code, including anonymous lambdas. Either I need to declare a top-level named function, or perhaps leverage <<ramafn.

  • Possible solution:

    (<<ramafn %sel-ks [*m] (:> (select-keys *m (keys *before))))
    (local-select> [(keypath (uuid 1)) (view %sel-ks)] ps :> *existing)

    But even better, with the latest Rama, view can take extra args, so I can write: (local-select> [(keypath (uuid 1)) (view select-keys (keys *before))] ps :> *existing). (Though I ended up with a custom function anyway, since I also needed some->.)

Lesson learned: Avoid exceptions at all costs (e.g. from an assert, division by zero, or whatever). They turn the data into a "poison pill", as the topology will keep retrying to process it until you notice it through telemetry and deploy a new, fixed version. The dataflow language has no try-catch. Verify that data is valid when it enters the system, do rigorous testing, and when necessary, wrap individual function bodies with try-catch.
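
As an illustration of the last point, here is a minimal sketch of a top-level function that catches its own exception and returns a value describing the failure instead of throwing. The name safe-divide and the {:ok ...} / {:error ...} result shape are assumptions made for this example, not something Rama prescribes:

```clojure
;; Hypothetical helper: never throws, so it cannot turn a record into a
;; "poison pill" when called from dataflow code.
(defn safe-divide
  "Returns {:ok result} on success, or {:error message} on division by zero."
  [a b]
  (try
    {:ok (/ a b)}
    (catch ArithmeticException e
      {:error (.getMessage e)})))

(safe-divide 10 2) ; => {:ok 5}
(safe-divide 1 0)  ; => a map with an :error key instead of an exception
```

The caller can then branch on the presence of :error rather than relying on exception handling.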

I was trying to figure out how to translate a Java Path that removes a top-level element, since there is no obvious termVoid equivalent. I guessed at NONE, but that fails in dataflow; I have been advised to use NONE> instead.

Aside: What do the << and > in Rama names mean?

Operations starting with << are "block operations" that can’t be used as expressions.

An operation ending with > doesn’t have a very consistent meaning. Sometimes it’s just to distinguish from the analogous Clojure operation, like filter> vs. filter. Other times it refers to something being not function-like (e.g. select> or if>). Basically [it] just means it’s an operation meant for dataflow code.

The code

The code is under the day2 tag in ardoq-rama-poc.

Highlights from the docs

While studying the docs, I have noted down a number of interesting tidbits.

Rama in general

The applications that benefit the most from Rama are those with complex domain models, large scale, or both.

Rama integrates and generalizes data ingestion, processing, indexing, and querying. The "integrates" aspect means it encompasses a suite of responsibilities that aren’t normally packaged in a single tool. The "generalizes" part means Rama is unrestricted in the kinds of applications it can build, including applications that require huge scale. This means Rama can build complex application backends on its own in a small amount of code that previously required dozens of tools and a lot of code. In many cases the code difference is over 100x.

Rama enables you to build data systems. They answer questions with the appropriate timeliness, accuracy, latency, and throughput, and have some scalability.

The key breakthrough of Rama is generalizing and integrating every aspect of data systems to such an extent that you can build entire data systems end-to-end with just Rama.

Rama was created in response to the proliferation of low-cost distributed computing resources. The Rama platform provides an execution environment [the cluster] and a compiler for that environment that enables you to write distributed applications without having to manage the underlying resources.

If a cluster is the execution environment, a module is the executable. You create modules to do work. We can understand modules in terms of their definitions and their runtime instantiation.

The idea here is there’s a relationship between an execution environment and languages that let you write applications for that environment. Execution environments provide resource abstractions: OS’s provide processes, files, file systems, sockets, a universal IO model, etc; Rama provides depots, partitioned states, distributed computations etc. Languages give you a way to manipulate those resources.

But the broader conception of performing work in Rama is that you’re coordinating distributed tasks across an arbitrary number of processes and machines to produce useful, distributed views of data.


PStates are a little similar to tables in an RDBMS in that both are named, discrete data containers within a larger data management system. They differ from tables in that they are arbitrarily compound data structures of arbitrary size, meaning that you can use them to store as much data as you need, organized however you need.

PState capabilities include "subindexing", which allows inner data structures in PStates to be of huge size, and "fine-grained reactive queries", which allows for continuous queries that are pushed "diffs" about changes to the result of the query.

Distributed programming in Rama: architecture

At a high level, Rama is designed to run multiple copies of a module — called tasks — and distribute work among these copies. When you deploy a module, you specify how many tasks to run and how to distribute the tasks across resources: you could run multiple tasks within one JVM process, within multiple JVM processes on a single machine, or within JVM processes across multiple machines. Tasks run on threads within a single JVM, and a single thread can run multiple tasks. These threads are called "task threads".

A worker belongs to exactly one module, and a module may have multiple workers (on different JVMs).

Rama makes no guarantees about the global order in which depot records are processed, so this hypothetical module design [of a random depot partitioner + ETL which hashes on the data] introduces the possibility of race conditions: any 2 updates for X will be relocated to the same partition, but in a random order, since they arrive at different partitions and the time it takes to relocate them over the network to the target one has a random aspect.
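
The consequence of that random order can be shown without Rama at all. In this sketch (plain Clojure; apply-updates and the sample updates are hypothetical), two "set" updates for the same key are applied in both possible arrival orders, and the final state differs:

```clojure
;; Each update overwrites the value stored under its key (last write wins).
(defn apply-updates
  [state updates]
  (reduce (fn [s {:keys [k v]}] (assoc s k v)) state updates))

(def u1 {:k :x :v 1}) ; appended first, landed on one depot partition
(def u2 {:k :x :v 2}) ; appended second, landed on another depot partition

(apply-updates {} [u1 u2]) ; => {:x 2}  arrival order matches append order
(apply-updates {} [u2 u1]) ; => {:x 1}  u1 was relocated more slowly, stale value wins
```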

Why distributed systems? For performance, resiliency, perhaps [not in the case of Rama] separation of concerns.

A RamaOperation is more general than a RamaFunction, capable of emitting multiple times, doing work between emits, emitting to multiple streams, and emitting multiple fields in a single emit.

Depot append uses AckLevel.ACK (which is the default if not specified). With this ack level the depot append call will only complete when all streaming topologies co-located with the depot have finished processing the data.

Batch blocks

Batch blocks: while normal dataflow is rather imperative, batch blocks are partially declarative and work at a higher level of abstraction, with Rama deciding the sequence of operations. They offer functionality including inner joins, outer joins, two-phase aggregation, and the ability to coordinate around batches of computation. Query topologies are implicitly batch blocks, and you can use batch blocks in microbatch topologies, though not in stream ones. A batch block runs in three phases: the "pre-agg phase", the "agg phase", and the "post-agg phase", determined by where you put the .agg or .compoundAgg call(s). A final partitioner, if any, must be declared before the agg phase. Subbatches allow batch blocks to consume the results of other batch blocks, so you can do aggregates of aggregates.


Aggregators: An alternative to Paths for updating PStates, which enables huge increases in performance and expressivity in some cases. There are two kinds of aggregators: more generic accumulators and more limited combiners.

Contrary to paths, aggregators know how to initialize values that don’t exist. Combiners allow parallelization ("two-phase aggregation", for combiners in batch blocks / query topologies, especially impactful for global aggregation). Accumulators may take any number of args, while combiners always take two: the current value and one new value.
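
The idea behind two-phase aggregation can be sketched in plain Clojure, with + standing in for a combiner and a vector of vectors standing in for data spread over tasks. The data and var names are illustrative assumptions; this is not the Rama API:

```clojure
;; Values as they might be spread across three tasks.
(def partitions [[1 2 3] [4 5] [6]])

;; Phase 1: each task combines its own values locally (parallelizable).
(def partials (mapv #(reduce + 0 %) partitions)) ; => [6 9 6]

;; Phase 2: only the small partial results are sent to one task and combined.
(def total (reduce + 0 partials)) ; => 21

;; Same answer as combining everything in one place, but with far less
;; data crossing the network.
(= total (reduce + 0 (apply concat partitions))) ; => true
```

This works because the combining function takes two values of the same kind and can be applied in any grouping, which is the restriction that lets combiners be parallelized while general accumulators cannot.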

Other batch features: capture which PState entries changed.

A batch feature mostly useful in queries: use .agg without a PState argument and with .out to get the aggregate. In microbatch, aggregators can also .out("$$aTmpPState") (similar to .materialize used there w/o aggs).

Query topologies

All query topologies must contain .originPartition (= back to the query’s original task) as the final partitioner of the computation. The .out must be emitted exactly once.

Without subindexing, the entire data structure will be stored and retrieved as a single value. This will get expensive once there’s even just a few hundred elements in it. Subindexing enables inner data structures to efficiently contain huge numbers of elements, even more than could fit into memory.

Query topologies are implicitly batch blocks. So when programming them you’re always thinking in terms of pre-agg, agg, and post-agg phases. All the power of batch blocks, including joins, subbatches, and two-phase aggregation, is available.

Tags: experience rama

Copyright © 2024 Jakub Holý