
Upgrade deps and remove deprecated api calls #325

Draft · wants to merge 15 commits into base: master
12 changes: 12 additions & 0 deletions .circleci/config.yml
@@ -100,6 +100,17 @@ jobs:
key: v1-jackdaw-repo-{{ .Branch }}-{{ .Revision }}
paths:
- .

outdated_deps:
docker:
- image: clojure:temurin-18-tools-deps-focal
steps:
- *restore_repo
- checkout
- run: |
clojure -Ttools install com.github.liquidz/antq '{:git/tag "1.7.798"}' :as antq
clojure -Tantq outdated || true

lint:
machine: true
working_directory: /home/circleci/jackdaw
@@ -170,6 +181,7 @@ workflows:
build_and_test:
jobs:
- lint
- outdated_deps
- checkout_code
- deps:
requires:
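
The new `outdated_deps` job installs [antq](https://github.com/liquidz/antq) as a Clojure tool and reports stale dependencies; the `|| true` keeps it advisory so the workflow never fails on it. For running the same check locally, a deps.edn alias is one option; a minimal sketch (the alias name is made up here, and the Maven version simply mirrors the git tag pinned in CI):

```clojure
;; deps.edn (hypothetical local alias; the CI job installs antq as a tool instead)
{:aliases
 {:outdated {:extra-deps {com.github.liquidz/antq {:mvn/version "1.7.798"}}
             :main-opts  ["-m" "antq.core"]}}}
```

Invoke it with `clojure -M:outdated`.
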
31 changes: 23 additions & 8 deletions README.md
@@ -1,6 +1,13 @@
# Jackdaw · [![Clojars Project](https://img.shields.io/clojars/v/fundingcircle/jackdaw.svg)](https://clojars.org/fundingcircle/jackdaw) [![Code Coverage](https://codecov.io/gh/FundingCircle/jackdaw/branch/master/graph/badge.svg)](https://codecov.io/gh/FundingCircle/jackdaw) [![cljdoc badge](https://cljdoc.org/badge/fundingcircle/jackdaw)](https://cljdoc.org/d/fundingcircle/jackdaw/CURRENT) [![CircleCI](https://circleci.com/gh/FundingCircle/jackdaw.svg?style=shield)](https://circleci.com/gh/FundingCircle/jackdaw)
# Jackdaw
[![Clojars Project](https://img.shields.io/clojars/v/fundingcircle/jackdaw.svg)](https://clojars.org/fundingcircle/jackdaw)
[![Code Coverage](https://codecov.io/gh/FundingCircle/jackdaw/branch/master/graph/badge.svg)](https://codecov.io/gh/FundingCircle/jackdaw)
[![cljdoc badge](https://cljdoc.org/badge/fundingcircle/jackdaw)](https://cljdoc.org/d/fundingcircle/jackdaw/CURRENT)
[![CircleCI](https://circleci.com/gh/FundingCircle/jackdaw.svg?style=shield)](https://circleci.com/gh/FundingCircle/jackdaw)

Jackdaw is a Clojure library for the Apache Kafka distributed streaming platform. With Jackdaw, you can create and list topics using the AdminClient API, produce and consume records using the Producer and Consumer APIs, and create stream processing applications using the Streams API. Jackdaw also contains functions to serialize and deserialize records as JSON, EDN, and Avro, as well as functions for writing unit and integration tests.
Jackdaw is a Clojure library for the Apache Kafka distributed streaming platform. With Jackdaw, you can create and list
topics using the AdminClient API, produce and consume records using the Producer and Consumer APIs, and create stream
processing applications using the Streams API. Jackdaw also contains functions to serialize and deserialize records as
JSON, EDN, and Avro, as well as functions for writing unit and integration tests.

# Supported versions

@@ -20,25 +27,33 @@ You can find all the documentation on [cljdoc](https://cljdoc.org/d/fundingcircl

## Contributing

We welcome any thoughts or patches. You can reach us in [`#jackdaw`](https://clojurians.slack.com/messages/CEA3C7UG0/) (or [open an issue](https://github.com/fundingcircle/jackdaw/issues)).
We welcome any thoughts or patches. You can reach us in [`#jackdaw`](https://clojurians.slack.com/messages/CEA3C7UG0/)
(or [open an issue](https://github.com/fundingcircle/jackdaw/issues)).

## Related projects

If you want to get more insight about your topologies, you can use the
[Topology Grapher](https://github.com/FundingCircle/topology-grapher) library to generate graphs.
See [an example using jackdaw](https://github.com/FundingCircle/topology-grapher/blob/master/sample_project/src/jackdaw_topology.clj) to check how to integrate it with your topology.
See [an example using jackdaw](https://github.com/FundingCircle/topology-grapher/blob/master/sample_project/src/jackdaw_topology.clj)
to check how to integrate it with your topology.

## Releasing

Anyone with the appropriate credentials can "cut a release" of jackdaw using the following steps.

1. Review the diff of master vs the latest released tag (e.g. while preparing 0.7.0, I looked at https://github.com/FundingCircle/jackdaw/compare/0.6.9...master to see what was actually merged vs what was in the Changelog). Make a PR to put a date on the version being released and if necessary ensure completeness and consistency of the Changelog
2. Use the [Draft a new release](https://github.com/FundingCircle/jackdaw/releases/new) feature in github to prepare a release
1. Review the diff of master vs the latest released tag (e.g. while preparing 0.7.0, I looked at
https://github.com/FundingCircle/jackdaw/compare/0.6.9...master to see what was actually merged vs what was in the
Changelog). Make a PR to put a date on the version being released and if necessary ensure completeness and
consistency of the changelog.
2. Use the [Draft a new release](https://github.com/FundingCircle/jackdaw/releases/new) feature in github to prepare a
release.
3. In the "tag version" field, enter the proposed version
4. In the "release title" field, enter "v[version]"
5. In the "describe this release" field, enter the contents of the Changelog and add a credit to the contributors of the release
5. In the "describe this release" field, enter the contents of the Changelog and add a credit to the contributors of
the release
6. When happy, use the "Publish Release" button to publish the release in github which creates a corresponding git tag
7. Once the tag is seen by circleci, a deployment build is triggered which builds the project and deploys a release to clojars
7. Once the tag is seen by circleci, a deployment build is triggered which builds the project and deploys a release to
clojars

Steps 2 to 6 are essentially `git tag $version -m "$title\n\n$description" && git push --tags`

5 changes: 3 additions & 2 deletions dev/jackdaw/repl.clj
@@ -6,7 +6,8 @@
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.describe :as jsd])
(:import [clojure.lang ILookup Associative]))
(:import [clojure.lang ILookup Associative]
(org.apache.kafka.streams StreamsBuilder)))


;;; ------------------------------------------------------------
@@ -119,7 +120,7 @@
"Takes a topology and streams config and walks the topology to find
all the user-defined topics."
[topology streams-config]
(->> (jsd/describe-topology (.build (j/streams-builder* topology))
(->> (jsd/describe-topology (.build ^StreamsBuilder (j/streams-builder* topology))
streams-config)
(map :nodes)
(reduce concat)
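
The `^StreamsBuilder` hint on `(j/streams-builder* topology)` turns a reflective `.build` call into a direct method call. A quick way to see the difference in a project REPL; a sketch, not code from this PR:

```clojure
;; With reflection warnings enabled, the unhinted (.build builder) form warns;
;; the ^StreamsBuilder hint resolves the method at compile time.
(set! *warn-on-reflection* true)

(import '(org.apache.kafka.streams StreamsBuilder Topology))

(defn build-topology
  "Builds a Topology from a StreamsBuilder without reflection."
  ^Topology [^StreamsBuilder builder]
  (.build builder))
```
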
45 changes: 17 additions & 28 deletions project.clj
@@ -8,7 +8,7 @@
:repositories [["confluent" {:url "https://packages.confluent.io/maven/"}]
["mulesoft" {:url "https://repository.mulesoft.org/nexus/content/repositories/public/"}]]

:dependencies [[aleph "0.4.6"]
:dependencies [[aleph/aleph "0.4.7"]
[danlentz/clj-uuid "0.1.9"
:exclusions [primitive-math]]

@@ -19,27 +19,16 @@
:exclusions [com.fasterxml.jackson.core/jackson-databind]]
[io.confluent/kafka-avro-serializer "6.1.1"]
[io.confluent/kafka-json-schema-serializer "6.1.1"]
[org.apache.kafka/kafka-clients "2.8.0"]
[org.apache.kafka/kafka-streams "2.8.0"]
[org.apache.kafka/kafka-streams-test-utils "2.8.0"]
[org.apache.kafka/kafka-clients "2.8.1"]
[org.apache.kafka/kafka-streams "2.8.1"]
[org.apache.kafka/kafka-streams-test-utils "2.8.1"]

[org.clojure/clojure "1.10.1" :scope "provided"]
[org.clojure/data.json "0.2.6"]
[org.clojure/data.fressian "0.2.1"]
[org.clojure/tools.logging "0.4.1"]
[org.clojure/core.cache "0.7.2"]
[metosin/jsonista "0.3.3"]

;; Use specific netty version to avoid critical CVE
;; pulled by Aleph v0.4.6 (last stable version)
[io.netty/netty-transport "4.1.68.Final"]
[io.netty/netty-transport-native-epoll "4.1.68.Final"]
[io.netty/netty-codec "4.1.68.Final"]
[io.netty/netty-codec-http "4.1.68.Final"]
[io.netty/netty-handler "4.1.68.Final"]
[io.netty/netty-handler-proxy "4.1.68.Final"]
[io.netty/netty-resolver "4.1.68.Final"]
[io.netty/netty-resolver-dns "4.1.68.Final"]
[org.clojure/data.json "2.4.0"]
[org.clojure/data.fressian "1.0.0"]
[org.clojure/tools.logging "1.2.4"]
[org.clojure/core.cache "1.0.225"]
[metosin/jsonista "0.3.6"]

;; Use specific commons-compress version to avoid
;; CVE-2021-36090 pulled by avro 1.9.2
@@ -86,14 +75,14 @@

:resource-paths ["test/resources"]
:injections [(require 'io.aviso.logging.setup)]
:dependencies [[io.aviso/logging "0.3.2"]
[org.apache.kafka/kafka-streams-test-utils "2.8.0"]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
[org.clojure/test.check "0.9.0"]
[org.apache.kafka/kafka_2.13 "2.8.0"]
[lambdaisland/kaocha "0.0-529"]
[lambdaisland/kaocha-cloverage "0.0-32"]
[lambdaisland/kaocha-junit-xml "0.0-70"]]}
:dependencies [[io.aviso/logging "1.0"]
[org.apache.kafka/kafka-streams-test-utils "2.8.1"]
[org.apache.kafka/kafka-clients "2.8.1" :classifier "test"]
[org.clojure/test.check "1.1.1"]
[org.apache.kafka/kafka_2.13 "2.8.1"]
[lambdaisland/kaocha "1.66.1034"]
[lambdaisland/kaocha-cloverage "1.0.75"]
[lambdaisland/kaocha-junit-xml "0.0.76"]]}

;; This is not in fact what lein defines repl to be
:repl
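
Aleph 0.4.7 pulls in a newer Netty on its own, which is presumably why the explicit `io.netty/*` pins added as a CVE workaround for 0.4.6 are dropped along with the other version bumps. A rough REPL sanity check that a single Netty version is resolved; an assumption-laden sketch relying on the usual `netty-<module>-<version>.Final.jar` naming, not part of this PR:

```clojure
;; Lists the distinct Netty versions found on the running JVM's classpath.
;; Classifier jars (e.g. native epoll builds) are ignored by this pattern.
(->> (re-seq #"netty-[a-z0-9-]+?-(\d+(?:\.\d+)*\.Final)\.jar"
             (System/getProperty "java.class.path"))
     (map second)
     distinct)
```
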
8 changes: 5 additions & 3 deletions src/jackdaw/serdes/avro.clj
@@ -1,8 +1,8 @@
(ns jackdaw.serdes.avro
"DEPRECATION NOTICE:

This namespace is deprecated and will soon be removed. Please use
jackdaw.serdes.avro.confluent.
Since Jackdaw 0.6.0 this namespace is deprecated and will soon be removed.
Please use jackdaw.serdes.avro.confluent.


Generating Serdes mapping Clojure <-> Avro.
@@ -58,6 +58,8 @@
for all of Avro's fundamental types and most of its compounds.

"
{:deprecated "0.6.0"
:superseded-by "jackdaw.serdes.avro.confluent"}
(:require [clojure.tools.logging :as log]
[clojure.core.cache :as cache]
[clojure.data]
@@ -439,7 +441,7 @@
{:path path, :clj-data clj-map}
(AvroTypeException. "Type Error"))))

(let [record-builder (GenericRecordBuilder. schema)]
(let [record-builder (GenericRecordBuilder. ^Schema schema)]
(try
(doseq [[k v] clj-map]
(let [new-k (mangle (name k))
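
Moving the deprecation notice into the `ns` attr-map keeps the docstring intact while making the status machine-readable, since an attr-map becomes namespace metadata; the `^Schema` hint on the `GenericRecordBuilder.` call just avoids a reflective constructor lookup. A sketch of reading the metadata back in a project REPL:

```clojure
;; Namespace attr-maps are attached as metadata on the namespace object,
;; so the deprecation info added in this PR can be queried directly.
(require 'jackdaw.serdes.avro)

(-> (find-ns 'jackdaw.serdes.avro)
    meta
    (select-keys [:deprecated :superseded-by]))
;; => {:deprecated "0.6.0", :superseded-by "jackdaw.serdes.avro.confluent"}
```
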
18 changes: 15 additions & 3 deletions src/jackdaw/streams.clj
@@ -108,20 +108,32 @@
[kstream]
(p/print! kstream))

(defn through
(defn ^{:deprecated "2.6.0"}
through
"Materializes a stream to a topic, and returns a new KStream that will
consume messages from the topic."
[kstream topic-config]
(p/through kstream topic-config))

(defn to
(defn ^{:deprecated "Jackdaw 0.9.6"
:superseded-by "to!"}
to
"Please use `to!`, which is part of the Kafka Streams API.
`to` is an incorrect name that was introduced by Jackdaw.

Materializes a stream to a topic."
[kstream topic-config]
(p/to! kstream topic-config))

(defn to!
"Materializes a stream to a topic."
[kstream topic-config]
(p/to! kstream topic-config))

;; IKStream

(defn branch
(defn ^{:deprecated "2.8.0"}
branch
"Returns a list of KStreams, one for each of the `predicate-fns`
provided."
[kstream predicate-fns]
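
The `:deprecated` metadata mirrors the upstream Kafka Streams deprecations (`KStream#through` since 2.6.0, `KStream#branch` since 2.8.0), and `to!` replaces the misnamed `to` with identical arguments. A sketch of inspecting the metadata and migrating a call site (assumes the project namespaces are loaded):

```clojure
(require '[jackdaw.streams :as j])

;; The deprecation info added in this PR lives on the vars themselves.
(select-keys (meta #'j/to) [:deprecated :superseded-by])
;; => {:deprecated "Jackdaw 0.9.6", :superseded-by "to!"}

;; Migration is a rename at the call site; both take [kstream topic-config].
(comment
  (j/to  stream topic-config)   ;; deprecated
  (j/to! stream topic-config))  ;; preferred
```
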
4 changes: 2 additions & 2 deletions src/jackdaw/streams/mock.clj
@@ -10,7 +10,7 @@
org.apache.kafka.common.header.internals.RecordHeaders
[org.apache.kafka.common.serialization Serde Serdes]
(org.apache.kafka.streams.test TestRecord)
(java.util UUID List)))
(java.util UUID Arrays)))

(set! *warn-on-reflection* false)

@@ -48,7 +48,7 @@
(.pipeInput test-input-topic (TestRecord. k v)))
([time-ms k v]
(let [record (TestRecord. k v (RecordHeaders.) ^Long time-ms)]
(.pipeRecordList test-input-topic (List/of record)))))))
(.pipeRecordList test-input-topic (Arrays/asList (to-array [record]))))))))

(defn publish
([test-driver topic-config k v]
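
`java.util.List/of` only exists on Java 9+, so presumably to keep older JDKs working the mock now builds the single-element list with `Arrays/asList` (hence the `java.util.Arrays` import). A standalone sketch of the interop pattern:

```clojure
;; Arrays/asList is a varargs method, so from Clojure the argument is passed
;; as an array; to-array wraps the single record.
(import '(java.util Arrays))

(defn singleton-jlist
  "Wraps a single value in a java.util.List, as done for pipeRecordList above."
  [x]
  (Arrays/asList (to-array [x])))

(singleton-jlist :record)   ;; a one-element java.util.List
```
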
7 changes: 5 additions & 2 deletions src/jackdaw/test/fixtures.clj
@@ -19,7 +19,7 @@

(defn- new-topic
[t]
(doto (NewTopic. (:topic-name t)
(doto (NewTopic. ^String (:topic-name t)
(int (:partition-count t))
(short (:replication-factor t)))
(.configs (:config t))))
@@ -204,7 +204,10 @@
(fn [t]
(if-not (class-exists? 'kafka.tools.StreamsResetter)
(throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool"))
(let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter"))
(let [rt (-> "kafka.tools.StreamsResetter"
(clojure.lang.RT/classForName)
(.getDeclaredConstructor (into-array Class []))
(.newInstance (into-array [])))
args (concat ["--application-id" (get app-config "application.id")
"--bootstrap-servers" (get app-config "bootstrap.servers")]
reset-args)
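
`Class#newInstance` has been deprecated since Java 9 because it propagates checked constructor exceptions, bypassing compile-time exception checking; the replacement goes through an explicit no-arg `getDeclaredConstructor`. The same pattern in isolation (using a JDK class so the sketch runs anywhere):

```clojure
;; Instantiates a class by name via its public no-arg constructor,
;; mirroring the StreamsResetter change above.
(defn instantiate-by-name
  [^String class-name]
  (-> (clojure.lang.RT/classForName class-name)
      (.getDeclaredConstructor (into-array Class []))
      (.newInstance (into-array Object []))))

(instantiate-by-name "java.util.ArrayList")   ;; => an empty ArrayList
```
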
5 changes: 3 additions & 2 deletions src/jackdaw/test/transports/kafka.clj
@@ -12,6 +12,7 @@
serde-map
byte-array-serde]])
(:import
java.time.Duration
org.apache.kafka.common.header.Header
org.apache.kafka.clients.consumer.Consumer
org.apache.kafka.clients.consumer.ConsumerRecord
@@ -30,7 +31,7 @@

(defn load-assignments
[consumer]
(.poll ^Consumer consumer 0)
(.poll ^Consumer consumer (Duration/ofMillis 0))
(.assignment ^Consumer consumer))

(defn seek-to-end
@@ -50,7 +51,7 @@
[messages]
(fn [consumer]
(try
(let [m (.poll ^Consumer consumer 1000)]
(let [m (.poll ^Consumer consumer (Duration/ofMillis 1000))]
(when m
(s/put-all! messages m)))
(catch Throwable e
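
`Consumer#poll(long)` has been deprecated since Kafka 2.0 in favor of `poll(Duration)`, which also bounds the time spent blocking on metadata. A minimal sketch of the new overload (placeholder names, not transport code):

```clojure
;; Polls once with the Duration overload; the long overload is deprecated.
(import '(org.apache.kafka.clients.consumer Consumer)
        '(java.time Duration))

(defn poll-once
  "Returns the records from a single 1-second poll of `consumer` as a seq."
  [^Consumer consumer]
  (seq (.poll consumer (Duration/ofMillis 1000))))
```
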
33 changes: 22 additions & 11 deletions src/jackdaw/test/transports/mock.clj
@@ -3,14 +3,17 @@
[clojure.stacktrace :as stacktrace]
[clojure.tools.logging :as log]
[jackdaw.test.journal :as j]
[jackdaw.serdes.fn :as jfn]
[jackdaw.streams.mock :as smock]
[jackdaw.test.transports :as t :refer [deftransport]]
[jackdaw.test.serde :refer [byte-array-deserializer
[jackdaw.test.serde :refer [byte-array-serde
apply-serializers apply-deserializers serde-map]]
[manifold.stream :as s]
[manifold.deferred :as d])
(:import
(org.apache.kafka.common.record TimestampType)
(org.apache.kafka.clients.consumer ConsumerRecord)))
(org.apache.kafka.common.record TimestampType)
(org.apache.kafka.clients.consumer ConsumerRecord)
(org.apache.kafka.clients.producer ProducerRecord)))

(set! *warn-on-reflection* false)

@@ -79,13 +82,12 @@
[messages topic-config]
(fn [driver]
(let [fetch (fn [[k t]]
{:topic k
:output (loop [collected []]
(if-let [o (.readOutput driver (:topic-name t)
byte-array-deserializer
byte-array-deserializer)]
(recur (conj collected o))
collected))})
(let [topic-name (:topic-name t)]
{:topic k
:output (loop [collected []]
(if-let [{:keys [key value]} (smock/consume driver (assoc byte-array-serde :topic-name topic-name))]
(recur (conj collected (ProducerRecord. topic-name key value)))
collected))}))
topic-batches (->> topic-config
(map fetch)
(remove #(empty? (:output %)))
@@ -160,13 +162,22 @@
{:messages messages
:process process}))

(def identity-serializer (jfn/new-serializer {:serialize (fn [_ _ data] data)}))

(deftransport :mock
[{:keys [driver topics]}]
(let [serdes (serde-map topics)
test-consumer (mock-consumer driver topics (get serdes :deserializers))
record-fn (fn [input-record]
(try
(.pipeInput driver input-record)
(-> driver
(.createInputTopic
(.topic input-record)
identity-serializer ;; already serialized in mock-producer
identity-serializer)
(.pipeInput
(.key input-record)
(.value input-record)))
(catch Exception e
(let [trace (with-out-str
(stacktrace/print-cause-trace e))]
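
The mock transport now goes through `TopologyTestDriver`'s `createInputTopic` instead of the deprecated `pipeInput(ConsumerRecord)` overload, using an identity serializer because the transport has already serialized keys and values to bytes; output is read back via `jackdaw.streams.mock/consume` rather than the deprecated `readOutput`. A standalone sketch of the underlying test-topic API (topology, topic names, and serdes here are placeholders, not the transport's byte-level handling):

```clojure
;; Pipes one record into an input test topic and reads everything the
;; topology produced to an output test topic.
(import '(org.apache.kafka.streams TopologyTestDriver)
        '(org.apache.kafka.common.serialization Serdes))

(defn pipe-one
  [^TopologyTestDriver driver in-topic out-topic k v]
  (let [serde (Serdes/String)
        in    (.createInputTopic driver in-topic
                                 (.serializer serde) (.serializer serde))
        out   (.createOutputTopic driver out-topic
                                  (.deserializer serde) (.deserializer serde))]
    (.pipeInput in k v)
    (.readKeyValuesToList out)))
```
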