Error handling in Clojure Core.Async (and its higher-level constructs)
- Summary of key points
- Error handling
- Strategies
- NOTE: A default uncaught error handler
- Error handling in core.async constructs
- Error handling in go loops
- Error handling in channel transducers
- Error handling in transduce and reduce
- Tip 1: use halt-when to stop processing upon the first exception
- Tip 2: Prepend a transducer that turns exceptions into anomalies
- Error handling in pipelines
- A complete error handling example
- Aside: Why does everything return a channel?
- Cleanup
- Resources
Error handling is something that core.async leaves to you (because it depends on your particular use case). People have written before about handling errors in the low-level go loops but there is little about the higher level constructs such as pipelines and transducers. I’d like to correct that. First, a little repetition.
(I focus here on Clojure but most applies also to ClojureScript.)
Disclaimer: I am no expert, this is a result of my study and exploration. No guarantees :-)
Summary of key points
If you are short on time, these are the main points. Details follow.
- By default, errors are printed to standard error and the output channels are closed / items are skipped
- A popular solution is to catch exceptions and send them on as recognizable data, as “anomalies”
- Wrap the content of go / go-loop with try-catch
- Upon reading from a channel, check whether you get data or an “anomaly” and re-throw/handle appropriately
- Use chan’s and pipeline’s ex-handler parameter to customize error handling
- You can leverage halt-when to stop transduction upon the first exception (if that’s what you want)
- Compose your transducer with an error-catching transducer (which will thus wrap both the transformation and the reduction)
- Aside: Make sure to clean up all resources to avoid leaks and deadlocks
Error handling
Strategies
What can you do when processing a value on a channel fails? You can ignore the error, propagate it downstream, or report it elsewhere. You can then continue processing other values or abort.
The most popular way to process errors is to propagate them downstream. But how do you distinguish errors and regular values in downstream processing?
- Send them as a distinct type such as java.lang.Throwable (or js/Error in ClojureScript). This is trivial since you normally already have an exception (i.e. a subclass of Throwable). The processor can then switch on (if (instance? Throwable val) ...).
- Wrap it in a map with special keys such as {:cognitect.anomalies/category :cognitect.anomalies/interrupted, :cognitect.anomalies/message (ex-message e) ...} (Ex.: Datomic mbrainz-importer.) You can then check it downstream with (if (:cognitect.anomalies/category val) ...). (Notice that keywords work also on most non-map types so it is OK to process [1 2 <error map> 4].) This seems to be quite popular and doesn’t depend on Java exceptions.
- (Wrap every value in some container that knows whether the content is a value or an error: [ [:val 32], [:error ...], ...].)
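As a minimal sketch of the second option, the snippet below turns exceptions into anomaly maps and tells them apart from regular values downstream. The helper names ex->anomaly, anomaly? and safely are made up for this illustration, and picking the :cognitect.anomalies/fault category is an assumption; no library is required since the keys are plain namespaced keywords.

```clojure
(defn ex->anomaly
  "Hypothetical helper: describe a Throwable as an anomaly map."
  [^Throwable t]
  {:cognitect.anomalies/category :cognitect.anomalies/fault
   :cognitect.anomalies/message  (ex-message t)})

(defn anomaly?
  "True if x is a map carrying the anomaly category key."
  [x]
  (and (map? x) (contains? x :cognitect.anomalies/category)))

(defn safely
  "Hypothetical helper: call (f x), returning an anomaly instead of throwing."
  [f x]
  (try
    (f x)
    (catch Throwable t (ex->anomaly t))))

;; inc fails on a string, so the middle item becomes an anomaly
(def results (mapv #(safely inc %) [1 "boom" 3]))

(mapv anomaly? results)   ; => [false true false]
(remove anomaly? results) ; => (2 4)
```

The same check works whether the values arrive in a vector, as here, or one by one from a channel.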
Instead of propagating an error, you could report it somewhere else - log it, send it to an error tracking service, or send it to a dedicated error channel.
NOTE: A default uncaught error handler
go blocks and other async constructs run on helper threads. Java already has a mechanism for handling exceptions in non-main threads: the default or thread’s UncaughtExceptionHandler. In core.async the default one prints the error to stderr. You can set your own with (Thread/setDefaultUncaughtExceptionHandler eh).
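A small sketch of installing such a handler, using only the JVM and no core.async. The atom last-uncaught and the thread name "worker-1" are made up for this illustration; a real handler would more likely log the error or forward it to an error tracking service.

```clojure
;; Hypothetical place to record the most recent uncaught exception
(def last-uncaught (atom nil))

(Thread/setDefaultUncaughtExceptionHandler
  (reify Thread$UncaughtExceptionHandler
    (uncaughtException [_ thread ex]
      (reset! last-uncaught {:thread  (.getName thread)
                             :message (ex-message ex)}))))

;; Throw on a non-main thread and wait for it to finish
(doto (Thread. ^Runnable (fn [] (throw (ex-info "boom" {}))) "worker-1")
  (.start)
  (.join))

@last-uncaught ; => {:thread "worker-1", :message "boom"}
```

Note that the handler is global to the JVM, so it also affects threads that have nothing to do with core.async.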
Error handling in core.async constructs
Error handling in go loops
Default behavior: Same - the exception is printed to stderr and a closed channel is returned.
;; Note that inc fails on nil:
(a/<!!
  (a/go
    (let [ch (doto (a/chan) a/close!)]
      (inc (a/<! ch)))))
; => nil
; [STDERR] Exception in thread "async-dispatch-7" java.lang.NullPointerException
; [STDERR] at clojure.lang.Numbers.ops(Numbers.java:1068) ...
As proposed by David Nolen, use (go (try … (or go-safe) and a <? macro that re-throws if the item is an exception.
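The two macros could look roughly as follows. This is a sketch of the idea and not the code from the original proposal; the names throw-err, <? and go-safe follow common usage but are defined here from scratch, and the core.async alias a is assumed to be available as shown.

```clojure
(require '[clojure.core.async :as a])

(defn throw-err
  "Re-throw x if it is a Throwable, otherwise return it unchanged."
  [x]
  (if (instance? Throwable x)
    (throw x)
    x))

(defmacro <?
  "Like <! but re-throws an exception that was sent as data.
  Must be used inside a go block."
  [ch]
  `(throw-err (a/<! ~ch)))

(defmacro go-safe
  "Like go, but an exception thrown in the body becomes the block's result."
  [& body]
  `(a/go
     (try
       ~@body
       (catch Throwable t# t#))))

;; The upstream channel carries an exception as data; <? re-throws it,
;; go-safe catches it and returns it on the go block's channel.
(def result
  (a/<!!
    (go-safe
      (let [ch (a/to-chan! [(ex-info "upstream failed" {})])]
        (inc (<? ch))))))

(ex-message result) ; => "upstream failed"
```

This way an error travels through a chain of go blocks as a value, and each reader decides whether to handle it or pass it on.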
Error handling in channel transducers
You can create a channel with a transducer that will process each value sent to it. What happens if the transducer throws an exception?
Default behavior: Exceptions are passed to Thread/defaultUncaughtExceptionHandler (which prints them); the failing item is skipped and processing continues.
(<!! (a/into [] (doto (chan 3 (map inc))
                  (>!! 0)
                  (>!! "boom!")
                  (>!! 2)
                  (a/close!))))
; => [1 3]
; [STDERR] Exception in thread "..." java.lang.ClassCastException: ...
If you want to handle the exceptions yourself, use the 3-arity (chan buf-or-n xform ex-handler) to supply a custom exception handler:
(<!! (doto (chan 3 (map inc) (fn [exc] :error))
       (>!! "boom!")
       (a/close!)))
; => :error
(If the ex-handler returns nil then nothing is added to the output channel, i.e. the item is just skipped.)
Error handling in transduce and reduce
Default behavior: Same - close the output channel (so reading from it returns nil) and print the exception via the default UncaughtExceptionHandler.
;; Note that + fails on a string:
(<!! (a/reduce + 0 (a/to-chan [1 ""])))
; => nil
;; Error in xform:
;; Note that inc fails on a string:
(<!! (a/transduce (map inc) + 0 (a/to-chan [1 "" 3])))
; => nil
; [STDERR] ClassCastException: class java.lang.String cannot be cast ...
;; Error in reduce:
(<!! (a/transduce (map identity)
                  (completing (fn [_ v] (inc v)))
                  0
                  (a/to-chan [1 "" 3])))
; => nil
Tip 1: use halt-when to stop processing upon the first exception
halt-when can be added as the first one in a transducer pipeline to stop the processing upon the first exception. Its docstring says:
Returns a transducer that ends transduction when pred returns true for an input. When retf is supplied it must be a fn of 2 arguments - it will be passed the (completed) result so far and the input that triggered the predicate, and its return value (if it does not throw an exception) will be the return value of the transducer. If retf is not supplied, the input that triggered the predicate will be returned.
Example:
(transduce
  (comp
    (halt-when #(instance? Throwable %))
    (map identity))
  conj []
  [0 1 (Exception. "fake") 3])
; => #error{:via [{:type java.lang.Exception, :message "fake", ..}] ..}
The same happens in core.async:
(<!! (a/transduce
       (comp
         (halt-when #(instance? Throwable %))
         (map identity))
       conj []
       (a/to-chan [0 1 (Exception. "fake") 3])))
; => #error{:via [{:type java.lang.Exception, :message "fake", ..}] ..}
However, when included in the xf of a pipeline, the pipeline returns all elements but those where the halt-when predicate was true because pipeline starts a new transduction for each item. The nice result is that you will get the same output no matter the parallelism of the pipeline.
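The retf argument described in the docstring can be used to keep the partial result together with the error instead of returning the exception alone. A small sketch with plain transduce; the shape of the returned map (:partial and :error) is an arbitrary choice made for this illustration.

```clojure
(def outcome
  (transduce
    (comp
      ;; retf receives the completed result so far and the offending input
      (halt-when #(instance? Throwable %)
                 (fn [result-so-far bad-input]
                   {:partial result-so-far
                    :error   (ex-message bad-input)}))
      (map inc))
    conj []
    [0 1 (Exception. "fake") 3]))

outcome ; => {:partial [1 2], :error "fake"}
```

Because halt-when comes first in the comp, the exception is intercepted before it reaches (map inc), and the item 3 is never processed.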
Tip 2: Prepend a transducer that turns exceptions into anomalies
A transducer can (catch ..) exceptions from both the downstream transducers and the reducing function. We can leverage it to create a transducer that catches all exceptions and propagates them as anomalies (namely as Throwables, in this case):
(defmacro err-or
  "If body throws an exception, catch it and return it"
  [& forms]
  `(try
     ~@forms
     (catch #?(:clj Throwable :cljs :default) t# t#)))
(def throwable? (partial instance? #?(:clj Throwable :cljs js/Error)))
(defn catch-ex-as-data
  "Transducer that catches errors from the transducers below (catching errors
  both in the transducing and reducing functions) and returns the first one.
  It should be first, i.e. at the top of `(comp (catch-ex-as-data) ...)`."
  ([] (catch-ex-as-data nil))
  ([on-error]
   (fn [xf]
     (fn
       ([] (err-or (xf)))
       ([result]
        (let [res (if (throwable? result)
                    result ; don't pass anomalies down
                    (err-or (xf result)))]
          (when (and on-error (throwable? res))
            (on-error res))
          res))
       ([result input]
        (try (xf result input)
             (catch #?(:clj Throwable :cljs :default) t
               (reduced t))))))))
If we add it to the two transduce examples from above, we will get back the exception-as-anomaly instead of nil:
;; Error in the xform:
(<!! (a/transduce
       (comp (catch-ex-as-data) (map inc))
       + 0 (a/to-chan [1 "" 3])))
; => #error{:cause "class java.lang.String cannot be cast..",..}
;; Error in reduce:
(<!! (a/transduce (comp (catch-ex-as-data) (map identity))
                  (completing (fn [_ v] (inc v)))
                  0
                  (a/to-chan [1 "" 3])))
; => #error{:cause "class java.lang.String cannot be cast..",..}
;; ClojureScript:
(defn ++ [x y] {:pre [(int? x) (int? y)]} (+ x y))
;; Error in the xform:
(a/take! (a/transduce
           (comp (catch-ex-as-data) (map inc))
           ++ 0 (a/to-chan! [1 "" 3]))
         #(println "result=" %))
; OUT: result= #object[Error Error: Assert failed: (int? y)]
;; Error in reduce:
(a/take! (a/transduce (comp (catch-ex-as-data) (map identity))
                      (completing (fn [_ v] (++ 1 v)))
                      0
                      (a/to-chan! [1 "" 3]))
         println)
; OUT: #object[Error Error: Assert failed: (int? y)]
When the transduction ends prematurely, the remaining items on the input channel are not consumed. You likely want to drain and close the channel to avoid any go blocks on downstream channels getting blocked forever. See the example later on below.
Error handling in pipelines
Default behavior: Same - skip the item, print the exception to stderr
(let [out (chan 1)]
  (a/pipeline
    1 ; parallelism
    out
    (map inc) ; transformation
    (a/to-chan [1 "" 3]))
  (<!! (a/into [] out)))
; => [2 4]
; [STDERR] Exception in thread "async-dispatch-11" java.lang.ClassCastException
The max-arity signature is (pipeline n to xf from close? ex-handler). We can leverage the ex-handler parameter to change the default behavior. Both the xf and ex-handler arguments are simply passed to chan so we can learn from its docstring:
[..] ex-handler must be a fn of one argument - if an exception occurs during transformation it will be called with the Throwable as an argument, and any non-nil return value will be placed in the channel.
The default ex-handler calls the thread’s UncaughtExceptionHandler’s .uncaughtException (printing it to stderr) and returns nil (effectively skipping the item).
We can, for example, propagate the exception as data:
(let [out (chan 1)]
  (a/pipeline
    1
    out
    (map inc)
    (a/to-chan [0 "" 2])
    true
    (fn ex-handler [throwable] throwable))
  (<!! (a/into [] out)))
; => [1, #error{:cause "class java.lang.String cannot be cast.." ..}, 3]
A complete error handling example
This is code from our project. We propagate exceptions as data (an anomaly), namely as java.lang.Throwable. There may be multiple anomalies in the input channel while our transduction should stop upon the first exception. We use our custom catching-transduce, similar to core.async/transduce but either returning the result (instead of a channel) or throwing. It does the following error handling:
- Anomalies from the input channel are routed to another channel, err-ch, and combined into a single vector item (via (let [errors-ch (a/into [] err-ch)] ..) - because we want to know how many anomalies there were in the input)
- Exceptions during the transduction - whether in the transformation or reduction step - are caught by the custom transducer catch-ex-as-data - and the transduction stops immediately via (halt-when throwable?).
- Finally, if there is any anomaly in the err[ors]-ch or if the result of the transduction is an anomaly, we throw an exception; otherwise we return the result
To avoid getting blocked we need to ensure that we consume all items on all channels - alts!! and consume-rest help with that.
(defn catch-ex-as-data [] ...) ; defined in a previous section
(defn consume-rest
  "Consume all remaining items on `ch`"
  [ch]
  (a/go-loop []
    (when (a/<! ch) (recur)))
  nil)
(defn catching-transduce
  "Similar to `core.async/transduce` but returns the reduced value and
  captures 'anomalies' (i.e. exceptions sent as data) in the `chan` data and
  captures exceptions in `xf` and `f`, stopping at the first one.
  Returns the result or throws if there was any anomaly / exception."
  [xf f init ch]
  (let [[err-ch data-ch] (a/split throwable? ch) ; (1)
        ;; ALTERNATIVE IMPL: Upon anomaly discovery in `ch`, `untap[-all]` the
        ;; data chan + close it, consume the rest of `ch` counting
        ;; # items / errors
        errors-ch (a/into [] err-ch) ; (2)
        data-cnt  (atom 0)
        result-ch (->>
                    data-ch
                    (a/transduce
                      (comp
                        (catch-ex-as-data (fn [_] (consume-rest data-ch))) ; (3)
                        (map #(do (swap! data-cnt inc) %))
                        xf)
                      f
                      init))
        [val src] (a/alts!! [result-ch errors-ch]) ; (4)
        result    (if (= src result-ch) val (a/<!! result-ch))
        errs      (if (= src errors-ch) val (a/<!! errors-ch))]
    (cond
      (seq errs) (throw (ex-info (format "Fetching data failed for %d (ok for %d); first error: %s"
                                         (count errs) @data-cnt (first errs))
                                 {:errs errs}
                                 (first errs)))
      (throwable? result) (throw (ex-info (str "Data transformation failed:" result) {} result))
      :else result)))
(1) Split input into anomalies and valid items
(2) Consume all the anomalies (if any)
(3) Turn exceptions during transform/reduce into anomalies, stopping at the first one; most importantly, drain the input channel upon an exception
(4) Get the results; we don’t know whether there are any data so we need to look at both channels using alts!! and then read the other one using <!!. (If we used <!! on both, we could block forever.)
Note: This quite certainly isn’t the best implementation of our needs I could come up with. But it seems to work :-)
If you want to see another core.async pipeline with error handling, have a look at mbrainz-importer’s load-parallel and reader.
Aside: Why does everything return a channel?
I wondered why onto-chan, pipeline etc. return a new channel instead of the target channel, namely a channel that contains nothing and just gets closed when the process is finished. The reason seems to be to give you the ability to see what is happening, namely whether the transformation step has finished.
You can use the channel returned from onto-chan to wait before doing the next onto, to get proper back pressure: (do (<! (onto-chan input out)) (recur)).
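A sketch of that back-pressure idea: each batch is copied onto the output channel only after the previous copy has finished. It assumes a core.async version that provides onto-chan! and to-chan! (the non-deprecated names); the batches and the buffer size of 10 are made up for the illustration.

```clojure
(require '[clojure.core.async :as a])

(def out (a/chan 10))

(def copied
  (a/<!!
    (a/go
      (loop [batches [[1 2] [3 4] [5]]]
        (when-let [batch (first batches)]
          ;; onto-chan! returns a channel that closes once the batch is copied;
          ;; parking on it delays the next batch until this one is done.
          ;; The `false` keeps `out` open after each batch.
          (a/<! (a/onto-chan! out batch false))
          (recur (rest batches))))
      (a/close! out)
      (a/<! (a/into [] out)))))

copied ; => [1 2 3 4 5]
```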
Cleanup
Something crucial with both Go and Clojure channels is to clean them up properly so that you don’t end up with stray go loops waiting forever for an input that never arrives. It is also important for preventing livelocks and deadlocks in parts of your code.
In Clojure you typically want to close the input channel, which is normally propagated downstream - though you might need to take care to ensure it really happens. You can also close the end stream to signal “I am done, no more stuff please!” which, if coded properly, will propagate upstream.
To support closing the downstream channel, you want to check the output of the write operation and only continue if true:
(go-loop []
  (when (>! out-chan (get-value-from-somewhere))
    (recur)))
In some cases you might need to drain a channel (see the example above) to ensure that no downstream go blocks stay blocked.
You might want to implement a "kill switch" / "poison pill" channel that go-loops check and stop themselves when it is closed.
An example of why this is important is the Core.async and crashing the repl question.
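The kill switch mentioned above could be sketched like this: the loop selects over the kill channel and its input with alts!, and stops as soon as the kill channel is closed, the input is closed, or the output refuses a put. The function name start-worker and the channel names are made up for this illustration.

```clojure
(require '[clojure.core.async :as a])

(defn start-worker
  "Copies values from `in` to `out` until `kill-ch` or `in` is closed, or
  until `out` is closed. Returns a channel yielding the number of values copied."
  [in out kill-ch]
  (a/go-loop [n 0]
    ;; :priority true makes alts! check kill-ch before in
    (let [[v port] (a/alts! [kill-ch in] :priority true)]
      (if (and (= port in) (some? v) (a/>! out v))
        (recur (inc n))
        n))))

(def in   (a/chan 10))
(def out  (a/chan 10))
(def kill (a/chan))

(def worker (start-worker in out kill))

(a/>!! in :a)
(a/>!! in :b)
(a/<!! out) ; => :a
(a/<!! out) ; => :b

;; Flip the kill switch; the parked go-loop wakes up and exits
(a/close! kill)
(def copied (a/<!! worker))

copied ; => 2
```

Closing the kill channel rather than putting a value on it means any number of go-loops can watch the same channel and all of them stop.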
Garbage Collection
A channel is essentially a queue of puts (pairs of values and callbacks) pointing to the queue of values already inserted (based on the buffer size) pointing to a queue of takes (callbacks). A go is [in/out chan →] callback → stateful-machine → return-chan. So if it reads from an input channel and the channel is GC-ed, the go block will also be GC-ed (the → shows who references what).
So if we have (let [c (chan)] (go ..)) and c is GC-ed the go also gets GC-ed (unless there is a loop of go’s using each other’s channels). But with a thread - (let [c (chan)] (thread (<!! c))) - it is different because it is in a thread pool and thus "used" by the OS and thus not GCed.
Resources
Tim Baldridge’s Core.Async tutorials (paid / 7 days free trial of PivotShare)
The core.async Patterns course by Eric Normand