diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn new file mode 100644 index 0000000..3dcd971 --- /dev/null +++ b/.clj-kondo/config.edn @@ -0,0 +1 @@ +{:lint-as {threatgrid.integration-test.kafka/test-with-kafka clojure.core/let}} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8852409..3d2751e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,11 +18,25 @@ jobs: distribution: 'temurin' cache: maven + - name: Set up Clojure + uses: DeLaGuardo/setup-clojure@12.1 + with: + cli: latest + - name: Build run: | mvn -Drevision=${{ github.ref_name }} --batch-mode --update-snapshots verify + cp ./target/kafka-connect-plugins-${{ github.ref_name }}-jar-with-dependencies.jar ./target/kafka-connect-plugins-1.0-jar-with-dependencies.jar mv ./target/kafka-connect-plugins-${{ github.ref_name }}-jar-with-dependencies.jar ./kafka-connect-plugins-${{ github.ref_name }}.jar + - name: Set up Docker environment + run: | + docker compose up --wait --no-color --quiet-pull + + - name: Test + run: | + clojure -X:test :dirs '["src/test/clojure"]' + - name: Release uses: softprops/action-gh-release@v1 if: startsWith(github.ref, 'refs/tags/') diff --git a/.gitignore b/.gitignore index 21fadb4..29c9455 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ /.clj-kondo/.cache/ /.lsp/.cache/ +/.cpcache/ +/.nrepl-port diff --git a/bb.edn b/bb.edn index 0132b9e..097cba8 100644 --- a/bb.edn +++ b/bb.edn @@ -1,3 +1,3 @@ -{:paths ["bb_src"] +{:paths ["src/main/bb"] :deps {org.babashka/http-client {:mvn/version "0.3.11"} progrock/progrock {:mvn/version "0.1.2"}}} diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..6bdbecb --- /dev/null +++ b/deps.edn @@ -0,0 +1,8 @@ +{:paths ["src/test/clojure"] + :deps {org.apache.kafka/kafka-clients {:mvn/version "3.6.0"} + hato/hato {:mvn/version "0.9.0"} + metosin/jsonista {:mvn/version "0.3.8"}} + :aliases {:test {:extra-deps {io.github.cognitect-labs/test-runner + {:git/tag "v0.5.1" :git/sha "dfb30dd"}} + :main-opts ["-m" "cognitect.test-runner"] + :exec-fn cognitect.test-runner.api/test}}} diff --git
a/docker-compose.yml b/docker-compose.yml index f8ac79a..665a856 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,11 +48,13 @@ services: depends_on: zookeeper: condition: service_healthy + ports: + - 11091:11091 environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper.local:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker.local:10091 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker.local:10091,EXTERNAL://localhost:11091 KAFKA_DEFAULT_REPLICATION_FACTOR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 @@ -69,6 +71,8 @@ services: depends_on: broker: condition: service_healthy + ports: + - 8083:8083 volumes: - ./target/kafka-connect-plugins-1.0-jar-with-dependencies.jar:/usr/share/java/kafka-connect-plugins/kafka-connect-plugins-1.0-jar-with-dependencies.jar environment: @@ -104,12 +108,14 @@ services: akhq: image: tchiotludo/akhq + profiles: + - dev networks: kafka: aliases: - akhq.local healthcheck: - test: echo srvr | nc akhq.local 8080 || exit 1 + test: curl --fail --silent http://akhq.local:8080/ --output /dev/null || exit 1 retries: 20 interval: 10s environment: diff --git a/bb_src/elastic/dispose.clj b/src/main/bb/threatgrid/elastic/dispose.clj similarity index 87% rename from bb_src/elastic/dispose.clj rename to src/main/bb/threatgrid/elastic/dispose.clj index 76e777a..bc1fb3d 100644 --- a/bb_src/elastic/dispose.clj +++ b/src/main/bb/threatgrid/elastic/dispose.clj @@ -1,4 +1,4 @@ -(ns elastic.dispose +(ns threatgrid.elastic.dispose (:require [babashka.http-client :as http])) diff --git a/bb_src/elastic/setup.clj b/src/main/bb/threatgrid/elastic/setup.clj similarity index 98% rename from bb_src/elastic/setup.clj rename to src/main/bb/threatgrid/elastic/setup.clj index 9f037a1..412c9e9 100644 --- a/bb_src/elastic/setup.clj +++ b/src/main/bb/threatgrid/elastic/setup.clj @@ 
-1,4 +1,4 @@ -(ns elastic.setup +(ns threatgrid.elastic.setup (:require [babashka.http-client :as http] [cheshire.core :as json] diff --git a/src/test/clojure/threatgrid/integration_test/core_test.clj b/src/test/clojure/threatgrid/integration_test/core_test.clj new file mode 100644 index 0000000..6df3fb6 --- /dev/null +++ b/src/test/clojure/threatgrid/integration_test/core_test.clj @@ -0,0 +1,57 @@ +(ns threatgrid.integration-test.core-test + (:require + [clojure.test :refer [deftest is]] + [threatgrid.integration-test.elastic :as elastic] + [threatgrid.integration-test.kafka :as kafka] + [threatgrid.integration-test.kafka-connect :as kafka-connect]) + (:import + java.time.Duration + java.time.format.DateTimeFormatter + java.time.LocalDateTime)) + +(def formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd HH:mm:ss")) + +(defn now [] + (let [date (LocalDateTime/now)] + (.format date formatter))) + +(defmacro integration-test [{:keys [kafka documents]} f] + (let [topic (str (random-uuid)) + index (str (random-uuid)) + connector (str (random-uuid))] + `(do + (println "Testing:") + (println (format " Topic: %s" ~topic)) + (println (format " ES index: %s" ~index)) + (println (format " Connector: %s" ~connector)) + (let [ka# (kafka/admin-client ~kafka) + kc# (kafka/consumer (merge ~kafka + {"group.id" (str (random-uuid)) + "auto.offset.reset" "earliest"}))] + (try + (kafka/create-topic ka# ~topic) + (elastic/create-index ~index ~documents) + (kafka-connect/start-connector ~connector ~index ~topic) + (.subscribe kc# (re-pattern ~topic)) + (Thread/sleep 5000) + (~f kc#) + (finally + (kafka-connect/stop-connector ~connector) + (elastic/delete-index ~index) + (kafka/delete-topic ka# ~topic))))))) + +(deftest all-in-test + (let [documents (for [id (repeatedly 10 random-uuid) + :let [doc {:id (str id) + (keyword "@timestamp") (now) + :field1 "a" + :field2 "b" + :field3 "c"}]] + doc)] + (integration-test {:kafka {"bootstrap.servers" "localhost:11091"} + :documents documents} + 
(fn [consumer] + (let [recs (seq (.poll consumer (Duration/ofSeconds 10)))] + (is (= 10 (count recs))) + (is (= (into #{} documents) + (into #{} (map kafka/cr->map) recs)))))))) diff --git a/src/test/clojure/threatgrid/integration_test/elastic.clj b/src/test/clojure/threatgrid/integration_test/elastic.clj new file mode 100644 index 0000000..0a898be --- /dev/null +++ b/src/test/clojure/threatgrid/integration_test/elastic.clj @@ -0,0 +1,24 @@ +(ns threatgrid.integration-test.elastic + (:require + [hato.client :as http] + [jsonista.core :as json])) + +(defn create-index [name documents] + (http/put (format "http://elastic:elastic@localhost:9200/%s" name) + {:body (json/write-value-as-string + {:settings {:number_of_shards 1} + :mappings {:properties {:id {:type "keyword"} + "@timestamp" {:type "date" + :format "yyyy-MM-dd HH:mm:ss"} + :field1 {:type "text"} + :field2 {:type "text"} + :field3 {:type "text"}}}}) + :content-type :json}) + (doseq [doc documents] + (http/post (format "http://elastic:elastic@localhost:9200/%s/_doc/" name) + {:body (json/write-value-as-string + doc) + :content-type :json}))) + +(defn delete-index [name] + (http/delete (format "http://elastic:elastic@localhost:9200/%s" name))) diff --git a/src/test/clojure/threatgrid/integration_test/kafka.clj b/src/test/clojure/threatgrid/integration_test/kafka.clj new file mode 100644 index 0000000..90fba8c --- /dev/null +++ b/src/test/clojure/threatgrid/integration_test/kafka.clj @@ -0,0 +1,71 @@ +(ns threatgrid.integration-test.kafka + (:require + [jsonista.core :as json]) + (:import + [java.util Optional Properties] + [org.apache.kafka.clients.admin Admin AdminClient NewTopic] + [org.apache.kafka.clients.consumer ConsumerRecord KafkaConsumer] + [org.apache.kafka.common + TopicCollection$TopicNameCollection] + [org.apache.kafka.common.header Headers] + [org.apache.kafka.common.serialization + Deserializer + Serde + Serdes$StringSerde + Serializer])) + +(set! 
*warn-on-reflection* true) + +(defn admin-client [properties] + (let [props (doto (Properties.) + (.putAll properties))] + (AdminClient/create props))) + +(defn create-topic [^Admin admin ^String topic] + (-> admin + (.createTopics [(NewTopic. topic (Optional/empty) (Optional/empty))]) + (.all) + (.get))) + +(defn delete-topic [^Admin admin ^String topic] + (-> admin + (.deleteTopics (TopicCollection$TopicNameCollection/ofTopicNames [topic])) + (.all) + (.get))) + +(defn serde ^Serde [clj->bytes bytes->clj] + (reify Serde + (serializer [_] + (reify Serializer + (serialize [_this _topic data] + (clj->bytes data)) + (serialize [_this _topic _headers data] + (clj->bytes data)))) + (deserializer [_] + (reify Deserializer + (deserialize [_this _topic data] + (bytes->clj data)) + (deserialize [_this ^String _topic ^Headers _headers ^bytes data] + (bytes->clj data)))))) + +(defn json-serde + "JSON serializer/deserializer configurable through jsonista/object-mapper." + (^Serde [] (json-serde (json/object-mapper {:encode-key-fn true :decode-key-fn true}))) + (^Serde [object-mapper] + (serde + (fn [obj] + (json/write-value-as-bytes obj object-mapper)) + (fn [^bytes bs] + (json/read-value bs object-mapper))))) + +(defn consumer [properties] + (let [^Serde key-serde (Serdes$StringSerde.) + value-serde (json-serde) + ^Deserializer key-deserializer (.deserializer key-serde) + ^Deserializer value-deserializer (.deserializer value-serde)] + (KafkaConsumer. (doto (Properties.) 
(.putAll properties)) + key-deserializer value-deserializer))) + +(defn cr->map [^ConsumerRecord cr] + (let [value (.value cr)] + value)) diff --git a/src/test/clojure/threatgrid/integration_test/kafka_connect.clj b/src/test/clojure/threatgrid/integration_test/kafka_connect.clj new file mode 100644 index 0000000..ac6ac2b --- /dev/null +++ b/src/test/clojure/threatgrid/integration_test/kafka_connect.clj @@ -0,0 +1,37 @@ +(ns threatgrid.integration-test.kafka-connect + (:require + [hato.client :as http] + [jsonista.core :as json])) + +(defn start-connector [name index topic] + (http/post "http://localhost:8083/connectors" + {:body (json/write-value-as-string + {:name name + :config {"connector.class" "threatgrid.kafka.connect.ElasticsearchSourceConnector" + "es.compatibility" true + "es.host" "elasticsearch.local" + "es.password" "elastic" + "es.port" 9200 + "es.scheme" "http" + "es.user" "elastic" + "index" index + "key.converter" "org.apache.kafka.connect.storage.StringConverter" + "query" "{\"match_all\": {}}" + "sort" "[{\"@timestamp\": {\"order\": \"asc\"}}, \"id\"]" + "topic" topic + "value.converter" "org.apache.kafka.connect.storage.StringConverter"}}) + :content-type :json})) + +(defn stop-connector [name] + (http/delete (format "http://localhost:8083/connectors/%s/" name))) + +(defn running? [x] + (= "RUNNING" (get-in x ["connector" "state"]))) + +(defn check-status [name] + (try + (-> (http/get (format "http://localhost:8083/connectors/%s/status" name)) + (:body) + (json/read-value) + (running?)) + (catch Exception _)))