diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml new file mode 100644 index 00000000..9ebf3a43 --- /dev/null +++ b/.github/workflows/pipeline.yaml @@ -0,0 +1,69 @@ +run-name: Jackdaw build + +on: + push: + branches-ignore: + - master + - main + tags: + - '[0-9]+.[0-9]+.[0-9]' + +jobs: + Explore-GitHub-Actions: + runs-on: ubuntu-latest + container: clojure:latest + environment: test + services: + kafka: + image: confluentinc/cp-kafka:5.1.3 + env: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: true + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + schema-registry: + image: confluentinc/cp-schema-registry:5.1.2 + env: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' + kafka-rest: + image: confluentinc/cp-kafka-rest:6.1.1 + env: + KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_REST_BOOTSTRAP_SERVERS: kafka:9092 + KAFKA_REST_LISTENERS: http://0.0.0.0:8082 + KAFKA_REST_SCHEMA_REGISTRY: http://schema-registry:8081 + KAFKA_REST_HOST_NAME: kafka-rest + zookeeper: + image: confluentinc/cp-zookeeper:5.1.3 + env: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + steps: + - name: checkout + uses: actions/checkout@v4 + with: + fetch-tags: true + fetch-depth: 100 + - name: build + run: | + git config --global --add safe.directory /__w/jackdaw/jackdaw + lein kaocha + env: + KAFKA_BOOTSTRAP_SERVERS: kafka + ZOOKEEPER_HOST: zoopkeeper + SCHEMA_REGISTRY_HOST: schema-registry + KAFKA_REST_PROXY_HOST: kafka-rest + - name: lint + run: lein clj-kondo --lint src test || true + - name: deploy + env: + CLOJARS_USERNAME: ${{ secrets.CLOJARS_USERNAME }} + CLOJARS_PASSWORD: ${{ secrets.CLOJARS_PASSWORD }} + run: lein do jar, pom, deploy clojars + diff --git a/project.clj b/project.clj index 82051c84..c0f5e2b0 100644 --- a/project.clj +++ b/project.clj @@ -1,6 +1,8 @@ (defproject fundingcircle/jackdaw "_" :description "A Clojure library for the Apache Kafka distributed streaming platform." + :license {:name "BSD 3-clause" :url "http://opensource.org/licenses/BSD-3-Clause"} + :scm {:name "git" :url "https://github.com/fundingcircle/jackdaw"} :url "https://github.com/FundingCircle/jackdaw/" @@ -33,7 +35,8 @@ :aliases {"kaocha" ["run" "-m" "kaocha.runner"]} :aot [jackdaw.serdes.edn2 jackdaw.serdes.fressian jackdaw.serdes.fn-impl] - :plugins [[me.arrdem/lein-git-version "2.0.8"]] + :plugins [[me.arrdem/lein-git-version "2.0.8"] + [com.github.clj-kondo/lein-clj-kondo "0.2.5"]] :git-version {:status-to-version diff --git a/test/jackdaw/client/partitioning_test.clj b/test/jackdaw/client/partitioning_test.clj index 166335cb..a4f2065f 100644 --- a/test/jackdaw/client/partitioning_test.clj +++ b/test/jackdaw/client/partitioning_test.clj @@ -2,7 +2,8 @@ (:require [clojure.test :refer [deftest is testing]] [jackdaw.client :as client] - [jackdaw.client.partitioning :as part])) + [jackdaw.client.partitioning :as part] + [jackdaw.utils :as utils])) (set! *warn-on-reflection* false) @@ -22,7 +23,7 @@ (deftest test->ProducerRecord - (with-open [p (client/producer {"bootstrap.servers" "localhost:9092" + (with-open [p (client/producer {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})] (testing "absent key-fn" diff --git a/test/jackdaw/client_test.clj b/test/jackdaw/client_test.clj index 9e2eee14..20b3f67f 100644 --- a/test/jackdaw/client_test.clj +++ b/test/jackdaw/client_test.clj @@ -4,6 +4,7 @@ [jackdaw.client :as client] [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] + [jackdaw.utils :as utils] [jackdaw.data :as data]) (:import [java.util.concurrent LinkedBlockingQueue TimeUnit] java.time.Duration @@ -40,7 +41,7 @@ "high-partition-topic" high-partition-topic}) (defn broker-config [] - {"bootstrap.servers" "localhost:9092"}) + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")}) (defn producer-config [] (-> (broker-config) @@ -159,7 +160,7 @@ (deftest ^:integration consumer-test (let [config {"group.id" "jackdaw-client-test-consumer-test" - "bootstrap.servers" "localhost:9092" + "bootstrap.servers" "kafka:9092" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"} key-serde (:key-serde foo-topic) diff --git a/test/jackdaw/serdes/avro/integration_test.clj b/test/jackdaw/serdes/avro/integration_test.clj index 1349777f..1acd7293 100644 --- a/test/jackdaw/serdes/avro/integration_test.clj +++ b/test/jackdaw/serdes/avro/integration_test.clj @@ -8,13 +8,14 @@ [jackdaw.data :as jd] [jackdaw.serdes.avro :as avro] [jackdaw.serdes.avro.schema-registry :as reg] - [jackdaw.test.fixtures :as fix]) + [jackdaw.test.fixtures :as fix] + [jackdaw.utils :as utils]) (:import [org.apache.kafka.common.serialization Serde Serdes])) (set! *warn-on-reflection* false) (def +real-schema-registry-url+ - "http://localhost:8081") + (format "http://%s:8081" (utils/schema-registry-host))) (def +type-registry+ (merge avro/+base-schema-type-registry+ @@ -65,8 +66,8 @@ "A Kafka consumer or streams config." (let [id (str "dev-" (java.util.UUID/randomUUID))] {"replication.factor" "1", "group.id" id, "application.id" id, - "bootstrap.servers" "localhost:9092" - "zookeeper.connect" "localhost:2181" + "bootstrap.servers" (str (utils/bootstrap-servers) ":9092") + "zookeeper.connect" (str (utils/zookeeper-host) ":2181") "request.timeout.ms" "1000"})) ;;;; Schemas diff --git a/test/jackdaw/serdes/avro_test.clj b/test/jackdaw/serdes/avro_test.clj index 7f4d4a2b..61a26dbc 100644 --- a/test/jackdaw/serdes/avro_test.clj +++ b/test/jackdaw/serdes/avro_test.clj @@ -4,7 +4,8 @@ [clojure.data :refer [diff]] [clojure.data.json :as json] [jackdaw.serdes.avro :as avro] - [jackdaw.serdes.avro.schema-registry :as reg]) + [jackdaw.serdes.avro.schema-registry :as reg] + [jackdaw.utils :as utils]) (:import [java.nio ByteBuffer] [java.util Collection] [org.apache.avro Schema$Parser Schema] @@ -46,7 +47,7 @@ serde-config) schema-registry-config {:avro.schema-registry/client registry-client - :avro.schema-registry/url "localhost:8081"}] + :avro.schema-registry/url (str (utils/schema-registry-host) ":8081")}] (avro/serde +registry+ schema-registry-config serde-config)))) (defn deserialize [serde topic x] diff --git a/test/jackdaw/serdes/json_schema/confluent_test.clj b/test/jackdaw/serdes/json_schema/confluent_test.clj index 1eb6e9e1..410c8503 100644 --- a/test/jackdaw/serdes/json_schema/confluent_test.clj +++ b/test/jackdaw/serdes/json_schema/confluent_test.clj @@ -1,6 +1,7 @@ (ns jackdaw.serdes.json-schema.confluent-test (:require [jackdaw.serdes.json-schema.confluent :as jsco] [jackdaw.serdes.avro.schema-registry :as reg] + [jackdaw.utils :as utils] [clojure.data.json :as json] [clojure.test :refer [deftest is testing] :as test])) @@ -15,7 +16,7 @@ {"json.fail.invalid.schema" false} :serializer-properties {"json.fail.invalid.schema" false}}))) - schema-registry-url "localhost:8081" + schema-registry-url (str (utils/schema-registry-host) ":8081") key? false] (jsco/serde schema-registry-url schema-str key? serde-config)))) diff --git a/test/jackdaw/test/commands/write_test.clj b/test/jackdaw/test/commands/write_test.clj index 51eae7d1..d4ac8119 100644 --- a/test/jackdaw/test/commands/write_test.clj +++ b/test/jackdaw/test/commands/write_test.clj @@ -40,7 +40,7 @@ :key-serde :long :value-serde :json})) -(def kafka-config {"bootstrap.servers" "localhost:9092" +(def kafka-config {"bootstrap.servers" "kafka:9092" "group.id" "kafka-write-test"}) (defn with-transport diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 5825623d..72160dd5 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -1,7 +1,8 @@ (ns jackdaw.test.fixtures-test (:require [clojure.test :refer [deftest is]] - [jackdaw.test.fixtures :refer [list-topics reset-application-fixture topic-fixture with-fixtures]]) + [jackdaw.test.fixtures :refer [list-topics reset-application-fixture topic-fixture with-fixtures]] + [jackdaw.utils :as utils]) (:import (org.apache.kafka.clients.admin AdminClient))) @@ -14,7 +15,7 @@ :config {}}) (def kafka-config - {"bootstrap.servers" "localhost:9092"}) + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092")}) (def test-topics (let [topics {"foo" topic-foo}] @@ -61,18 +62,18 @@ :reset-fn (fn [reset-args rt args] (reset! reset-args [rt args]) 0)} - (fn [{:keys [resetter reset-args error-data]}] - (is (instance? kafka.tools.StreamsResetter resetter)) - (is (= ["--application-id" "yolo" - "--bootstrap-servers" "kafka.test:9092" - "--foo" "foo" - "--bar" "bar"] - reset-args)) - (is (empty? error-data))))) + (fn [{:keys [resetter reset-args error-data]}] + (is (instance? kafka.tools.StreamsResetter resetter)) + (is (= ["--application-id" "yolo" + "--bootstrap-servers" "kafka.test:9092" + "--foo" "foo" + "--bar" "bar"] + reset-args)) + (is (empty? error-data))))) (deftest test-reset-application-fixture-failure (test-resetter {:app-config {"application.id" "yolo" - "bootstrap.servers" "kafka.test:9092"} + "bootstrap.servers" (str (utils/bootstrap-servers) ":9092")} :reset-params ["--foo" "foo" "--bar" "bar"] :reset-fn (fn [reset-args rt args] @@ -80,8 +81,8 @@ (.write *err* "helpful error message\n") (.write *out* "essential application info\n") 1)} - (fn [{:keys [resetter error-data]}] - (is (instance? kafka.tools.StreamsResetter resetter)) - (is (= 1 (:status error-data))) - (is (= "helpful error message\n" (:err error-data))) - (is (= "essential application info\n" (:out error-data)))))) + (fn [{:keys [resetter error-data]}] + (is (instance? kafka.tools.StreamsResetter resetter)) + (is (= 1 (:status error-data))) + (is (= "helpful error message\n" (:err error-data))) + (is (= "essential application info\n" (:out error-data)))))) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index aa100db5..bd08d572 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -8,17 +8,18 @@ [jackdaw.test.journal :refer [watch-for]] [jackdaw.test.serde :as serde] [jackdaw.test.transports.kafka] + [jackdaw.utils :as utils] [manifold.stream :as s])) (set! *warn-on-reflection* false) -(def kafka-config {"bootstrap.servers" "localhost:9092" +(def kafka-config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "group.id" "kafka-write-test"}) (defn kstream-config [app app-id] {:topology app - :config {"bootstrap.servers" "localhost:9092" + :config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "application.id" app-id}}) (defn echo-stream diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index 92022127..af15f194 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -7,10 +7,11 @@ [jackdaw.test :as jd.test] [jackdaw.test.transports :as trns] [jackdaw.test.serde :as serde] + [jackdaw.utils :as utils] [manifold.stream :as s]) (:import - (java.util Properties) - (org.apache.kafka.streams TopologyTestDriver Topology))) + (java.util Properties) + (org.apache.kafka.streams TopologyTestDriver Topology))) (set! *warn-on-reflection* false) @@ -46,8 +47,8 @@ [f app-config] (let [builder (k/streams-builder) ^Topology topology (let [builder (f builder)] - (-> (k/streams-builder* builder) - (.build))) + (-> (k/streams-builder* builder) + (.build))) ^Properties props (let [props (Properties.)] (doseq [[k v] app-config] (.setProperty props k v)) @@ -58,7 +59,7 @@ [] (trns/transport {:type :mock :driver (test-driver (echo-stream test-in test-out) - {"bootstrap.servers" "localhost:9092" + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "application.id" "test-echo-stream"}) :topics {"test-in" test-in "test-out" test-out}})) diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index fd0b916a..a8d48060 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -11,16 +11,17 @@ [jackdaw.test.journal :refer [watch-for]] [jackdaw.test.transports :as trns] [jackdaw.test.transports.rest-proxy :as proxy] + [jackdaw.utils :as utils] [manifold.stream :as s] [manifold.deferred :as d])) (set! *warn-on-reflection* false) -(def kafka-config {"bootstrap.servers" "localhost:9092" +(def kafka-config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "group.id" "kafka-write-test"}) (def +real-rest-proxy-url+ - "http://localhost:8082") + "http://kafka-rest:8082") (defn rest-proxy-config [group-id] @@ -30,7 +31,7 @@ (defn kstream-config [app app-id] {:topology app - :config {"bootstrap.servers" "localhost:9092" + :config {"bootstrap.servers" "kafka:9092" "application.id" app-id}}) (defn echo-stream @@ -173,7 +174,7 @@ :consumer.fetch.timeout.ms 200}))) (proxy/with-consumer)) [url options] (first @http-reqs)] - (is (= "http://localhost:8082/consumers/test-group-config" url)) + (is (= "http://kafka-rest:8082/consumers/test-group-config" url)) (is (= {"Accept" "application/vnd.kafka.v2+json" "Content-Type" "application/vnd.kafka.v2+json"} (:headers options))) diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index b188b145..737ca6ad 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -7,7 +7,8 @@ [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] [jackdaw.test.transports :as trns] - [jackdaw.test.middleware :refer [with-status]])) + [jackdaw.test.middleware :refer [with-status]] + [jackdaw.utils :as utils])) (set! *warn-on-reflection* false) @@ -32,7 +33,7 @@ :key-serde :string :value-serde :json})) -(def kafka-config {"bootstrap.servers" "localhost:9092" +(def kafka-config {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "group.id" "kafka-write-test"}) (defn kafka-transport @@ -234,7 +235,7 @@ (try (jd.test/with-test-machine (trns/transport {:type :mock :driver (jd.test/mock-test-driver (echo-stream test-in test-out) - {"bootstrap.servers" "localhost:9092" + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "application.id" "test-echo-stream"}) :topics {:in test-in :out test-out}}) @@ -258,7 +259,7 @@ (try (jd.test/with-test-machine (trns/transport {:type :mock :driver (jd.test/mock-test-driver (bad-topology test-in test-out) - {"bootstrap.servers" "localhost:9092" + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "application.id" "test-echo-stream"}) :topics {:in test-in :out test-out}}) @@ -280,7 +281,7 @@ (try (jd.test/with-test-machine (trns/transport {:type :mock :driver (jd.test/mock-test-driver (echo-stream test-in test-out) - {"bootstrap.servers" "localhost:9092" + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "application.id" "test-echo-stream"}) :topics {:in test-in :out test-out}}) @@ -302,7 +303,7 @@ (try (jd.test/with-test-machine (trns/transport {:type :mock :driver (jd.test/mock-test-driver (echo-stream test-in test-out) - {"bootstrap.servers" "localhost:9092" + {"bootstrap.servers" (str (utils/bootstrap-servers) ":9092") "application.id" "test-echo-stream"}) :topics {:in test-in :out test-out}}) diff --git a/test/jackdaw/utils.clj b/test/jackdaw/utils.clj new file mode 100644 index 00000000..1bef7fe9 --- /dev/null +++ b/test/jackdaw/utils.clj @@ -0,0 +1,17 @@ +(ns jackdaw.utils) + +(defn bootstrap-servers + [] + (or (System/getenv "KAFKA_BOOTSTRAP_SERVERS") "localhost")) + +(defn zookeeper-host + [] + (or (System/getenv "ZOOKEEPER_HOST") "localhost")) + +(defn schema-registry-host + [] + (or (System/getenv "SCHEMA_REGISTRY_HOST") "localhost")) + +(defn kafka-rest-proxy-host + [] + (or (System/getenv "KAFKA_REST_PROXY_HOST") "localhost")) \ No newline at end of file