diff --git a/CHANGELOG.md b/CHANGELOG.md index e493d4e5..28a2ba5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Unreleased +- Add `divert` stream operation + ### [0.9.6] - [2022-08-01] - Add clj-kondo and fix all lint warnings and errors [#323](https://github.com/FundingCircle/jackdaw/pull/323) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 93ae169b..72287724 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -127,6 +127,20 @@ [kstream predicate-fns] (p/branch kstream predicate-fns)) +(defn divert + "Diverts records that match any `pred` to the provided `topic-config`. Records that + do not match any `pred` are pushed on through the app. + + When providing multiple diverts, diverts can either be: + - ordered (by providing a vector of `predicate` `topic-config` tuples); or + - unordered (by providing a map `predicate` to `topic-config` key pairs)" + ([stream diverts] + (clojure.core/reduce #(apply divert %1 %2) stream diverts)) + ([stream pred topic-config] + (let [[divert-stream continue-stream] (branch stream [pred (constantly true)])] + (to divert-stream topic-config) + continue-stream))) + (defn flat-map "Creates a KStream that will consist of the concatenation of messages returned by calling `key-value-mapper-fn` on each key/value pair in the diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 94a95e3e..4f1405cc 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -274,6 +274,78 @@ (is (= [[1 1]] (mock/get-keyvals driver topic-pos))) (is (= [[1 -1]] (mock/get-keyvals driver topic-neg))))) + (testing "divert" + (let [topic-a (mock/topic "topic-a") + topic-odd (mock/topic "topic-odd") + topic-even (mock/topic "topic-even") + driver (mock/build-driver (fn [builder] + (-> builder + (k/kstream topic-a) + (k/divert (comp odd? last) topic-odd) + (k/to topic-even)))) + publish (partial mock/publish driver topic-a)] + + (publish 1 1) + (publish 1 2) + (publish 1 3) + (publish 1 4) + + (is (= [1 3] (map last (mock/get-keyvals driver topic-odd)))) + (is (= [2 4] (map last (mock/get-keyvals driver topic-even))))) + (testing "multiple diverts" + (testing "un-ordered sorting" + (let [topic-a (mock/topic "topic-a") + topic-four (mock/topic "topic-mod-4") + topic-five (mock/topic "topic-mod-5") + topic-seven (mock/topic "topic-mod-7") + topic-rest (mock/topic "topic-rest") + + mod-four? (comp zero? #(mod % 4) last) + mod-five? (comp zero? #(mod % 5) last) + mod-seven? (comp zero? #(mod % 7) last) + + driver (mock/build-driver (fn [builder] + (-> builder + (k/kstream topic-a) + (k/divert (hash-map mod-four? topic-four + mod-five? topic-five + mod-seven? topic-seven)) + (k/to topic-rest)))) + publish (partial mock/publish driver topic-a)] + + (doseq [i (range 10)] + (publish 1 (inc i))) + + (is (= [4 8] (map last (mock/get-keyvals driver topic-four)))) + (is (= [5 10] (map last (mock/get-keyvals driver topic-five)))) + (is (= [7] (map last (mock/get-keyvals driver topic-seven)))) + (is (= [1 2 3 6 9] (map last (mock/get-keyvals driver topic-rest)))))) + + (testing "ordered sorting" + (let [topic-a (mock/topic "topic-a") + topic-three (mock/topic "topic-mod-3") + topic-five (mock/topic "topic-mod-5") + topic-rest (mock/topic "topic-rest") + + mod-three? (comp zero? #(mod % 3) last) + mod-five? (comp zero? #(mod % 5) last) + + driver (mock/build-driver (fn [builder] + (-> builder + (k/kstream topic-a) + (k/divert [[mod-five? topic-five] + [mod-three? topic-three]]) + (k/to topic-rest)))) + publish (partial mock/publish driver topic-a)] + + (doseq [i (range 15)] + (publish 1 (inc i))) + + (is (= [3 6 9 12] (map last (mock/get-keyvals driver topic-three)))) + ;; mod-5 gets 15 b/c it was applied first + (is (= [5 10 15] (map last (mock/get-keyvals driver topic-five)))) + (is (= [1 2 4 7 8 11 13 14] (map last (mock/get-keyvals driver topic-rest)))))))) + (testing "flat-map" (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b")