Error handling in Clojure Core.Async (and its higher-level constructs)

  1. Summary of key points
  2. Error handling
    1. Strategies
    2. NOTE: A default uncaught error handler
    3. Error handling in core.async constructs
      1. Error handling in go loops
      2. Error handling in channel transducers
      3. Error handling in transduce and reduce
        1. Tip 1: use halt-when to stop processing upon the first exception
        2. Tip 2: Prepend a transducer that turns exceptions into anomalies
      4. Error handling in pipelines
    4. A complete error handling example
  3. Aside: Why does everything return a channel?
  4. Cleanup
    1. Garbage Collection
  5. Resources

Error handling is something that core.async leaves to you (because it depends on your particular use case). People have written before about handling errors in the low-level go loops but there is little about the higher level constructs such as pipelines and transducers. I’d like to correct that. First, a little repetition.

(I focus here on Clojure but most applies also to ClojureScript.)

Disclaimer: I am no expert, this is a result of my study and exploration. No guarantees :-)

Summary of key points

If you are short on time, these are the main points. Details follow.

  1. By default, errors are printed to standard error and the output channels are closed / items are skipped

  2. A popular solution is to catch exceptions and send them on as recognizable data, as “anomalies”

  3. Wrap the content of go/go-loop with try-catch

  4. Upon reading from a channel, check whether you get data or an “anomaly” and re-throw/handle appropriately

  5. Use chan’s and pipeline’s ex-handler parameter to customize error handling

  6. You can leverage halt-when to stop transduction upon the first exception (if that’s what you want)

  7. Compose your transducer with an error-catching transducer (which will thus wrap both the transformation and the reduction)

  8. Aside: Make sure to clean up all resources to avoid leaks and deadlocks

Error handling

Strategies

What can you do when processing a value on a channel fails? You can ignore the error, propagate it downstream, or report it elsewhere. You can then continue processing other values or abort.

The most popular way to process errors is to propagate them downstream. But how do you distinguish errors and regular values in downstream processing?

  1. Send them as a distinct type such as java.lang.Throwable (or js/Error in ClojureScript). This is trivial since you normally already have an exception (i.e. a subclass of Throwable). The processor can then switch on (if (instance? Throwable val) ...).

  2. Wrap it in a map with special keys such as {:cognitect.anomalies/category :cognitect.anomalies/interrupted, :cognitect.anomalies/message (ex-message e) ...} (Ex.: Datomic mbrainz-importer.) You can then check it downstream with (if (:cognitect.anomalies/category val) ...). (Notice that keywords work also on most non-map types so it is OK to process [1 2 <error map> 4].) This seems to be quite popular and doesn’t depend on Java exceptions.

  3. (Wrap every value in some container that knows whether the content is a value or an error: [ [:val 32], [:error ...], ...].)

Instead of propagating an error, you could report it somewhere else - log it, send it to an error tracking service, or send it to a dedicated error channel.
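The first two options can be recognized downstream with one small predicate. This is a minimal sketch for JVM Clojure, not code from any library: `anomaly?` and `ex->anomaly` are hypothetical helper names, and the map shape simply follows the keys mentioned above.

```clojure
(defn ex->anomaly
  "Wraps an exception in an anomaly map (hypothetical helper)."
  [^Throwable e]
  {:cognitect.anomalies/category :cognitect.anomalies/fault
   :cognitect.anomalies/message  (ex-message e)})

(defn anomaly?
  "True if `v` is an error-as-data value: a Throwable or an anomaly map."
  [v]
  (boolean
    (or (instance? Throwable v)
        ;; Keyword lookup returns nil on numbers, strings etc.,
        ;; so no explicit map? check is needed here.
        (:cognitect.anomalies/category v))))

(map anomaly? [1 "two" (ex->anomaly (Exception. "boom")) (Exception. "boom")])
;; => (false false true true)
```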

NOTE: A default uncaught error handler

go blocks and other async constructs run on helper threads. Java already has a mechanism for handling exceptions in non-main threads: the thread’s own or the default UncaughtExceptionHandler. In core.async the default one prints the error to stderr. You can set your own with (Thread/setDefaultUncaughtExceptionHandler eh).
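As a rough sketch of what such a handler could look like: the `uncaught-errors` atom is a made-up sink for this example (a real application would more likely log or report the error), and a plain thread stands in for a core.async helper thread.

```clojure
;; Hypothetical sink for this sketch; a real app would log or report instead.
(def uncaught-errors (atom []))

(Thread/setDefaultUncaughtExceptionHandler
  (reify Thread$UncaughtExceptionHandler
    (uncaughtException [_ thread ex]
      (swap! uncaught-errors conj {:thread  (.getName thread)
                                   :message (ex-message ex)}))))

;; Any thread without its own handler now reports to the atom:
(doto (Thread. (fn [] (throw (RuntimeException. "boom"))))
  (.start)
  (.join))

(:message (last @uncaught-errors))
;; => "boom"
```

Keep in mind that this handler is global for the whole JVM, so it also sees errors unrelated to core.async.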

Error handling in core.async constructs

Error handling in go loops

Default behavior: Same - the exception is printed to stderr and a closed channel is returned.

;; Note that inc fails on nil:
(a/<!!
   (a/go
     (let [ch (doto (a/chan) a/close!)]
       (inc (a/<! ch)))))
; => nil
; [STDERR] Exception in thread "async-dispatch-7" java.lang.NullPointerException
; [STDERR] at clojure.lang.Numbers.ops(Numbers.java:1068) ...

As proposed by David Nolen, wrap the body in a try, as in (go (try ...)) (or use a go-safe macro), and read with a <? macro that re-throws if the item is an exception.
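A minimal sketch of these two macros for JVM Clojure follows. The names go-safe and <? come from the text above; the exact bodies and the `throw-err` helper are my assumptions about how they are typically written, not a copy of any particular library.

```clojure
(require '[clojure.core.async :as a])

(defn throw-err
  "Re-throws `v` if it is a Throwable, otherwise returns it (hypothetical helper)."
  [v]
  (if (instance? Throwable v) (throw v) v))

(defmacro go-safe
  "Like `go`, but an exception thrown in the body becomes the block's result."
  [& body]
  `(a/go (try ~@body (catch Throwable t# t#))))

(defmacro <?
  "Like `<!` (only valid inside a go block), but re-throws exceptions taken from `ch`."
  [ch]
  `(throw-err (a/<! ~ch)))

;; The failing example from above now yields the exception as a value:
(a/<!! (go-safe (inc (a/<! (doto (a/chan) a/close!)))))
;; returns the NullPointerException instead of nil

;; An exception sent as data is re-thrown by <? and caught again by go-safe:
(a/<!! (go-safe (inc (<? (a/to-chan [(ex-info "upstream" {})])))))
;; returns the ex-info created upstream
```

With this pair, an error travels through a chain of go blocks as ordinary data until some reader decides to handle it.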

Error handling in channel transducers

You can create a channel with a transducer that will process each value sent to it. What happens if the transducer throws an exception?

Default behavior: Exceptions are passed to Thread/defaultUncaughtExceptionHandler (which prints them), the failing item is skipped, and processing continues.

(<!! (a/into [] (doto (chan 3 (map inc))
                     (>!! 0)
                     (>!! "boom!")
                     (>!! 2)
                     (a/close!))))
; => [1 3]
; [STDERR] Exception in thread "..." java.lang.ClassCastException: ...

If you want to handle the exceptions yourself, use the 3-arity (chan buf-or-n xform ex-handler) to supply a custom exception handler:

(<!! (doto (chan 3 (map inc) (fn [exc] :error))
         (>!! "boom!")
         (a/close!)))
; => :error

(If the ex-handler returns nil then nothing is added to the output channel, i.e. the item is just skipped.)
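Combining the ex-handler with the "anomaly map" strategy could look roughly like this. It is only a sketch: `ex->anomaly` is a hypothetical helper and the choice of the :cognitect.anomalies/fault category is arbitrary.

```clojure
(require '[clojure.core.async :as a])

(defn ex->anomaly
  "Turns an exception into an anomaly map (hypothetical helper)."
  [^Throwable e]
  {:cognitect.anomalies/category :cognitect.anomalies/fault
   :cognitect.anomalies/message  (ex-message e)})

(def results
  (a/<!! (a/into [] (doto (a/chan 3 (map inc) ex->anomaly)
                      (a/>!! 0)
                      (a/>!! "boom!") ; inc fails; the handler's return value is enqueued instead
                      (a/>!! 2)
                      (a/close!)))))

(map #(if (map? %) :anomaly %) results)
;; => (1 :anomaly 3)
```

The failed item keeps its position in the output, so a consumer can tell which inputs succeeded and which did not.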

Error handling in transduce and reduce

Default behavior: Same - close the output channel (so reading from it returns nil) and print the exception via the default UncaughtExceptionHandler

;; Note that + fails on a string:
(<!! (a/reduce + 0 (a/to-chan [1 ""])))
; => nil

;; Error in xform:
;; Note that inc fails on a string:
(<!! (a/transduce (map inc) + 0 (a/to-chan [1 "" 3])))
; => nil
; [STDERR] ClassCastException: class java.lang.String cannot be cast ...

;; Error in reduce:
(<!! (a/transduce (map identity)
                  (completing (fn [_ v] (inc v)))
                  0
                  (a/to-chan [1 "" 3])))
; => nil

Tip 1: use halt-when to stop processing upon the first exception

halt-when can be added as the first one in a transducer pipeline to stop the processing upon the first exception:

Returns a transducer that ends transduction when pred returns true for an input. When retf is supplied it must be a fn of 2 arguments - it will be passed the (completed) result so far and the input that triggered the predicate, and its return value (if it does not throw an exception) will be the return value of the transducer. If retf is not supplied, the input that triggered the predicate will be returned.
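To make the retf part of the docstring concrete, here is a small sketch in plain Clojure (no core.async). The predicate neg? and the keys :partial and :failed-on are made up for the illustration.

```clojure
(def halted
  (transduce
    (comp
      ;; retf receives the completed result so far and the offending input
      (halt-when neg? (fn [acc bad] {:partial acc :failed-on bad}))
      (map inc))
    conj []
    [1 2 -3 4]))

halted
;; => {:partial [2 3], :failed-on -3}
```

Because halt-when comes first in the comp, it sees the raw input (-3) before (map inc) is applied to it.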

Example:

(transduce
    (comp
      (halt-when #(instance? Throwable %))
      (map identity))
    conj []
    [0 1 (Exception. "fake") 3])
; => #error{:via [{:type java.lang.Exception, :message "fake", ..}] ..}

The same happens in core.async:

(<!! (a/transduce
         (comp
           (halt-when #(instance? Throwable %))
           (map identity))
         conj []
         (a/to-chan [0 1 (Exception. "fake") 3])))
; => #error{:via [{:type java.lang.Exception, :message "fake", ..}] ..}

However, when included in the xf of a pipeline, the pipeline returns all elements but those where the halt-when predicate was true because pipeline starts a new transduction for each item. The nice result is that you will get the same output no matter the parallelism of the pipeline.
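A small sketch of that pipeline behavior, reusing the input from the examples above. The parallelism of 2 and the buffer size of 4 are arbitrary choices for the illustration.

```clojure
(require '[clojure.core.async :as a])

(def piped
  (let [out (a/chan 4)]
    (a/pipeline
      2 ; parallelism
      out
      (comp
        (halt-when #(instance? Throwable %))
        (map identity))
      (a/to-chan [0 1 (Exception. "fake") 3]))
    (a/<!! (a/into [] out))))

piped
;; => [0 1 3]
```

Only the exception itself is dropped; the item after it (3) still comes through, unlike in the a/transduce example.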

Tip 2: Prepend a transducer that turns exceptions into anomalies

A transducer can (catch ..) exceptions from both the downstream transducers and the reducing function. We can leverage it to create a transducer that catches all exceptions and propagates them as anomalies (namely as Throwables, in this case):

(defmacro err-or
  "If body throws an exception, catch it and return it"
  [& forms]
  `(try
     ~@forms
     (catch #?(:clj Throwable  :cljs :default) t# t#)))

(def throwable? (partial instance? #?(:clj Throwable  :cljs js/Error)))

(defn catch-ex-as-data
  "Transducer that catches errors from the transducers below (catching errors
  both in the transducing and reducing functions) and returns the first one.

  It should be first, i.e. at the top of `(comp (catch-ex-as-data) ...)`."
  ([] (catch-ex-as-data nil))
  ([on-error]
   (fn [xf]
     (fn
       ([] (err-or (xf)))
       ([result]
        (let [res (if (throwable? result)
                    result ; don't pass anomalies down
                    (err-or (xf result)))]
          (when (and on-error (throwable? res))
            (on-error res))
          res))
       ([result input]
        (try (xf result input)
             (catch #?(:clj Throwable  :cljs :default) t
               (reduced t))))))))

If we add it to the two transduce examples from above, we will get back the exception-as-anomaly instead of nil:

;; Error in the xform:
(<!! (a/transduce
         (comp (catch-ex-as-data) (map inc))
         + 0 (a/to-chan [1 "" 3])))
; => #error{:cause "class java.lang.String cannot be cast..",..}

;; Error in reduce:
(<!! (a/transduce (comp (catch-ex-as-data) (map identity))
                  (completing (fn [_ v] (inc v)))
                  0
                  (a/to-chan [1 "" 3])))
; => #error{:cause "class java.lang.String cannot be cast..",..}

;; ClojureScript:
(defn ++ [x y] {:pre [(int? x) (int? y)]}  (+ x y))

;; Error in the xform:
(a/take! (a/transduce
           (comp (catch-ex-as-data) (map inc))
           ++ 0 (a/to-chan! [1 "" 3]))
  #(println "result=" %))
; OUT: result= #object[Error Error: Assert failed: (int? y)]

;; Error in reduce:
(a/take! (a/transduce (comp (catch-ex-as-data) (map identity))
           (completing (fn [_ v] (++ 1 v)))
           0
           (a/to-chan! [1 "" 3]))
  #(println "result=" %))
; OUT: result= #object[Error Error: Assert failed: (int? y)]

When the transduction ends prematurely, the remaining items on the input channel are not consumed. You likely want to drain and close the channel to avoid any go blocks on downstream channels getting blocked forever. See the complete example below.

Error handling in pipelines

Default behavior: Same - skip the item, print the exception to stderr

(let [out (chan 1)]
    (a/pipeline
      1         ; parallelism
      out
      (map inc) ; transformation
      (a/to-chan [1 "" 3]))
    (<!! (a/into [] out)))
; => [2 4]
; [STDERR] Exception in thread "async-dispatch-11" java.lang.ClassCastException

The max-arity signature is (pipeline n to xf from close? ex-handler). We can leverage the ex-handler parameter to change the default behavior. Both the xf and ex-handler arguments are simply passed to chan so we can learn from its docstring:

[..] ex-handler must be a fn of one argument - if an exception occurs during transformation it will be called with the Throwable as an argument, and any non-nil return value will be placed in the channel.

The default ex-handler calls the thread’s UncaughtExceptionHandler’s .uncaughtException (printing it to stderr) and returns nil (effectively skipping the item).

We can, for example, propagate the exception as data:

(let [out (chan 1)]
    (a/pipeline
      1
      out
      (map inc)
      (a/to-chan [0 "" 2])
      true
      (fn ex-handler [throwable] throwable))
    (<!! (a/into [] out)))
; => [1, #error{:cause "class java.lang.String cannot be cast.." ..}, 3]

A complete error handling example

This is code from our project. We propagate exceptions as data (an anomaly), namely as java.lang.Throwable. There may be multiple anomalies in the input channel while our transduction should stop upon the first exception. We use our custom catching-transduce similar to core.async/transduce but either returning the result (instead of a channel) or throwing. It does the following error handling:

  1. Anomalies from the input channel are routed to another channel, err-ch, and combined into a single vector item (via (let [errors-ch (a/into [] err-ch)] ..) - because we want to know how many anomalies there were in the input)

  2. Exceptions during the transduction - whether in the transformation or reduction step - are caught by the custom transducer catch-ex-as-data - and the transduction stops immediately via (halt-when throwable?).

  3. Finally, if there is any anomaly in the err[ors]-ch or if the result of the transduction is an anomaly, we throw an exception; otherwise we return the result

To avoid getting blocked we need to ensure that we consume all items on all channels - alts!! and consume-rest help with that.

(defn catch-ex-as-data [] ...) ; defined in a previous section

(defn consume-rest
  "Consume all remaining items on `ch`"
  [ch]
  (a/go-loop []
    (when (a/<! ch) (recur)))
  nil)

(defn catching-transduce
  "Similar to `core.async/transduce` but returns the reduced value and
  captures 'anomalies' (i.e. exceptions sent as data) in the `chan` data and
  captures exceptions in `xf` and `f`, stopping at the first one.

  Returns the result or throws if there was any anomaly / exception."
  [xf f init ch]
  (let [[err-ch data-ch] (a/split throwable? ch) ;                           (1)
        ;; ALTERNATIVE IMPL: Upon anomaly discovery in `ch`, `untap[-all]` the
        ;; data chan + close it, consume the rest of `ch` counting
        ;; # items / errors
        errors-ch (a/into [] err-ch) ;                                       (2)
        data-cnt  (atom 0)
        result-ch (->>
                    data-ch
                    (a/transduce
                      (comp
                        (catch-ex-as-data (fn [_] (consume-rest data-ch))) ; (3)
                        (map #(do (swap! data-cnt inc) %))
                        xf)
                      f
                      init))
        [val src] (a/alts!! [result-ch errors-ch]) ;                         (4)
        result    (if (= src result-ch) val (a/<!! result-ch))
        errs      (if (= src errors-ch) val (a/<!! errors-ch))]
    (cond
      (seq errs) (throw (ex-info (format "Fetching data failed for %d (ok for %d); first error: %s"
                                         (count errs) @data-cnt (first errs))
                                 {:errs errs}
                                 (first errs)))
      (throwable? result) (throw (ex-info (str "Data transformation failed:" result) {} result))
      :else result)))

(1) Split input into anomalies and valid items
(2) Consume all the anomalies (if any)
(3) Turn exceptions during transform/reduce into anomalies stopping at the first one; most importantly, drain the input channel upon an exception
(4) Get the results; we don’t know whether there are any data so we need to look at both channels using alts!! and then read the other one using <!!. (If we used <!! on both, we could get blocked forever.)

Note: This almost certainly isn’t the best possible implementation of our needs. But it seems to work :-)

If you want to see another core.async pipeline with error handling, have a look at mbrainz-importer’s load-parallel and reader.

Aside: Why does everything return a channel?

I wondered why onto-chan, pipeline etc. return a new channel instead of the target channel: a channel that contains nothing and just gets closed when the process is finished. The reason seems to be to give you the ability to see what is happening, namely whether the transformation step has finished.

You can use the channel returned from onto-chan to wait before doing the next onto-chan and thus get proper back pressure: (do (<! (onto-chan out input)) (recur)). (Note that onto-chan takes the target channel first and the collection second.)
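Spelled out as a complete go-loop, this back-pressure idea might look as follows. It is a sketch only: `feed-batches` is a hypothetical function, and it passes false as onto-chan’s optional close? argument so that the target channel stays open between batches.

```clojure
(require '[clojure.core.async :as a])

(defn feed-batches
  "Puts each batch onto `out`, waiting until one batch has been fully
  accepted before starting the next. Closes `out` after the last batch."
  [out batches]
  (a/go-loop [[batch & more] batches]
    (if batch
      (do
        ;; The returned channel closes once every item of the batch is on `out`
        (a/<! (a/onto-chan out batch false))
        (recur more))
      (a/close! out))))

(def fed
  (let [out (a/chan)]
    (feed-batches out [[1 2] [3] [4 5]])
    (a/<!! (a/into [] out))))

fed
;; => [1 2 3 4 5]
```

Since `out` is unbuffered here, a slow consumer slows the producer down and no batch is started early.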

Cleanup

Something crucial both in Go and Clojure channels is to clean them up properly so that you don’t end up with stray go loops waiting infinitely for an input that never arrives. It is also important for preventing livelocks and deadlocks in parts of your code.

In Clojure you typically want to close the input channel, which is normally propagated downstream - though you might need to take care to ensure it really happens. You can also close the output (downstream) channel to signal “I am done, no more stuff please!” which, if coded properly, will propagate upstream.

To support closing the downstream channel, you want to check the output of the write operation and only continue if true:

(go-loop []
  (when (>! out-chan (get-value-from-somewhere))
    (recur)))

In some cases you might need to drain a channel (see the example above) to ensure that no downstream go blocks stay blocked.

You might want to implement a "kill switch" / "poison pill" channel that go-loops check and stop themselves when it is closed.
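One possible shape of such a kill switch is sketched below. `start-worker`, the channel names and the :stopped return value are all made up for the illustration. Both the take and the put go through alts! with :priority true, so a closed kill channel wins even when other operations are ready.

```clojure
(require '[clojure.core.async :as a])

(defn start-worker
  "Copies items from `in` to `out` until `in` is closed, `out` is closed,
  or `kill-ch` is closed. The returned channel yields :stopped at the end."
  [in out kill-ch]
  (a/go-loop []
    (let [[v port] (a/alts! [kill-ch in] :priority true)]
      (if (and (= port in) (some? v))
        ;; Also watch the kill switch while writing, so a full `out`
        ;; cannot keep the worker parked forever
        (let [[ok? _] (a/alts! [kill-ch [out v]] :priority true)]
          (if ok? (recur) :stopped))
        :stopped))))

(def in   (a/chan))
(def out  (a/chan 10))
(def kill (a/chan))
(def worker (start-worker in out kill))

(a/>!! in 1)
(def first-out (a/<!! out)) ; the item arrives downstream
(a/close! kill)             ; flip the kill switch
(def final-state (a/<!! worker))
;; final-state is :stopped
```

Closing the kill channel, rather than putting a value on it, has the advantage that any number of go-loops can observe it.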

An example of why this is important is the Core.async and crashing the repl question.

Garbage Collection

A channel is essentially a queue of puts (pairs of values and callbacks) pointing to the queue of values already inserted (based on the buffer size) pointing to a queue of takes (callbacks). A go is [in/out chan →] callback → state machine → return-chan. So if it reads from an input channel and the channel is GC-ed, the go block will also be GC-ed (the → shows who references what).

So if we have (let [c (chan)] (go ..)) and c is GC-ed, the go also gets GC-ed (unless there is a loop of go’s using each other’s channels). But with a thread - (let [c (chan)] (thread (<!! c))) - it is different because it is in a thread pool and thus "used" by the OS and thus not GC-ed.


Tags: clojure library


Copyright © 2024 Jakub Holý