A Clojure library for building data-processing chain pipelines
Add this dependency to your leiningen project.clj
[polyglogon/pipeline "0.1.0"]
Here is a contrived example that repeatedly reuses a pipeline
(ns pipeline-example.core
(:require [clojure.core.async :as async]
[clojure.pprint :refer [pprint]]
[pipeline.control :as control]
[pipeline.kill-switch :as kill-switch]
[pipeline.protocols :refer [PipelineImpl]]
[pipeline.process :as process]))
(defn- square [x]
(* x x))
(defrecord Squarer [_]
(handle [_ input-message]
[(square input-message)])
(finish [_ _]
(defrecord Summer [accum]
(handle [_ input-message]
(swap! accum + input-message)
(finish [_ completed?]
(if completed?
(defn summer-factory [ignored]
(->Summer (atom 0)))
(defn run []
(let [control-chan-1 (async/chan 1)
control-chan-2 (async/chan 1)]
(process/create 3
(process/create 1
(doseq [i (range 100)
:let [kill-switch (kill-switch/create)
in-chan-1 (async/chan)
in-chan-2 (async/chan)
out-chan-2 (async/chan)]]
(async/onto-chan in-chan-1 [1 2 3 4 5])
(control/send-message control-chan-1
:input-chan in-chan-1
:output-chan in-chan-2
:kill-switch kill-switch
:context {:foo :bar})
(control/send-message control-chan-2
:input-chan in-chan-2
:output-chan out-chan-2
:kill-switch kill-switch
:context {:spam :eggs})
(pprint (async/<!! out-chan-2)))
(map async/close! [control-chan-1 control-chan-2])))
There are more examples in the test code
The data-processing pipeline "is a set of data processing elements connected in series, where the output of one element is the input of the next one". This library makes it easy to create the processing elements (called processes here) and to coordinate processing tasks on the pipeline.
A link in the chain. TODO.
This pipeline implementation is inspired by core.async/pipeline, but there are a few differences.
- Uses instances of the PipelineImpl protocol instead of transducers.
- This pipeline does not not try to maintain output order, the core.async/pipeline does.
- Instances of PipelineImpl are reused for handling multiple messages, but keep in mind that messages will be spread across multiple instances if concurrency is greater than one. The core.async/pipeline create a new xform instance for each message.
- The processes used in this pipeline are instantiated separately from the input/output channels. This pipeline can know about multiple input/output channel pairs at the same time, and will work on tasks as long as control channel(s) are open.
Copyright © 2014-2015 Staples, Inc. Distributed under the MIT License