diff --git a/.circleci/config.yml b/.circleci/config.yml index 6453bfda..d01167fa 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,6 +100,17 @@ jobs: key: v1-jackdaw-repo-{{ .Branch }}-{{ .Revision }} paths: - . + + outdated_deps: + docker: + - image: clojure:temurin-18-tools-deps-focal + steps: + - *restore_repo + - checkout + - run: | + clojure -Ttools install com.github.liquidz/antq '{:git/tag "1.7.798"}' :as antq + clojure -Tantq outdated || true + lint: machine: true working_directory: /home/circleci/jackdaw @@ -170,6 +181,7 @@ workflows: build_and_test: jobs: - lint + - outdated_deps - checkout_code - deps: requires: diff --git a/README.md b/README.md index 833d0617..3525a34b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,13 @@ -# Jackdaw · [![Clojars Project](https://img.shields.io/clojars/v/fundingcircle/jackdaw.svg)](https://clojars.org/fundingcircle/jackdaw) [![Code Coverage](https://codecov.io/gh/FundingCircle/jackdaw/branch/master/graph/badge.svg)](https://codecov.io/gh/FundingCircle/jackdaw) [![cljdoc badge](https://cljdoc.org/badge/fundingcircle/jackdaw)](https://cljdoc.org/d/fundingcircle/jackdaw/CURRENT) [![CircleCI](https://circleci.com/gh/FundingCircle/jackdaw.svg?style=shield)](https://circleci.com/gh/FundingCircle/jackdaw) +# Jackdaw +[![Clojars Project](https://img.shields.io/clojars/v/fundingcircle/jackdaw.svg)](https://clojars.org/fundingcircle/jackdaw) +[![Code Coverage](https://codecov.io/gh/FundingCircle/jackdaw/branch/master/graph/badge.svg)](https://codecov.io/gh/FundingCircle/jackdaw) +[![cljdoc badge](https://cljdoc.org/badge/fundingcircle/jackdaw)](https://cljdoc.org/d/fundingcircle/jackdaw/CURRENT) +[![CircleCI](https://circleci.com/gh/FundingCircle/jackdaw.svg?style=shield)](https://circleci.com/gh/FundingCircle/jackdaw) -Jackdaw is a Clojure library for the Apache Kafka distributed streaming platform. With Jackdaw, you can create and list topics using the AdminClient API, produce and consume records using the Producer and Consumer APIs, and create stream processing applications using the Streams API. Jackdaw also contains functions to serialize and deserialize records as JSON, EDN, and Avro, as well as functions for writing unit and integration tests. +Jackdaw is a Clojure library for the Apache Kafka distributed streaming platform. With Jackdaw, you can create and list +topics using the AdminClient API, produce and consume records using the Producer and Consumer APIs, and create stream +processing applications using the Streams API. Jackdaw also contains functions to serialize and deserialize records as +JSON, EDN, and Avro, as well as functions for writing unit and integration tests. # Supported versions @@ -20,25 +27,33 @@ You can find all the documentation on [cljdoc](https://cljdoc.org/d/fundingcircl ## Contributing -We welcome any thoughts or patches. You can reach us in [`#jackdaw`](https://clojurians.slack.com/messages/CEA3C7UG0/) (or [open an issue](https://github.com/fundingcircle/jackdaw/issues)). +We welcome any thoughts or patches. You can reach us in [`#jackdaw`](https://clojurians.slack.com/messages/CEA3C7UG0/) +(or [open an issue](https://github.com/fundingcircle/jackdaw/issues)). ## Related projects If you want to get more insight about your topologies, you can use the [Topology Grapher](https://github.com/FundingCircle/topology-grapher) library to generate graphs. -See [an example using jackdaw](https://github.com/FundingCircle/topology-grapher/blob/master/sample_project/src/jackdaw_topology.clj) to check how to integrate it with your topology. +See [an example using jackdaw](https://github.com/FundingCircle/topology-grapher/blob/master/sample_project/src/jackdaw_topology.clj) +to check how to integrate it with your topology. ## Releasing Anyone with the appropriate credentials can "cut a release" of jackdaw using the following steps. - 1. Review the diff of master vs the latest released tag (e.g. while preparing 0.7.0, I looked at https://github.com/FundingCircle/jackdaw/compare/0.6.9...master to see what was actually merged vs what was in the Changelog). Make a PR to put a date on the version being released and if necessary ensure completeness and consistency of the Changelog - 2. Use the [Draft a new release](https://github.com/FundingCircle/jackdaw/releases/new) feature in github to prepare a release + 1. Review the diff of master vs the latest released tag (e.g. while preparing 0.7.0, I looked at + https://github.com/FundingCircle/jackdaw/compare/0.6.9...master to see what was actually merged vs what was in the + Changelog). Make a PR to put a date on the version being released and if necessary ensure completeness and + consistency of the changelog. + 2. Use the [Draft a new release](https://github.com/FundingCircle/jackdaw/releases/new) feature in github to prepare a + release. 3. In the "tag version" field, enter the proposed version 4. In the "release title" field, enter "v[version]" - 5. In the "describe this release" field, enter the contents of the Changelog and add a credit to the contributors of the release + 5. In the "describe this release" field, enter the contents of the Changelog and add a credit to the contributors of + the release 6. When happy, use the "Publish Release" button to publish the release in github which creates a corresponding git tag - 7. Once the tag is seen by circleci, a deployment build is triggered which builds the project and deploys a release to clojars + 7. Once the tag is seen by circleci, a deployment build is triggered which builds the project and deploys a release to + clojars Steps 2 to 6 is essentially `git tag $version -m "$title\n\n$description" && git push --tags` diff --git a/dev/jackdaw/repl.clj b/dev/jackdaw/repl.clj index 9585b0d7..b6e1cc9c 100644 --- a/dev/jackdaw/repl.clj +++ b/dev/jackdaw/repl.clj @@ -6,7 +6,8 @@ [jackdaw.serdes :as js] [jackdaw.streams :as j] [jackdaw.streams.describe :as jsd]) - (:import [clojure.lang ILookup Associative])) + (:import [clojure.lang ILookup Associative] + (org.apache.kafka.streams StreamsBuilder))) ;;; ------------------------------------------------------------ @@ -119,7 +120,7 @@ "Takes a topology and streams config and walks the topology to find all the user-defined topics." [topology streams-config] - (->> (jsd/describe-topology (.build (j/streams-builder* topology)) + (->> (jsd/describe-topology (.build ^StreamsBuilder (j/streams-builder* topology)) streams-config) (map :nodes) (reduce concat) diff --git a/project.clj b/project.clj index 6db126f5..8a3d4018 100644 --- a/project.clj +++ b/project.clj @@ -8,7 +8,7 @@ :repositories [["confluent" {:url "https://packages.confluent.io/maven/"}] ["mulesoft" {:url "https://repository.mulesoft.org/nexus/content/repositories/public/"}]] - :dependencies [[aleph "0.4.6"] + :dependencies [[aleph/aleph "0.4.7"] [danlentz/clj-uuid "0.1.9" :exclusions [primitive-math]] @@ -19,27 +19,16 @@ :exclusions [com.fasterxml.jackson.core/jackson-databind]] [io.confluent/kafka-avro-serializer "6.1.1"] [io.confluent/kafka-json-schema-serializer "6.1.1"] - [org.apache.kafka/kafka-clients "2.8.0"] - [org.apache.kafka/kafka-streams "2.8.0"] - [org.apache.kafka/kafka-streams-test-utils "2.8.0"] + [org.apache.kafka/kafka-clients "2.8.1"] + [org.apache.kafka/kafka-streams "2.8.1"] + [org.apache.kafka/kafka-streams-test-utils "2.8.1"] [org.clojure/clojure "1.10.1" :scope "provided"] - [org.clojure/data.json "0.2.6"] - [org.clojure/data.fressian "0.2.1"] - [org.clojure/tools.logging "0.4.1"] - [org.clojure/core.cache "0.7.2"] - [metosin/jsonista "0.3.3"] - - ;; Use specific netty version to avoid critical CVE - ;; pulled by Aleph v0.4.6 (last stable version) - [io.netty/netty-transport "4.1.68.Final"] - [io.netty/netty-transport-native-epoll "4.1.68.Final"] - [io.netty/netty-codec "4.1.68.Final"] - [io.netty/netty-codec-http "4.1.68.Final"] - [io.netty/netty-handler "4.1.68.Final"] - [io.netty/netty-handler-proxy "4.1.68.Final"] - [io.netty/netty-resolver "4.1.68.Final"] - [io.netty/netty-resolver-dns "4.1.68.Final"] + [org.clojure/data.json "2.4.0"] + [org.clojure/data.fressian "1.0.0"] + [org.clojure/tools.logging "1.2.4"] + [org.clojure/core.cache "1.0.225"] + [metosin/jsonista "0.3.6"] ;; Use specific commons-compress version to avoid ;; CVE-2021-36090 pulled by avro 1.9.2 @@ -86,14 +75,14 @@ :resource-paths ["test/resources"] :injections [(require 'io.aviso.logging.setup)] - :dependencies [[io.aviso/logging "0.3.2"] - [org.apache.kafka/kafka-streams-test-utils "2.8.0"] - [org.apache.kafka/kafka-clients "2.8.0" :classifier "test"] - [org.clojure/test.check "0.9.0"] - [org.apache.kafka/kafka_2.13 "2.8.0"] - [lambdaisland/kaocha "0.0-529"] - [lambdaisland/kaocha-cloverage "0.0-32"] - [lambdaisland/kaocha-junit-xml "0.0-70"]]} + :dependencies [[io.aviso/logging "1.0"] + [org.apache.kafka/kafka-streams-test-utils "2.8.1"] + [org.apache.kafka/kafka-clients "2.8.1" :classifier "test"] + [org.clojure/test.check "1.1.1"] + [org.apache.kafka/kafka_2.13 "2.8.1"] + [lambdaisland/kaocha "1.66.1034"] + [lambdaisland/kaocha-cloverage "1.0.75"] + [lambdaisland/kaocha-junit-xml "0.0.76"]]} ;; This is not in fact what lein defines repl to be :repl diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index a9e253e3..6ce8f400 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -1,8 +1,8 @@ (ns jackdaw.serdes.avro "DEPRECATION NOTICE: - This namespace is deprecated and will soon be removed. Please use - jackdaw.serdes.avro.confluent. + Since Jackdaw 0.6.0 this namespace is deprecated and will soon be removed. + Please use jackdaw.serdes.avro.confluent. Generating Serdes mapping Clojure <-> Avro. @@ -58,6 +58,8 @@ for all of Avro's fundamental types and most of its compounds. " + {:deprecated "0.6.0" + :superseded-by "jackdaw.serdes.avro.confluent"} (:require [clojure.tools.logging :as log] [clojure.core.cache :as cache] [clojure.data] @@ -439,7 +441,7 @@ {:path path, :clj-data clj-map} (AvroTypeException. "Type Error")))) - (let [record-builder (GenericRecordBuilder. schema)] + (let [record-builder (GenericRecordBuilder. ^Schema schema)] (try (doseq [[k v] clj-map] (let [new-k (mangle (name k)) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 93ae169b..f70de80d 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -108,20 +108,32 @@ [kstream] (p/print! kstream)) -(defn through +(defn ^{:deprecated "2.6.0"} + through "Materializes a stream to a topic, and returns a new KStream that will consume messages from the topic." [kstream topic-config] (p/through kstream topic-config)) -(defn to +(defn ^{:deprecated "Jackdaw 0.9.6" + :superseded-by "to!"} + to + "Please use `to!` which is part of the Kafka Stream API. + `to` is an incorrect name that was introduced by Jackdaw. + + Materializes a stream to a topic." + [kstream topic-config] + (p/to! kstream topic-config)) + +(defn to! "Materializes a stream to a topic." [kstream topic-config] (p/to! kstream topic-config)) ;; IKStream -(defn branch +(defn ^{:deprecated "2.8.0"} + branch "Returns a list of KStreams, one for each of the `predicate-fns` provided." [kstream predicate-fns] diff --git a/src/jackdaw/streams/mock.clj b/src/jackdaw/streams/mock.clj index 9418fb3a..801a839c 100644 --- a/src/jackdaw/streams/mock.clj +++ b/src/jackdaw/streams/mock.clj @@ -10,7 +10,7 @@ org.apache.kafka.common.header.internals.RecordHeaders [org.apache.kafka.common.serialization Serde Serdes] (org.apache.kafka.streams.test TestRecord) - (java.util UUID List))) + (java.util UUID Arrays))) (set! *warn-on-reflection* false) @@ -48,7 +48,7 @@ (.pipeInput test-input-topic (TestRecord. k v))) ([time-ms k v] (let [record (TestRecord. k v (RecordHeaders.) ^Long time-ms)] - (.pipeRecordList test-input-topic (List/of record))))))) + (.pipeRecordList test-input-topic (Arrays/asList (to-array [record])))))))) (defn publish ([test-driver topic-config k v] diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 63c73032..068ea7dd 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -19,7 +19,7 @@ (defn- new-topic [t] - (doto (NewTopic. (:topic-name t) + (doto (NewTopic. ^String (:topic-name t) (int (:partition-count t)) (short (:replication-factor t))) (.configs (:config t)))) @@ -204,7 +204,10 @@ (fn [t] (if-not (class-exists? 'kafka.tools.StreamsResetter) (throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool")) - (let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter")) + (let [rt (-> "kafka.tools.StreamsResetter" + (clojure.lang.RT/classForName) + (.getDeclaredConstructor (into-array Class [])) + (.newInstance (into-array []))) args (concat ["--application-id" (get app-config "application.id") "--bootstrap-servers" (get app-config "bootstrap.servers")] reset-args) diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index 55f9598a..dd4e4540 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -12,6 +12,7 @@ serde-map byte-array-serde]]) (:import + java.time.Duration org.apache.kafka.common.header.Header org.apache.kafka.clients.consumer.Consumer org.apache.kafka.clients.consumer.ConsumerRecord @@ -30,7 +31,7 @@ (defn load-assignments [consumer] - (.poll ^Consumer consumer 0) + (.poll ^Consumer consumer (Duration/ofMillis 0)) (.assignment ^Consumer consumer)) (defn seek-to-end @@ -50,7 +51,7 @@ [messages] (fn [consumer] (try - (let [m (.poll ^Consumer consumer 1000)] + (let [m (.poll ^Consumer consumer (Duration/ofMillis 1000))] (when m (s/put-all! messages m))) (catch Throwable e diff --git a/src/jackdaw/test/transports/mock.clj b/src/jackdaw/test/transports/mock.clj index a2440fa5..6e38ae1a 100644 --- a/src/jackdaw/test/transports/mock.clj +++ b/src/jackdaw/test/transports/mock.clj @@ -3,14 +3,17 @@ [clojure.stacktrace :as stacktrace] [clojure.tools.logging :as log] [jackdaw.test.journal :as j] + [jackdaw.serdes.fn :as jfn] + [jackdaw.streams.mock :as smock] [jackdaw.test.transports :as t :refer [deftransport]] - [jackdaw.test.serde :refer [byte-array-deserializer + [jackdaw.test.serde :refer [byte-array-serde apply-serializers apply-deserializers serde-map]] [manifold.stream :as s] [manifold.deferred :as d]) (:import - (org.apache.kafka.common.record TimestampType) - (org.apache.kafka.clients.consumer ConsumerRecord))) + (org.apache.kafka.common.record TimestampType) + (org.apache.kafka.clients.consumer ConsumerRecord) + (org.apache.kafka.clients.producer ProducerRecord))) (set! *warn-on-reflection* false) @@ -79,13 +82,12 @@ [messages topic-config] (fn [driver] (let [fetch (fn [[k t]] - {:topic k - :output (loop [collected []] - (if-let [o (.readOutput driver (:topic-name t) - byte-array-deserializer - byte-array-deserializer)] - (recur (conj collected o)) - collected))}) + (let [topic-name (:topic-name t)] + {:topic k + :output (loop [collected []] + (if-let [{:keys [key value]} (smock/consume driver (assoc byte-array-serde :topic-name topic-name))] + (recur (conj collected (ProducerRecord. topic-name key value))) + collected))})) topic-batches (->> topic-config (map fetch) (remove #(empty? (:output %))) @@ -160,13 +162,22 @@ {:messages messages :process process})) +(def identity-serializer (jfn/new-serializer {:serialize (fn [_ _ data] data)})) + (deftransport :mock [{:keys [driver topics]}] (let [serdes (serde-map topics) test-consumer (mock-consumer driver topics (get serdes :deserializers)) record-fn (fn [input-record] (try - (.pipeInput driver input-record) + (-> driver + (.createInputTopic + (.topic input-record) + identity-serializer ;; already serialized in mock-producer + identity-serializer) + (.pipeInput + (.key input-record) + (.value input-record))) (catch Exception e (let [trace (with-out-str (stacktrace/print-cause-trace e))] diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 94a95e3e..b274aa26 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -313,7 +313,7 @@ (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b") topic-c (mock/topic "topic-c") - windows (JoinWindows/of 1000) + windows (JoinWindows/of (Duration/ofMillis 1000)) driver (mock/build-driver (fn [builder] (let [left-kstream (k/kstream builder topic-a) right-kstream (k/kstream builder topic-b)] @@ -368,7 +368,7 @@ (let [topic-a (mock/topic "topic-a") topic-b (mock/topic "topic-b") topic-c (mock/topic "topic-c") - windows (JoinWindows/of 1000) + windows (JoinWindows/of (Duration/ofMillis 1000)) driver (mock/build-driver (fn [builder] (let [left-kstream (k/kstream builder topic-a) right-kstream (k/kstream builder topic-b)] @@ -1060,7 +1060,7 @@ (-> builder (k/kstream topic-a) (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) - (k/window-by-time (TimeWindows/of 1000)) + (k/window-by-time (TimeWindows/of (Duration/ofMillis 1000))) (k/reduce + topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) @@ -1084,7 +1084,7 @@ (-> builder (k/kstream topic-a) (k/group-by-key) - (k/window-by-time (TimeWindows/of 1000)) + (k/window-by-time (TimeWindows/of (Duration/ofMillis 1000))) (k/reduce + topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) @@ -1108,7 +1108,7 @@ (-> builder (k/kstream topic-a) (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) - (k/window-by-session (SessionWindows/with 1000)) + (k/window-by-session (SessionWindows/with (Duration/ofMillis 1000))) (k/reduce + topic-a) (k/to-kstream) (k/map (fn [[k v]] [(.key k) v])) @@ -1135,7 +1135,7 @@ (-> builder (k/kstream topic-a) (k/group-by-key) - (k/window-by-session (SessionWindows/with 1000)) + (k/window-by-session (SessionWindows/with (Duration/ofMillis 1000))) (k/aggregate (constantly 0) (fn [agg [_k v]] (+ agg v)) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index aa100db5..fbafa85c 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -129,7 +129,7 @@ (filter (fn [m] (= 2 (get-in m [:value :id])))) first)) - 1000 + 10000 "failed to find test-out=2") :info)] diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index b188b145..939b50d1 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -117,7 +117,8 @@ (with-open [t (jd.test/test-machine (kafka-transport))] (let [write (cmd/write! "foo" {:id "msg1" :payload "yolo"}) watch (cmd/watch (by-id "foo" "msg1") - {:info "failed to find foo with id=msg1"}) + {:info "failed to find foo with id=msg1" + :timeout 10000}) {:keys [results journal]} (jd.test/run-test t [write watch]) [write-result watch-result] results] @@ -143,7 +144,8 @@ {"MESSAGE.DATE" (.getBytes "1970-01-01") "MESSAGE.VERSION" (.getBytes "1.0.1")}}) watch (cmd/watch (by-id "foo" "msg1") - {:info "failed to find foo with id=msg1"}) + {:info "failed to find foo with id=msg1" + :timeout 10000}) {:keys [results journal]} (jd.test/run-test t [write watch]) [write-result watch-result] results] @@ -174,11 +176,13 @@ (with-open [t (jd.test/test-machine (kafka-transport))] (let [prog1 [(cmd/write! "foo" {:id "msg2" :payload "yolo"}) (cmd/watch (by-id "foo" "msg2") - {:info "failed to find foo with id=msg2"})] + {:info "failed to find foo with id=msg2" + :timeout 10000})] prog2 [(cmd/write! "foo" {:id "msg3" :payload "you only live twice"}) (cmd/watch (by-id "foo" "msg3") - {:info "failed to find foo with id=msg3"})]] + {:info "failed to find foo with id=msg3" + :timeout 10000})]] (testing "run test sequence and inspect results" (let [{:keys [results journal]} (jd.test/run-test t prog1)] @@ -248,7 +252,7 @@ (= (get-in r [:value :id]) "2"))) (not-empty)) true)) - {:timeout 2000})]))) + {:timeout 10000})]))) (catch Exception e (reset! error-raised e))) (is (not @error-raised))))