Skip to content

Commit

Permalink
initial-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jbropho committed Feb 27, 2024
1 parent de97f6d commit 46a9fb2
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 46 deletions.
69 changes: 69 additions & 0 deletions .github/workflows/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
run-name: Jackdaw build

on:
push:
branches-ignore:
- master
- main
tags:
- '[0-9]+.[0-9]+.[0-9]'

jobs:
Explore-GitHub-Actions:
runs-on: ubuntu-latest
container: clojure:latest
environment: test
services:
kafka:
image: confluentinc/cp-kafka:5.1.3
env:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
schema-registry:
image: confluentinc/cp-schema-registry:5.1.2
env:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
kafka-rest:
image: confluentinc/cp-kafka-rest:6.1.1
env:
KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_REST_BOOTSTRAP_SERVERS: kafka:9092
KAFKA_REST_LISTENERS: http://0.0.0.0:8082
KAFKA_REST_SCHEMA_REGISTRY: http://schema-registry:8081
KAFKA_REST_HOST_NAME: kafka-rest
zookeeper:
image: confluentinc/cp-zookeeper:5.1.3
env:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
steps:
- name: checkout
uses: actions/checkout@v4
with:
fetch-tags: true
fetch-depth: 100
- name: build
run: |
git config --global --add safe.directory /__w/jackdaw/jackdaw
lein kaocha
env:
KAFKA_BOOTSTRAP_SERVERS: kafka
ZOOKEEPER_HOST: zoopkeeper
SCHEMA_REGISTRY_HOST: schema-registry
KAFKA_REST_PROXY_HOST: kafka-rest
- name: lint
run: lein clj-kondo --lint src test || true
- name: deploy
env:
CLOJARS_USERNAME: ${{ secrets.CLOJARS_USERNAME }}
CLOJARS_PASSWORD: ${{ secrets.CLOJARS_PASSWORD }}
run: lein do jar, pom, deploy clojars

5 changes: 4 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(defproject fundingcircle/jackdaw "_"
:description "A Clojure library for the Apache Kafka distributed streaming platform."

:license {:name "BSD 3-clause" :url "http://opensource.org/licenses/BSD-3-Clause"}

:scm {:name "git" :url "https://github.com/fundingcircle/jackdaw"}

:url "https://github.com/FundingCircle/jackdaw/"
Expand Down Expand Up @@ -33,7 +35,8 @@

:aliases {"kaocha" ["run" "-m" "kaocha.runner"]}
:aot [jackdaw.serdes.edn2 jackdaw.serdes.fressian jackdaw.serdes.fn-impl]
:plugins [[me.arrdem/lein-git-version "2.0.8"]]
:plugins [[me.arrdem/lein-git-version "2.0.8"]
[com.github.clj-kondo/lein-clj-kondo "0.2.5"]]

:git-version
{:status-to-version
Expand Down
5 changes: 3 additions & 2 deletions test/jackdaw/client/partitioning_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require
[clojure.test :refer [deftest is testing]]
[jackdaw.client :as client]
[jackdaw.client.partitioning :as part]))
[jackdaw.client.partitioning :as part]
[jackdaw.utils :as utils]))

(set! *warn-on-reflection* false)

Expand All @@ -22,7 +23,7 @@


