diff --git a/src/jackdaw/admin.clj b/src/jackdaw/admin.clj index 7d0ca4c1..7a30113e 100644 --- a/src/jackdaw/admin.clj +++ b/src/jackdaw/admin.clj @@ -27,7 +27,7 @@ (def client-impl {:alter-topics* (fn [this topics] (d/future - @(.all (.alterConfigs ^AdminClient this topics)))) + @(.all (.incrementalAlterConfigs ^AdminClient this topics)))) :create-topics* (fn [this topics] (d/future @(.all (.createTopics ^AdminClient this topics)))) diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index cbcd0bd7..a2f63ccd 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -180,7 +180,7 @@ of datafied messages." [^Consumer consumer timeout] (some->> (if (int? timeout) - (.poll consumer ^long timeout) + (.poll consumer (Duration/ofMillis timeout)) (.poll consumer ^Duration timeout)) (map jd/datafy))) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 844d3c4c..f70de80d 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -108,8 +108,7 @@ [kstream] (p/print! kstream)) -(defn ^{:deprecated "2.6.0" - :superseded-by "repartition"} +(defn ^{:deprecated "2.6.0"} through "Materializes a stream to a topic, and returns a new KStream that will consume messages from the topic." @@ -133,8 +132,7 @@ ;; IKStream -(defn ^{:deprecated "2.8.0" - :superseded-by "split"} +(defn ^{:deprecated "2.8.0"} branch "Returns a list of KStreams, one for each of the `predicate-fns` provided." diff --git a/src/jackdaw/streams/describe.clj b/src/jackdaw/streams/describe.clj index ba760671..aaf67d38 100644 --- a/src/jackdaw/streams/describe.clj +++ b/src/jackdaw/streams/describe.clj @@ -38,7 +38,7 @@ (base-node :node n)) (defmethod describe-node :source [n] - (let [topics (map str/trim (-> (.topics ^TopologyDescription$Source n) + (let [topics (map str/trim (-> (.topicSet ^TopologyDescription$Source n) (str/replace "[" "") (str/replace "]" "") (str/split #",")))] diff --git a/src/jackdaw/streams/extras.clj b/src/jackdaw/streams/extras.clj index 7646702c..173cd5d1 100644 --- a/src/jackdaw/streams/extras.clj +++ b/src/jackdaw/streams/extras.clj @@ -69,12 +69,12 @@ `(let [t# ~topic] (-> ~builder (map-validating! t# ~topic-spec ~(with-file (meta &form))) - (js/to t#)))) + (js/to! t#)))) ([builder partition-fn topic topic-spec] `(let [t# ~topic] (-> ~builder (map-validating! t# ~topic-spec ~(with-file (meta &form))) - (js/to ~partition-fn t#))))) + (js/to! ~partition-fn t#))))) (defmacro through "Wraps `#'jackdaw.streams/through`, providing validation of records diff --git a/test/jackdaw/serdes/avro_test.clj b/test/jackdaw/serdes/avro_test.clj index 7f4d4a2b..6892276f 100644 --- a/test/jackdaw/serdes/avro_test.clj +++ b/test/jackdaw/serdes/avro_test.clj @@ -160,7 +160,7 @@ :namespace "com.fundingcircle"}) schema-type (schema-type avro-schema) clj-data 4 - avro-data (Integer. 4)] + avro-data (Integer/valueOf 4)] (is (= clj-data (avro/avro->clj schema-type avro-data))) (is (= avro-data (avro/clj->avro schema-type clj-data []))) diff --git a/test/jackdaw/serdes/edn2_test.clj b/test/jackdaw/serdes/edn2_test.clj index 13d92a74..878d496b 100644 --- a/test/jackdaw/serdes/edn2_test.clj +++ b/test/jackdaw/serdes/edn2_test.clj @@ -25,7 +25,7 @@ (defspec edn-print-length-test 20 (testing "EDN data is the same after serialization and deserialization with *print-length*." (binding [*print-length* 100] - (prop/for-all [x (gen/vector gen/int (inc *print-length*))] + (prop/for-all [x (gen/vector gen/small-integer (inc *print-length*))] (is (= x (->> (.serialize (jse/edn-serializer) nil x) (.deserialize (jse/edn-deserializer) nil)))))))) diff --git a/test/jackdaw/serdes/edn_test.clj b/test/jackdaw/serdes/edn_test.clj index a6eb70be..0d2ca05d 100644 --- a/test/jackdaw/serdes/edn_test.clj +++ b/test/jackdaw/serdes/edn_test.clj @@ -25,7 +25,7 @@ (defspec edn-print-length-test 20 (testing "EDN data is the same after serialization and deserialization with *print-length*." (binding [*print-length* 100] - (prop/for-all [x (gen/vector gen/int (inc *print-length*))] + (prop/for-all [x (gen/vector gen/small-integer (inc *print-length*))] (is (= x (->> (.serialize (jse/serializer) nil x) (.deserialize (jse/deserializer) nil)))))))) diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index b274aa26..dfa63fb2 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -67,7 +67,7 @@ (let [left (k/kstream builder topic-a) right (k/ktable builder topic-b)] (-> (k/join left right +) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -93,7 +93,7 @@ (with-open [driver (mock/build-driver (fn [builder] (let [left (k/kstream builder topic-a) right (k/ktable builder topic-b)] - (k/to (k/left-join left right safe-add) topic-c))))] + (k/to! (k/left-join left right safe-add) topic-c))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -113,7 +113,7 @@ (with-open [driver (mock/build-driver (fn [builder] (let [left (k/kstream builder topic-a) right (k/ktable builder topic-b)] - (k/to (k/left-join left right safe-add topic-a topic-b) topic-c))))] + (k/to! (k/left-join left right safe-add topic-a topic-b) topic-c))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -147,7 +147,7 @@ (-> builder (k/kstream topic-a) (k/filter (fn [[_k v]] (> v 1))) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -162,7 +162,7 @@ (-> builder (k/kstream topic-a) (k/filter-not (fn [[_k v]] (> v 1))) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -177,7 +177,7 @@ (-> builder (k/kstream topic-a) (k/map-values inc) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -195,7 +195,7 @@ (-> builder (k/kstream topic-a) (k/peek (fn [[_ x]] (swap! sentinel conj x))) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -233,7 +233,7 @@ (-> builder (k/kstream topic-a) (k/through topic-b) - (k/to topic-c)))) + (k/to! topic-c)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -247,7 +247,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -264,8 +264,8 @@ (k/branch [(fn [[_k v]] (<= 0 v)) (constantly true)]))] - (k/to pos-stream topic-pos) - (k/to neg-stream topic-neg)))) + (k/to! pos-stream topic-pos) + (k/to! neg-stream topic-neg)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -283,7 +283,7 @@ (k/flat-map (fn [[k v]] [[k v] [k 0]])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -300,7 +300,7 @@ (k/kstream topic-a) (k/flat-map-values (fn [v] [v (inc v)])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -323,7 +323,7 @@ windows topic-a topic-b) - (k/to topic-c))))) + (k/to! topic-c))))) publish-a (partial mock/publish driver topic-a) publish-b (partial mock/publish driver topic-b)] @@ -340,7 +340,7 @@ (k/kstream topic-a) (k/map (fn [[k v]] [(inc k) (inc v)])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -355,7 +355,7 @@ (-> builder (k/kstream topic-a) (k/merge (k/kstream builder topic-b)) - (k/to topic-c)))) + (k/to! topic-c)))) produce-a (mock/producer driver topic-a) produce-b (mock/producer driver topic-b)] @@ -378,7 +378,7 @@ windows topic-a topic-b) - (k/to topic-c))))) + (k/to! topic-c))))) publish-a (partial mock/publish driver topic-a) publish-b (partial mock/publish driver topic-b)] @@ -413,7 +413,7 @@ (k/kstream topic-a) (k/select-key (fn [[k _v]] (inc k))) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -434,7 +434,7 @@ (-> builder (k/kstream topic-a) (k/transform transformer-supplier-fn) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -465,7 +465,7 @@ (-> builder (k/kstream topic-a) (k/flat-transform transformer-supplier-fn) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -495,7 +495,7 @@ (-> builder (k/kstream topic-a) (k/transform-values transformer-supplier-fn) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -526,7 +526,7 @@ (-> builder (k/kstream topic-a) (k/flat-transform-values transformer-supplier-fn) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -549,7 +549,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstreams [topic-a topic-b]) - (k/to topic-c)))) + (k/to! topic-c)))) publish (partial mock/publish driver)] (publish topic-a 1 1) @@ -567,7 +567,7 @@ (k/filter (fn [[_k v]] (not (zero? v)))) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 2) @@ -587,7 +587,7 @@ (k/filter-not (fn [[_k v]] (not (zero? v)))) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 0) @@ -607,7 +607,7 @@ (k/map-values (fn [v] (inc v))) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 0) @@ -632,7 +632,7 @@ topic-a) (k/count topic-b) (k/to-kstream) - (k/to topic-c))))] + (k/to! topic-c))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 0) @@ -653,7 +653,7 @@ right (k/ktable builder topic-b)] (-> (k/join left right +) (k/to-kstream) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -677,7 +677,7 @@ right (k/ktable builder topic-b)] (-> (k/outer-join left right safe-add) (k/to-kstream) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -703,7 +703,7 @@ right (k/ktable builder topic-b)] (-> (k/left-join left right safe-add) (k/to-kstream) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -741,7 +741,7 @@ (k/suppress {}) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-c))))] + (k/to! topic-c))))] (let [publish (partial mock/publish driver topic-a)] @@ -778,7 +778,7 @@ (k/suppress {:max-records max-records}) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-c))))] + (k/to! topic-c))))] (let [publish (partial mock/publish driver topic-a)] @@ -816,7 +816,7 @@ (k/suppress {:max-records max-records}) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-c))))] + (k/to! topic-c))))] (let [publish (partial mock/publish driver topic-a)] @@ -841,7 +841,7 @@ (k/count topic-a) (k/suppress {:until-time-limit-ms 10}) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] @@ -868,7 +868,7 @@ (-> (k/left-join left right safe-add) (k/suppress {:until-time-limit-ms 10}) (k/to-kstream) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-left (partial mock/publish driver topic-a) publish-right (partial mock/publish driver topic-b)] @@ -894,7 +894,7 @@ (k/group-by-key) (k/count topic-a) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -914,7 +914,7 @@ (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce + topic-a) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -936,7 +936,7 @@ (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce +) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -960,7 +960,7 @@ (fn [acc [_k v]] (+ acc v)) topic-a) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -986,7 +986,7 @@ (fn [acc [_k v]] (+ acc (:i v))) topic-a) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a) uuid-a #uuid "a8e310d7-f0d6-4f81-a474-aab5d6234149" uuid-b #uuid "0fb9ad92-dad8-45e7-9a87-7dcd9783076e"] @@ -1010,7 +1010,7 @@ (k/aggregate (constantly -10) (fn [acc [_k v]] (+ acc v))) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -1042,7 +1042,7 @@ (let [[prev current] v] [k (when (and prev current) (- current prev))]))) - (k/to topic-diffs))))) + (k/to! topic-diffs))))) publish (partial mock/publish driver topic-in)] (publish 1 5) @@ -1064,7 +1064,7 @@ (k/reduce + topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1000 1 1) @@ -1088,7 +1088,7 @@ (k/reduce + topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1000 "a" 1) @@ -1112,7 +1112,7 @@ (k/reduce + topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 1000 1 1) @@ -1145,7 +1145,7 @@ topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a)] (publish 100 1 4) @@ -1179,7 +1179,7 @@ (fn [acc [_k v]] (- acc v)) topic-b) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -1207,7 +1207,7 @@ (fn [acc [_k v]] (- acc (:i v))) topic-b) (k/to-kstream) - (k/to topic-b)))) + (k/to! topic-b)))) publish (partial mock/publish driver topic-a) uuid-a #uuid "a8e310d7-f0d6-4f81-a474-aab5d6234149" uuid-b #uuid "0fb9ad92-dad8-45e7-9a87-7dcd9783076e"] @@ -1235,7 +1235,7 @@ (fn [acc [_k v]] (+ acc v)) (fn [acc [_k v]] (- acc v))) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -1262,7 +1262,7 @@ topic-a) (k/count topic-b) (k/to-kstream) - (k/to topic-c))))] + (k/to! topic-c))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 0) @@ -1284,7 +1284,7 @@ topic-a) (k/reduce + - topic-b) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -1310,7 +1310,7 @@ topic-a) (k/reduce + -) (k/to-kstream) - (k/to topic-b))))] + (k/to! topic-b))))] (let [publish (partial mock/publish driver topic-a)] (publish 1 1) @@ -1339,7 +1339,7 @@ (fn [[k _v]] k) +) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-stream (partial mock/publish driver topic-a) publish-table (partial mock/publish driver topic-b)] @@ -1362,7 +1362,7 @@ (fn [[k _v]] k) safe-add) - (k/to topic-c)))))] + (k/to! topic-c)))))] (let [publish-stream (partial mock/publish driver topic-a) publish-table (partial mock/publish driver topic-b)] @@ -1387,7 +1387,7 @@ (fn [ctx v] {:new-val (+ (:val v) 1) :topic (.topic ctx)}))) - (k/to output-t))))] + (k/to! output-t))))] (let [publisher (partial mock/publish driver input-t)] (publisher 100 {:val 10}) @@ -1419,7 +1419,7 @@ (.put store k new-val) (key-value [k new-val])))) ["test-store"]) - (k/to output-t))))] + (k/to! output-t))))] (let [publisher (partial mock/publish driver input-t)] (publisher 1 1) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index aa100db5..90925fb0 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -29,7 +29,7 @@ (let [in (-> (k/kstream builder in) (k/map (fn [[k v]] [k v])))] - (k/to in out) + (k/to! in out) builder))) (def test-in diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index 05115a4a..0d9e4191 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -39,7 +39,7 @@ (let [in (-> (k/kstream builder in) (k/map (fn [[k v]] [k v])))] - (k/to in out) + (k/to! in out) builder))) (defn test-driver diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index fd0b916a..9edbbd6c 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -41,7 +41,7 @@ (let [in (-> (k/kstream builder in) (k/map (fn [[k v]] [k v])))] - (k/to in out) + (k/to! in out) builder))) (def test-in