Commit 3647f1b: Add tests

DeLaGuardo committed Oct 17, 2023
1 parent da3087f
Showing 12 changed files with 225 additions and 6 deletions.
1 change: 1 addition & 0 deletions .clj-kondo/config.edn
@@ -0,0 +1 @@
{:lint-as {threatgrid.integration-test.kafka/test-with-kafka clojure.core/let}}
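The :lint-as entry tells clj-kondo to lint calls to threatgrid.integration-test.kafka/test-with-kafka as if they were clojure.core/let, so the first argument is treated as a binding vector rather than a list of unresolved symbols. A minimal sketch of the call shape that rule assumes; test-with-kafka itself is not part of this diff, so its exact signature is an assumption here:

;; hypothetical usage covered by the :lint-as rule; assumes test-with-kafka
;; takes a let-style binding vector, which linting it as clojure.core/let implies
(kafka/test-with-kafka [consumer (kafka/consumer {"bootstrap.servers" "localhost:11091"})]
  (.subscribe consumer (re-pattern "some-topic"))
  (.poll consumer (java.time.Duration/ofSeconds 10)))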
13 changes: 13 additions & 0 deletions .github/workflows/release.yml
@@ -18,11 +18,24 @@ jobs:
          distribution: 'temurin'
          cache: maven

      - name: Set up Clojure
        uses: DeLaGuardo/[email protected]
        with:
          cli: latest

      - name: Build
        run: |
          mvn -Drevision=${{ github.ref_name }} --batch-mode --update-snapshots verify
          mv ./target/kafka-connect-plugins-${{ github.ref_name }}-jar-with-dependencies.jar ./kafka-connect-plugins-${{ github.ref_name }}.jar
      - name: Set up Docker environment
        run: |
          docker compose up --wait --no-color --quiet-pull
      - name: Test
        run: |
          clojure -X:test :dirs '["src/test/clojure"]'
      - name: Release
        uses: softprops/action-gh-release@v1
        if: startsWith(github.ref, 'refs/tags/')
2 changes: 2 additions & 0 deletions .gitignore
@@ -12,3 +12,5 @@

/.clj-kondo/.cache/
/.lsp/.cache/
/.cpcache/
/.nrepl-port
2 changes: 1 addition & 1 deletion bb.edn
@@ -1,3 +1,3 @@
{:paths ["bb_src"]
{:paths ["src/main/bb"]
:deps {org.babashka/http-client {:mvn/version "0.3.11"}
progrock/progrock {:mvn/version "0.1.2"}}}
8 changes: 8 additions & 0 deletions deps.edn
@@ -0,0 +1,8 @@
{:paths ["src/test/clojure"]
:deps {org.apache.kafka/kafka-clients {:mvn/version "3.6.0"}
hato/hato {:mvn/version "0.9.0"}
metosin/jsonista {:mvn/version "0.3.8"}}
:aliases {:test {:extra-deps {io.github.cognitect-labs/test-runner
{:git/tag "v0.5.1" :git/sha "dfb30dd"}}
:main-opts ["-m" "cognitect.test-runner"]
:exec-fn cognitect.test-runner.api/test}}}
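The :test alias pulls in cognitect-labs/test-runner and exposes it both as a main namespace and as an :exec-fn, which is what the workflow's clojure -X:test step calls. A roughly equivalent REPL invocation, assuming the alias is active, would be:

;; illustrative REPL call mirroring `clojure -X:test :dirs '["src/test/clojure"]'`
(require '[cognitect.test-runner.api :as test-runner])
(test-runner/test {:dirs ["src/test/clojure"]})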
12 changes: 9 additions & 3 deletions docker-compose.yml
@@ -48,11 +48,13 @@ services:
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - 11091:11091
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper.local:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker.local:10091
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker.local:10091,EXTERNAL://localhost:11091
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

@@ -69,6 +71,8 @@ services:
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - 8083:8083
    volumes:
      - ./target/kafka-connect-plugins-1.0-jar-with-dependencies.jar:/usr/share/java/kafka-connect-plugins/kafka-connect-plugins-1.0-jar-with-dependencies.jar
    environment:
@@ -104,12 +108,14 @@ services:

  akhq:
    image: tchiotludo/akhq
    profiles:
      - dev
    networks:
      kafka:
        aliases:
          - akhq.local
    healthcheck:
      test: echo srvr | nc akhq.local 8080 || exit 1
      test: curl --fail --silent http://akhq.local:8080/ --output /dev/null || exit 1
      retries: 20
      interval: 10s
    environment:
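The EXTERNAL listener added above advertises localhost:11091, which is what lets the test code below reach the broker from the host while broker.local:10091 remains the in-network address. A rough host-side smoke test using the admin-client helper introduced in this commit (the listTopics call is illustrative and not part of the diff):

;; quick connectivity check against the new EXTERNAL listener (illustrative)
(with-open [admin (threatgrid.integration-test.kafka/admin-client
                   {"bootstrap.servers" "localhost:11091"})]
  ;; listing topic names confirms the advertised localhost:11091 address is reachable
  (-> admin .listTopics .names .get))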
@@ -1,4 +1,4 @@
(ns elastic.dispose
(ns threatgrid.elastic.dispose
(:require
[babashka.http-client :as http]))

@@ -1,4 +1,4 @@
(ns elastic.setup
(ns threatgrid.elastic.setup
(:require
[babashka.http-client :as http]
[cheshire.core :as json]
57 changes: 57 additions & 0 deletions src/test/clojure/threatgrid/integration_test/core_test.clj
@@ -0,0 +1,57 @@
(ns threatgrid.integration-test.core-test
(:require
[clojure.test :refer [deftest is]]
[threatgrid.integration-test.elastic :as elastic]
[threatgrid.integration-test.kafka :as kafka]
[threatgrid.integration-test.kafka-connect :as kafka-connect])
(:import
java.time.Duration
java.time.format.DateTimeFormatter
java.time.LocalDateTime))

(def formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd HH:mm:ss"))

(defn now []
(let [date (LocalDateTime/now)]
(.format date formatter)))

(defmacro integration-test [{:keys [kafka documents]} f]
(let [topic (str (random-uuid))
index (str (random-uuid))
connector (str (random-uuid))]
`(do
(println "Testing:")
(println (format " Topic: %s" ~topic))
(println (format " ES index: %s" ~index))
(println (format " Connector: %s" ~connector))
(let [ka# (kafka/admin-client ~kafka)
kc# (kafka/consumer (merge ~kafka
{"group.id" (str (random-uuid))
"auto.offset.reset" "earliest"}))]
(try
(kafka/create-topic ka# ~topic)
(elastic/create-index ~index ~documents)
(kafka-connect/start-connector ~connector ~index ~topic)
(.subscribe kc# (re-pattern ~topic))
(Thread/sleep 5000)
(~f kc#)
(finally
(kafka-connect/stop-connector ~connector)
(elastic/delete-index ~index)
(kafka/delete-topic ka# ~topic)))))))

(deftest all-in-test
(let [documents (for [id (repeatedly 10 random-uuid)
:let [doc {:id (str id)
(keyword "@timestamp") (now)
:field1 "a"
:field2 "b"
:field3 "c"}]]
doc)]
(integration-test {:kafka {"bootstrap.servers" "localhost:11091"}
:documents documents}
(fn [consumer]
(let [recs (seq (.poll consumer (Duration/ofSeconds 10)))]
(is (= 10 (count recs)))
(is (= (into #{} documents)
(into #{} (map kafka/cr->map) recs))))))))
24 changes: 24 additions & 0 deletions src/test/clojure/threatgrid/integration_test/elastic.clj
@@ -0,0 +1,24 @@
(ns threatgrid.integration-test.elastic
(:require
[hato.client :as http]
[jsonista.core :as json]))

(defn create-index [name documents]
(http/put (format "http://elastic:elastic@localhost:9200/%s" name)
{:body (json/write-value-as-string
{:settings {:number_of_shards 1}
:mappings {:properties {:id {:type "keyword"}
"@timestamp" {:type "date"
:format "yyyy-MM-dd HH:mm:ss"}
:field1 {:type "text"}
:field2 {:type "text"}
:field3 {:type "text"}}}})
:content-type :json})
(doseq [doc documents]
(http/post (format "http://elastic:elastic@localhost:9200/%s/_doc/" name)
{:body (json/write-value-as-string
doc)
:content-type :json})))

(defn delete-index [name]
(http/delete (format "http://elastic:elastic@localhost:9200/%s" name)))
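For a quick manual check against the Elasticsearch container, the two helpers can be exercised straight from a REPL; the index name and document below are made up for illustration and match the mapping's yyyy-MM-dd HH:mm:ss date format:

;; illustrative manual check; "scratch-index" is a made-up index name
(create-index "scratch-index"
              [{:id "1"
                (keyword "@timestamp") "2023-10-17 12:00:00"
                :field1 "a" :field2 "b" :field3 "c"}])
(delete-index "scratch-index")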
71 changes: 71 additions & 0 deletions src/test/clojure/threatgrid/integration_test/kafka.clj
@@ -0,0 +1,71 @@
(ns threatgrid.integration-test.kafka
(:require
[jsonista.core :as json])
(:import
[java.util Optional Properties]
[org.apache.kafka.clients.admin Admin AdminClient NewTopic]
[org.apache.kafka.clients.consumer ConsumerRecord KafkaConsumer]
[org.apache.kafka.common
TopicCollection$TopicNameCollection]
[org.apache.kafka.common.header Headers]
[org.apache.kafka.common.serialization
Deserializer
Serde
Serdes$StringSerde
Serializer]))

(set! *warn-on-reflection* true)

(defn admin-client [properties]
(let [props (doto (Properties.)
(.putAll properties))]
(AdminClient/create props)))

(defn create-topic [^Admin admin ^String topic]
(-> admin
(.createTopics [(NewTopic. topic (Optional/empty) (Optional/empty))])
(.all)
(.get)))

(defn delete-topic [^Admin admin ^String topic]
(-> admin
(.deleteTopics (TopicCollection$TopicNameCollection/ofTopicNames [topic]))
(.all)
(.get)))

(defn serde ^Serde [clj->bytes bytes->clj]
(reify Serde
(serializer [_]
(reify Serializer
(serialize [_this _topic data]
(clj->bytes data))
(serialize [_this _topic _headers data]
(clj->bytes data))))
(deserializer [_]
(reify Deserializer
(deserialize [_this _topic data]
(bytes->clj data))
(deserialize [_this ^String _topic ^Headers _headers ^bytes data]
(bytes->clj data))))))

(defn json-serde
"JSON serializer/deserializer configurable through jsonista/object-mapper."
(^Serde [] (json-serde (json/object-mapper {:encode-key-fn true :decode-key-fn true})))
(^Serde [object-mapper]
(serde
(fn [obj]
(json/write-value-as-bytes obj object-mapper))
(fn [^bytes bs]
(json/read-value bs object-mapper)))))

(defn consumer [properties]
(let [^Serde key-serde (Serdes$StringSerde.)
value-serde (json-serde)
^Deserializer key-deserializer (.deserializer key-serde)
^Deserializer value-deserializer (.deserializer value-serde)]
(KafkaConsumer. (doto (Properties.) (.putAll properties))
key-deserializer value-deserializer)))

(defn cr->map [^ConsumerRecord cr]
(let [value (.value cr)]
value))
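cr->map only unwraps the already-deserialized value, so the JSON deserialization in json-serde (keys decoded back to keywords) is what makes the set equality in core-test line up. A rough REPL sketch of that round trip; the literal map is just an example:

;; illustrative round trip: keyword keys survive serialize -> deserialize because
;; the object-mapper is built with :encode-key-fn/:decode-key-fn true
(let [s (json-serde)
      bs (.serialize (.serializer s) "any-topic" {:id "1" :field1 "a"})]
  (.deserialize (.deserializer s) "any-topic" bs))
;; => {:id "1", :field1 "a"}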
37 changes: 37 additions & 0 deletions src/test/clojure/threatgrid/integration_test/kafka_connect.clj
@@ -0,0 +1,37 @@
(ns threatgrid.integration-test.kafka-connect
(:require
[hato.client :as http]
[jsonista.core :as json]))

(defn start-connector [name index topic]
(http/post "http://localhost:8083/connectors"
{:body (json/write-value-as-string
{:name name
:config {"connector.class" "threatgrid.kafka.connect.ElasticsearchSourceConnector"
"es.compatibility" true
"es.host" "elasticsearch.local"
"es.password" "elastic"
"es.port" 9200
"es.scheme" "http"
"es.user" "elastic"
"index" index
"key.converter" "org.apache.kafka.connect.storage.StringConverter"
"query" "{\"match_all\": {}}"
"sort" "[{\"@timestamp\": {\"order\": \"asc\"}}, \"id\"]"
"topic" topic
"value.converter" "org.apache.kafka.connect.storage.StringConverter"}})
:content-type :json}))

(defn stop-connector [name]
(http/delete (format "http://localhost:8083/connectors/%s/" name)))

(defn running? [x]
(= "RUNNING" (get-in x ["connector" "state"])))

(defn check-status [name]
(try
(-> (http/get (format "http://localhost:8083/connectors/%s/status" name))
(:body)
(json/read-value)
(running?))
(catch Exception _)))
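check-status is defined here but not yet used by the tests (core-test simply sleeps for five seconds after starting the connector). A hedged sketch of how it could replace that sleep by polling until the connector reports RUNNING:

;; illustrative helper, not part of the commit: poll check-status until the
;; connector is RUNNING, giving up after roughly 20 seconds
(defn wait-until-running [name]
  (loop [attempts 20]
    (cond
      (check-status name) true
      (pos? attempts) (do (Thread/sleep 1000) (recur (dec attempts)))
      :else false)))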