(deftest test->ProducerRecord
(with-open [p (client/producer {"bootstrap.servers" "localhost:9092"
(with-open [p (client/producer {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")
"key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})]
(testing "absent key-fn"
Expand Down
5 changes: 3 additions & 2 deletions test/jackdaw/client_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[jackdaw.client :as client]
[jackdaw.test.fixtures :as fix]
[jackdaw.test.serde :as serde]
[jackdaw.utils :as utils]
[jackdaw.data :as data])
(:import [java.util.concurrent LinkedBlockingQueue TimeUnit]
java.time.Duration
Expand Down Expand Up @@ -40,7 +41,7 @@
"high-partition-topic" high-partition-topic})

(defn broker-config []
{"bootstrap.servers" "localhost:9092"})
{"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")})

(defn producer-config []
(-> (broker-config)
Expand Down Expand Up @@ -159,7 +160,7 @@

(deftest ^:integration consumer-test
(let [config {"group.id" "jackdaw-client-test-consumer-test"
"bootstrap.servers" "localhost:9092"
"bootstrap.servers" "kafka:9092"
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"}
key-serde (:key-serde foo-topic)
Expand Down
9 changes: 5 additions & 4 deletions test/jackdaw/serdes/avro/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
[jackdaw.data :as jd]
[jackdaw.serdes.avro :as avro]
[jackdaw.serdes.avro.schema-registry :as reg]
[jackdaw.test.fixtures :as fix])
[jackdaw.test.fixtures :as fix]
[jackdaw.utils :as utils])
(:import [org.apache.kafka.common.serialization Serde Serdes]))

(set! *warn-on-reflection* false)

(def +real-schema-registry-url+
"http://localhost:8081")
(format "http://%s:8081" (utils/schema-registry-host)))

(def +type-registry+
(merge avro/+base-schema-type-registry+
Expand Down Expand Up @@ -65,8 +66,8 @@
"A Kafka consumer or streams config."
(let [id (str "dev-" (java.util.UUID/randomUUID))]
{"replication.factor" "1", "group.id" id, "application.id" id,
"bootstrap.servers" "localhost:9092"
"zookeeper.connect" "localhost:2181"
"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")
"zookeeper.connect" (str (utils/zookeeper-host) ":2181")
"request.timeout.ms" "1000"}))

;;;; Schemas
Expand Down
5 changes: 3 additions & 2 deletions test/jackdaw/serdes/avro_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[clojure.data :refer [diff]]
[clojure.data.json :as json]
[jackdaw.serdes.avro :as avro]
[jackdaw.serdes.avro.schema-registry :as reg])
[jackdaw.serdes.avro.schema-registry :as reg]
[jackdaw.utils :as utils])
(:import [java.nio ByteBuffer]
[java.util Collection]
[org.apache.avro Schema$Parser Schema]
Expand Down Expand Up @@ -46,7 +47,7 @@
serde-config)
schema-registry-config
{:avro.schema-registry/client registry-client
:avro.schema-registry/url "localhost:8081"}]
:avro.schema-registry/url (str (utils/schema-registry-host) ":8081")}]
(avro/serde +registry+ schema-registry-config serde-config))))

(defn deserialize [serde topic x]
Expand Down
3 changes: 2 additions & 1 deletion test/jackdaw/serdes/json_schema/confluent_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns jackdaw.serdes.json-schema.confluent-test
(:require [jackdaw.serdes.json-schema.confluent :as jsco]
[jackdaw.serdes.avro.schema-registry :as reg]
[jackdaw.utils :as utils]
[clojure.data.json :as json]
[clojure.test :refer [deftest is testing] :as test]))

Expand All @@ -15,7 +16,7 @@
{"json.fail.invalid.schema" false}
:serializer-properties
{"json.fail.invalid.schema" false}})))
schema-registry-url "localhost:8081"
schema-registry-url (str (utils/schema-registry-host) ":8081")
key? false]
(jsco/serde schema-registry-url schema-str key? serde-config))))

Expand Down
2 changes: 1 addition & 1 deletion test/jackdaw/test/commands/write_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
:key-serde :long
:value-serde :json}))

(def kafka-config {"bootstrap.servers" "localhost:9092"
(def kafka-config {"bootstrap.servers" "kafka:9092"
"group.id" "kafka-write-test"})

(defn with-transport
Expand Down
33 changes: 17 additions & 16 deletions test/jackdaw/test/fixtures_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns jackdaw.test.fixtures-test
(:require
[clojure.test :refer [deftest is]]
[jackdaw.test.fixtures :refer [list-topics reset-application-fixture topic-fixture with-fixtures]])
[jackdaw.test.fixtures :refer [list-topics reset-application-fixture topic-fixture with-fixtures]]
[jackdaw.utils :as utils])
(:import
(org.apache.kafka.clients.admin AdminClient)))

Expand All @@ -14,7 +15,7 @@
:config {}})

(def kafka-config
{"bootstrap.servers" "localhost:9092"})
{"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")})

(def test-topics
(let [topics {"foo" topic-foo}]
Expand Down Expand Up @@ -61,27 +62,27 @@
:reset-fn (fn [reset-args rt args]
(reset! reset-args [rt args])
0)}
(fn [{:keys [resetter reset-args error-data]}]
(is (instance? kafka.tools.StreamsResetter resetter))
(is (= ["--application-id" "yolo"
"--bootstrap-servers" "kafka.test:9092"
"--foo" "foo"
"--bar" "bar"]
reset-args))
(is (empty? error-data)))))
(fn [{:keys [resetter reset-args error-data]}]
(is (instance? kafka.tools.StreamsResetter resetter))
(is (= ["--application-id" "yolo"
"--bootstrap-servers" "kafka.test:9092"
"--foo" "foo"
"--bar" "bar"]
reset-args))
(is (empty? error-data)))))

