Hands on Rama, day 3: Foreign keys and data integrity, macros, queries

The adventure with Rama continues! In the previous installment, I created a simple C(R)UD for "components". Today, we will spice it up and add a foreign key and data integrity maintenance. Namely, a component can have a parent, which can also have a parent, etc.

I learn about transactional updates, about writing query topologies, and about code reuse with Rama segmacros. As usual, I excel at running into problems we can all learn from 😅.

I will implement two features with respect to parent:

  1. Ensuring that parent is a valid component ID when creating or updating the value (it exists, there is no cycle)

  2. A query to retrieve the full hierarchy of a component’s ancestors (originally I planned descendants)

    • Instead of a computed has-children? property (to know whether to check for descendants), I may want a helper PState, $$children

  3. When deleting a parent component, delete the whole hierarchy of its descendants (TBD later)

    • Likely, I need to check for children upon delete (could have got some while the deletion was issued)

    • Should I check that it hasn’t got any new descendants since a user approved the deletion? Or perhaps we should delete all approved and new anyway? Or support a force option to delete also potential new descendants without approval?

Core design

I already have an id → component PState and would likely want a comp-id → #{children IDs} one. Rama by default pre-computes the length of sub-indexed subsequences, which would be helpful for has-children? if I wanted to subindex it (i.e. if it were expected to have over 100 elements, which I don’t think is the case, though it could happen occasionally). I could also maintain another PState, comp-id → count-of-children, though I am not sure the count of children is ever actually interesting. The count of all descendants could be, but that would also be more expensive to maintain. A simple has-children? might be enough.

IMO subindexing doesn’t make sense here - I expect only units or tens of children on average, and mostly I will want to read all of them anyway. It could be interesting to store the parent-child relationship as a tree, though I don’t think that is possible, since I’d essentially need a recursive schema. I guess I could cheat and use (map-schema String clojure.lang.PersistentArrayMap) or something, but updating it might be expensive, and finding a particular place in the tree (i.e. the subtree for a particular descendant) would be inefficient, requiring the whole root component’s tree to be read.
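To make the two-index design concrete, here is a minimal plain-Clojure model of it. This is not Rama code: both "PStates" are ordinary maps held under the hypothetical keys :component-by-id and :children, and add-component and has-children? are names I made up for the illustration. The (fnil conj #{}) call plays the role of adding to a possibly nonexistent set.

```clojure
;; Plain-Clojure model (NOT Rama): two ordinary maps stand in for the PStates.
(defn add-component
  "Store the component under its :_id and, if it has a :parent, register it
  in the parent's set of children (creating the set when it is missing)."
  [db {:keys [_id parent] :as component}]
  (cond-> (assoc-in db [:component-by-id _id] component)
    parent (update-in [:children parent] (fnil conj #{}) _id)))

(defn has-children?
  "True if at least one child is registered for the given id."
  [db id]
  (boolean (seq (get-in db [:children id]))))

(def db
  (reduce add-component {}
          [{:_id "a"}
           {:_id "b" :parent "a"}
           {:_id "c" :parent "a"}]))
```

With this model, has-children? is just a check on the children set, so a separate count index is not needed for that question.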


Calculating descendant subtree - is Rama smart enough to do this efficiently, i.e. in each iteration compute children at level N (on each node in parallel), then combine and redistribute the output so that all grand-children that hash to the same partition are loaded at the same time (instead of each being processed separately)?


Questions explored

  1. How to create a conditional sub-tree of computation? (I.e. if the input has :parent, go and check it) A: <<if and return *error in both branches, but nil if ok.

  2. How to return early from the topology when the input is invalid? (E.g. the provided parent id does not exist.) A: You can’t. See above - set an error var, then check for it at the end.

  3. How to check for existence? (I.e. "Does the given parent to be set actually exist?") A: (thx, Nathan!): use (view …​) as the whole path: (local-select> (view contains? :x) $$ps :> *present?)

    • BEWARE: This doesn’t work with select> b/c it wouldn’t know what to partition to (it needs a (keypath …​) for that, I assume). So I need to manually use |hash + local-select>.

    • NOTE: local-select> with keypath returns nil upon no match (while with must it would not emit at all), so I can do (<<if *the-result …​.) later on.

  4. How to select on a potentially different partition and then come back? A: select> to go there and access data, then (|hash …​) to come back.

  5. How do I add an element to a (possibly nonexistent) set? A: (local-transform> [(keypath *parent) NONE-ELEM (termval *comp-id)] $$children)

  6. Reusability: how to make a valid-parent? fragment that I could use both in the create and update flows? See the defbasicblocksegmacro parent-error below.

  7. Query topologies - creating them and using them in ETLs (to perform data validity checks) - see below


  • ifexpr can be used with :> and a missing then-branch will produce nil, just like in clj: (ifexpr false "…​" :> *error) ; *error is nil

  • From the docs: “[..] the body [of a loop] can call continue> multiple times in one iteration. An example use case where this comes up is a query topology doing a parallel graph search, where each iteration of the loop continues along all outgoing edges.” This sounds very powerful, compared to ordinary Clojure…​

Code reuse: deframafn vs. segmacro

I wanted to factor out parent-error to check whether a component’s :parent exists, if provided. But I learned I cannot use deframafn / deframaop for that because neither may contain partitioners. The solution is either to use the inline <<ramafn, which did not fit here, or a segmacro.

Pitfalls I encountered:

  • Segmacros run at compile time and must not contain any nested function calls (obviously 😅), unless you want them evaluated at compile time. You can turn (f …​) into (seg# f …​) to postpone its evaluation until runtime.

  • Keywords cannot be used as fns, so (seg# :kwd thing) is invalid; you need to use (seg# get thing :kwd) instead

  • Partitioners cannot be used in pure dataflow (?<- …); you need to actually run the module for that

Many thanks to Nathan for all the help!

Here is the code:

Segmacro for checking parent for validity, if set
(defbasicblocksegmacro parent-error
  "Check the parent exists and that there is no loop in the hierarchy,
  return error data if not."
  [component component-by-id :> maybe-error] ; args
  [[get component :parent             ; (1)
     :> '*parent#]                    ; (2)
   [get component :_id :> '*self#]
   [<<if (seg# not '*parent#)         ; (3)
    [identity nil :> maybe-error]
    [else>]
    [|hash '*parent#] ; I'm using a partitioner => can't use ramafn, need a segmacro
    [local-select> (seg# view contains? '*parent#) component-by-id :> '*parent-exists?#]
    [|hash '*self#] ; come back to the original partition
    [<<cond
     [case> (seg# not '*parent-exists?#)]
     [identity {:error "The parent entity does not exist" :data {:parent '*parent#}} :> maybe-error]

     [case> (seg# = '*parent# '*self#)]
     [identity {:error "Can't be one's own parent" :data {:parent '*parent#}} :> maybe-error]

     [default>] ; parent exists and differs from self => check for a loop
     [invoke-query "ancestor?" '*self# '*parent# :> '*loop?#]
     [ifexpr '*loop?# {:error "Ancestor loop" :data {:parent '*parent#}} :> maybe-error]]]])
(1) Keywords can’t be used as functions; we need get instead. Notice that segmacros use code as data, which is why we see [ instead of (.
(2) Similarly to Clojure macros, we use '*name# to generate a unique Rama var (notice the leading quote and trailing hash).
(3) The [ segment vectors don’t nest, with the exception of block segments such as <<if. Nested expressions need to be "postponed" with seg#.
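The decision logic of the segmacro is easier to see without partitioners and generated vars. Below is a minimal plain-Clojure sketch of the same checks. It is not Rama code: component-by-id is assumed to be an ordinary map of id → component, the stored data is assumed to be free of cycles already, and the helper names are only for illustration.

```clojure
;; Plain-Clojure model (NOT Rama) of the validation order used above.
(defn ancestor?
  "True if needle is reachable from child by following :parent links.
  Assumes the stored hierarchy has no cycles."
  [component-by-id needle child]
  (loop [parent (get-in component-by-id [child :parent])]
    (cond
      (nil? parent)     false
      (= needle parent) true
      :else             (recur (get-in component-by-id [parent :parent])))))

(defn parent-error
  "Return error data if the component's :parent is invalid, else nil."
  [component-by-id {:keys [_id parent]}]
  (cond
    (nil? parent) nil ; no parent set => nothing to check
    (not (contains? component-by-id parent))
    {:error "The parent entity does not exist" :data {:parent parent}}
    (= parent _id)
    {:error "Can't be one's own parent" :data {:parent parent}}
    ;; would the new parent have this component among its own ancestors?
    (ancestor? component-by-id _id parent)
    {:error "Ancestor loop" :data {:parent parent}}))

(def components
  {"a" {:_id "a"}
   "b" {:_id "b" :parent "a"}
   "c" {:_id "c" :parent "b"}})
```

For example, (parent-error components {:_id "a" :parent "c"}) reports an ancestor loop, because "a" is already an ancestor of "c". The real version differs mainly in that each lookup may live on another partition.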

Query topologies

Goal: When setting a parent, check that it won’t create a loop in the hierarchy.

Extended goal: Given a child id, return the chain of its ancestors, starting with its parent. Given the capabilities of Rama, it should be simple, if not trivial, to compute ancestor chains for any number of children in parallel. I’ve actually decided to start with this, because it is interesting and creating a simpler version of the original need should then be simple.

Off to re-read Query topologies! (Well, right after I refactor all links in this document with help of Asciidoc attributes, and find out how to make Vivaldi force RPL docs into dark mode 😅)

Idea: Use a temporary PState or something similar with input-child-id → ancestors. After each step, come back to the input id’s partition and append the new ancestor. Return the PState value as the output.


  • Define, remotely invoke a query topology

  • Do not confuse local-select> and select>, it is the latter that does change partition 😅

  • How to use loop<-

  • I struggled quite a bit with writing the code to collect all the ancestors. I ended up with a DIY solution where the loop passes an accumulator for the ancestors around and only emits once it is done (which is a good solution, according to Nathan). I would have preferred to emit every parent found from the loop and use something like aggs/+vec-agg to collect them, but couldn’t figure out how to (Nathan later demonstrated it, but it loses ordering). It also took a while to figure out that I first need to go back to the origin partition and then use aggs/+map-agg to combine all the pairs of child + its ancestors into a single output map. (I originally wrote (identity {*child (not-empty *ancestors)} :> *child->ancestors), but that emitted a single-element map for each child, while queries must produce exactly one output.)

  • After I implemented the ancestors query returning a component’s ancestors, I wrote ancestor? from scratch, for checking whether a component is an ancestor of another one, using a recursive query, as this was much simpler.

Other ideas considered:

If the order of ancestors wasn’t important (it is), then Nathan suggested the following neat solution (notice the loop emits each parent, and we use a compound aggregation on the origin partition):

Unordered ancestors list query (Nathan)
(<<query-topology topologies "ancestors"
  [*children :> *child->ancestors]
  (ops/explode *children :> *child)
  (loop<- [*child *child :> *ancestor]
    (select> [(keypath *child :parent)] $$component-by-id :> *parent)
    (<<if *parent
      (:> *parent)
      (continue> *parent)))
  (|origin) ; aggregate on the origin partition
  (+compound {*child (aggs/+vec-agg *ancestor)} :> *child->ancestors))
Ordered ancestors list query (me)
(<<query-topology topologies "ancestors"
  [*children :> *child->ancestors] ; input :> output
  (ops/explode *children :> *child)
  (loop<- [*child *child, *ancestors [] :> *ancestors] ; 2 vars, 1 output
    (select> [(keypath *child :parent)] $$component-by-id :> *parent); (1)
    (<<if *parent
      (conj *ancestors *parent :> *ancestors)
      (continue> *parent *ancestors)                                 ; (2)
     (else>)
      (:> *ancestors)))                                              ; (3)
  (|origin) ; go back to the origin partition before aggregating
  (aggs/+map-agg *child (not-empty *ancestors) :> *child->ancestors))
(1) select> moves us to the child’s partition and gets its parent
(2) Dataflow’s recur
(3) Emit the whole vector as the sole output of the loop
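For comparison, this is roughly what the ordered query computes, written as a plain-Clojure sketch over an ordinary map. It is not Rama code and does nothing in parallel; ancestors-of and ancestors-query are hypothetical names, and the data is assumed to have no cycles.

```clojure
;; Plain-Clojure model (NOT Rama) of the ordered "ancestors" query.
(defn ancestors-of
  "Ordered ancestor ids of child, nearest first. Assumes no cycles."
  [component-by-id child]
  (->> (iterate #(get-in component-by-id [% :parent]) child)
       rest              ; drop the child itself
       (take-while some?) ; stop at the first missing parent
       vec))

(defn ancestors-query
  "Map of child id -> vector of its ancestors, or nil when it has none."
  [component-by-id children]
  (into {}
        (map (fn [child]
               [child (not-empty (ancestors-of component-by-id child))]))
        children))

(def components
  {"a" {:_id "a"}
   "b" {:_id "b" :parent "a"}
   "c" {:_id "c" :parent "b"}})
```

Here the accumulator is hidden inside the lazy sequence; the Rama loop has to carry it explicitly because each step may run on a different partition.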
Is needle an ancestor of child? (Recursive query)
(<<query-topology topologies "ancestor?"
  [*needle *child :> *ancestor?]
  (|hash *child) ; (1)
  (local-select> [(keypath *child :parent)] $$component-by-id :> *parent)
  (<<cond
    (case> (nil? *parent))
    (identity false :> *ancestor?)
    (case> (= *needle *parent))
    (identity true :> *ancestor?)
    (default>) ; we've a parent ≠ needle, recurse
    (invoke-query "ancestor?" *needle *parent :> *ancestor?))
  (|origin))
(1) The "Leading partitioner" query optimization ⇒ can’t use select> but need to have explicitly a partitioner and then a local select.

Tip: One entity = one depot

I’ve started by having 3 separate depots for Component Create, Update, and Delete operations, b/c that is what I saw in some of the examples. However, it seems cleaner to me to have just a single one, as it will then provide a single source of truth of the entity. Nathan approves:

yes, in general it’s better to have the same entity managed through the same depot, particularly updates and deletes. Putting creates and updates on different depots usually won’t have ordering problems because in most apps you can’t update or delete something until it’s been created

foreign-append! returns after topologies finish, even if they move to other partitions

The foreign-append! docstring reads “waits for data to be appended and replicated to depot partition and for all colocated stream topologies to finish processing it”, which I misunderstood as "the processing on the local partition". But as Nathan kindly explained, the append call only returns after the topology has completely finished, even if it is using partitioners or doing mirror calls.

From the docs

About Rama’s Clojure Dataflow language

Dataflow code consists of a sequence of "segments", analogous to a "form" in Clojure (since Rama dataflow is still Clojure, segments are also technically forms). A segment consists of an operation, input fields, and any number of "output declarations". An "output declaration" begins with an "output stream" followed by an optional "anchor" and any number of "variables" to bind for emits to that stream. Here are some examples of segments:

(+ 1 2 3 :> *sum) ; output 6 into the default stream as *sum

;; output streams :>, :a>, and :b>
(bar :a> <aaa> *v1 *v2 ; emit 2 fields to the stream a, anchor aaa
  :b> <anchor-b> *v2 ; emit a field, anchor anchor-b
  :> *a) ; emit a field to the default stream

(println "Hello") ; 0 output declarations

A "variable" is a symbol beginning with *, %, or $$. * signifies a value, % signifies an anonymous operation, and $$ signifies a PState.


I’ve expanded my knowledge of Rama considerably, with queries, loops, segmacros, and a number of small learnings. The application can now manage the :parent foreign key, and return a list of ancestors for any number of children.

Next time I’d like to finish this by deleting all descendants when a parent is deleted, and providing a query returning the count of descendants and a list of direct children.

The code

The code is under the day3 tag in ardoq-rama-poc.


Copyright © 2024 Jakub Holý