(deftest test-reset-application-fixture-failure
(test-resetter {:app-config {"application.id" "yolo"
"bootstrap.servers" "kafka.test:9092"}
"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")}
:reset-params ["--foo" "foo"
"--bar" "bar"]
:reset-fn (fn [reset-args rt args]
(reset! reset-args [rt args])
(.write *err* "helpful error message\n")
(.write *out* "essential application info\n")
1)}
(fn [{:keys [resetter error-data]}]
(is (instance? kafka.tools.StreamsResetter resetter))
(is (= 1 (:status error-data)))
(is (= "helpful error message\n" (:err error-data)))
(is (= "essential application info\n" (:out error-data))))))
(fn [{:keys [resetter error-data]}]
(is (instance? kafka.tools.StreamsResetter resetter))
(is (= 1 (:status error-data)))
(is (= "helpful error message\n" (:err error-data)))
(is (= "essential application info\n" (:out error-data))))))
5 changes: 3 additions & 2 deletions test/jackdaw/test/transports/kafka_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@
[jackdaw.test.journal :refer [watch-for]]
[jackdaw.test.serde :as serde]
[jackdaw.test.transports.kafka]
[jackdaw.utils :as utils]
[manifold.stream :as s]))

(set! *warn-on-reflection* false)

(def kafka-config {"bootstrap.servers" "localhost:9092"
(def kafka-config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")
"group.id" "kafka-write-test"})

(defn kstream-config
[app app-id]
{:topology app
:config {"bootstrap.servers" "localhost:9092"
:config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")
"application.id" app-id}})

(defn echo-stream
Expand Down
11 changes: 6 additions & 5 deletions test/jackdaw/test/transports/mock_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
[jackdaw.test :as jd.test]
[jackdaw.test.transports :as trns]
[jackdaw.test.serde :as serde]
[jackdaw.utils :as utils]
[manifold.stream :as s])
(:import
(java.util Properties)
(org.apache.kafka.streams TopologyTestDriver Topology)))
(java.util Properties)
(org.apache.kafka.streams TopologyTestDriver Topology)))

(set! *warn-on-reflection* false)

Expand Down Expand Up @@ -46,8 +47,8 @@
[f app-config]
(let [builder (k/streams-builder)
^Topology topology (let [builder (f builder)]
(-> (k/streams-builder* builder)
(.build)))
(-> (k/streams-builder* builder)
(.build)))
^Properties props (let [props (Properties.)]
(doseq [[k v] app-config]
(.setProperty props k v))
Expand All @@ -58,7 +59,7 @@
[]
(trns/transport {:type :mock
:driver (test-driver (echo-stream test-in test-out)
{"bootstrap.servers" "localhost:9092"
{"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")
"application.id" "test-echo-stream"})
:topics {"test-in" test-in
"test-out" test-out}}))
Expand Down
9 changes: 5 additions & 4 deletions test/jackdaw/test/transports/rest_proxy_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
[jackdaw.test.journal :refer [watch-for]]
[jackdaw.test.transports :as trns]
[jackdaw.test.transports.rest-proxy :as proxy]
[jackdaw.utils :as utils]
[manifold.stream :as s]
[manifold.deferred :as d]))

(set! *warn-on-reflection* false)

(def kafka-config {"bootstrap.servers" "localhost:9092"
(def kafka-config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")
"group.id" "kafka-write-test"})

(def +real-rest-proxy-url+
"http://localhost:8082")
"http://kafka-rest:8082")

(defn rest-proxy-config
[group-id]
Expand All @@ -30,7 +31,7 @@
(defn kstream-config
[app app-id]
{:topology app
:config {"bootstrap.servers" "localhost:9092"
:config {"bootstrap.servers" "kafka:9092"
"application.id" app-id}})

(defn echo-stream
Expand Down Expand Up @@ -173,7 +174,7 @@
:consumer.fetch.timeout.ms 200})))
(proxy/with-consumer))
[url options] (first @http-reqs)]
(is (= "http://localhost:8082/consumers/test-group-config" url))
(is (= "http://kafka-rest:8082/consumers/test-group-config" url))
(is (= {"Accept" "application/vnd.kafka.v2+json"
"Content-Type" "application/vnd.kafka.v2+json"}
(:headers options)))
Expand Down
Loading

0 comments on commit 46a9fb2

Please sign in to comment.