Skip to content

Commit

Permalink
Merge pull request #2 from threatgrid/tests
Browse files Browse the repository at this point in the history
Add tests
  • Loading branch information
DeLaGuardo authored Oct 18, 2023
2 parents da3087f + 836bbc7 commit 62fb7c7
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 10 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
name: Release

on: push
on:
push:
tags:
- '*'

jobs:

build:
release:
runs-on: ubuntu-latest
permissions:
contents: write

steps:
- uses: actions/checkout@v3

Expand All @@ -18,14 +22,13 @@ jobs:
distribution: 'temurin'
cache: maven

- name: Build
- name: Release build
run: |
mvn -Drevision=${{ github.ref_name }} --batch-mode --update-snapshots verify
mv ./target/kafka-connect-plugins-${{ github.ref_name }}-jar-with-dependencies.jar ./kafka-connect-plugins-${{ github.ref_name }}.jar
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
files: |
kafka-connect-plugins-${{ github.ref_name }}.jar
35 changes: 35 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Test

on: push

jobs:

test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
cache: maven

- name: Set up Clojure
uses: DeLaGuardo/[email protected]
with:
cli: latest

- name: Build
run: |
mvn --batch-mode --update-snapshots verify
- name: Set up Docker environment
run: |
docker compose up --wait --no-color --quiet-pull
- name: Test
run: |
clojure -X:test :dirs '["src/test/clojure"]'
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@

/.clj-kondo/.cache/
/.lsp/.cache/
/.cpcache/
/.nrepl-port
2 changes: 1 addition & 1 deletion bb.edn
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{:paths ["bb_src"]
{:paths ["src/main/bb"]
:deps {org.babashka/http-client {:mvn/version "0.3.11"}
progrock/progrock {:mvn/version "0.1.2"}}}
8 changes: 8 additions & 0 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{:paths ["src/test/clojure"]
:deps {org.apache.kafka/kafka-clients {:mvn/version "3.6.0"}
hato/hato {:mvn/version "0.9.0"}
metosin/jsonista {:mvn/version "0.3.8"}}
:aliases {:test {:extra-deps {io.github.cognitect-labs/test-runner
{:git/tag "v0.5.1" :git/sha "dfb30dd"}}
:main-opts ["-m" "cognitect.test-runner"]
:exec-fn cognitect.test-runner.api/test}}}
12 changes: 9 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ services:
depends_on:
zookeeper:
condition: service_healthy
ports:
- 11091:11091
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper.local:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker.local:10091
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker.local:10091,EXTERNAL://localhost:11091
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Expand All @@ -69,6 +71,8 @@ services:
depends_on:
broker:
condition: service_healthy
ports:
- 8083:8083
volumes:
- ./target/kafka-connect-plugins-1.0-jar-with-dependencies.jar:/usr/share/java/kafka-connect-plugins/kafka-connect-plugins-1.0-jar-with-dependencies.jar
environment:
Expand Down Expand Up @@ -104,12 +108,14 @@ services:

akhq:
image: tchiotludo/akhq
profiles:
- dev
networks:
kafka:
aliases:
- akhq.local
healthcheck:
test: echo srvr | nc akhq.local 8080 || exit 1
test: curl --fail --silent http://akhq.local:8080/ --output /dev/null || exit 1
retries: 20
interval: 10s
environment:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(ns elastic.dispose
(ns threatgrid.elastic.dispose
(:require
[babashka.http-client :as http]))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(ns elastic.setup
(ns threatgrid.elastic.setup
(:require
[babashka.http-client :as http]
[cheshire.core :as json]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public Class<? extends Task> taskClass() {

@Override
public void start(Map<String, String> props) {
log.info("starting elastic source");
try {
configProperties = props;
new ElasticsearchSourceConnectorConfig(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public String version() {

@Override
public void start(Map<String, String> props) {

log.info("starting elastic source task");

config = new ElasticsearchSourceConnectorConfig(props);
topic = config.getString(ElasticsearchSourceConnectorConfig.TOPIC_CONF);
index = config.getString(ElasticsearchSourceConnectorConfig.INDEX_NAME_CONF);
Expand Down Expand Up @@ -156,6 +159,9 @@ public List<SourceRecord> poll() throws InterruptedException {
// will be called by connect with a different thread than poll thread
@Override
public void stop() {

log.info("stopping elastic source task");

stopping.set(true);
if (elasticConnection != null) {
elasticConnection.closeQuietly();
Expand Down
59 changes: 59 additions & 0 deletions src/test/clojure/threatgrid/integration_test/core_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
(ns threatgrid.integration-test.core-test
(:require
[clojure.test :refer [deftest is]]
[threatgrid.integration-test.elastic :as elastic]
[threatgrid.integration-test.kafka :as kafka]
[threatgrid.integration-test.kafka-connect :as kafka-connect])
(:import
java.time.Duration
java.time.format.DateTimeFormatter
java.time.LocalDateTime))

(def formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd HH:mm:ss"))

(defn now []
(let [date (LocalDateTime/now)]
(.format date formatter)))

(defmacro integration-test [{:keys [kafka documents]} f]
(let [topic (str (random-uuid))
index (str (random-uuid))
connector (str (random-uuid))]
`(do
(println "Testing:")
(println (format " Topic: %s" ~topic))
(println (format " ES index: %s" ~index))
(println (format " Connector: %s" ~connector))
(let [ka# (kafka/admin-client ~kafka)
kc# (kafka/consumer (merge ~kafka
{"group.id" (str (random-uuid))
"auto.offset.reset" "earliest"}))]
(try
(kafka/create-topic ka# ~topic)
(elastic/create-index ~index ~documents)
(kafka-connect/start-connector ~connector ~index ~topic)
(.subscribe kc# (re-pattern ~topic))
(Thread/sleep 5000)
(~f kc#)
(catch Exception e#
(println e#))
(finally
(kafka-connect/stop-connector ~connector)
(elastic/delete-index ~index)
(kafka/delete-topic ka# ~topic)))))))

(deftest all-in-test
(let [documents (for [id (repeatedly 10 random-uuid)
:let [doc {:id (str id)
(keyword "@timestamp") (now)
:field1 "a"
:field2 "b"
:field3 "c"}]]
doc)]
(integration-test {:kafka {"bootstrap.servers" "localhost:11091"}
:documents documents}
(fn [consumer]
(let [recs (seq (.poll consumer (Duration/ofSeconds 10)))]
(is (= 10 (count recs)))
(is (= (into #{} documents)
(into #{} (map kafka/cr->map) recs))))))))
24 changes: 24 additions & 0 deletions src/test/clojure/threatgrid/integration_test/elastic.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(ns threatgrid.integration-test.elastic
(:require
[hato.client :as http]
[jsonista.core :as json]))

(defn create-index [name documents]
(http/put (format "http://elastic:elastic@localhost:9200/%s" name)
{:body (json/write-value-as-string
{:settings {:number_of_shards 1}
:mappings {:properties {:id {:type "keyword"}
"@timestamp" {:type "date"
:format "yyyy-MM-dd HH:mm:ss"}
:field1 {:type "text"}
:field2 {:type "text"}
:field3 {:type "text"}}}})
:content-type :json})
(doseq [doc documents]
(http/post (format "http://elastic:elastic@localhost:9200/%s/_doc/" name)
{:body (json/write-value-as-string
doc)
:content-type :json})))

(defn delete-index [name]
(http/delete (format "http://elastic:elastic@localhost:9200/%s" name)))
71 changes: 71 additions & 0 deletions src/test/clojure/threatgrid/integration_test/kafka.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
(ns threatgrid.integration-test.kafka
(:require
[jsonista.core :as json])
(:import
[java.util Optional Properties]
[org.apache.kafka.clients.admin Admin AdminClient NewTopic]
[org.apache.kafka.clients.consumer ConsumerRecord KafkaConsumer]
[org.apache.kafka.common
TopicCollection$TopicNameCollection]
[org.apache.kafka.common.header Headers]
[org.apache.kafka.common.serialization
Deserializer
Serde
Serdes$StringSerde
Serializer]))

(set! *warn-on-reflection* true)

(defn admin-client [properties]
(let [props (doto (Properties.)
(.putAll properties))]
(AdminClient/create props)))

(defn create-topic [^Admin admin ^String topic]
(-> admin
(.createTopics [(NewTopic. topic (Optional/empty) (Optional/empty))])
(.all)
(.get)))

(defn delete-topic [^Admin admin ^String topic]
(-> admin
(.deleteTopics (TopicCollection$TopicNameCollection/ofTopicNames [topic]))
(.all)
(.get)))

(defn serde ^Serde [clj->bytes bytes->clj]
(reify Serde
(serializer [_]
(reify Serializer
(serialize [_this _topic data]
(clj->bytes data))
(serialize [_this _topic _headers data]
(clj->bytes data))))
(deserializer [_]
(reify Deserializer
(deserialize [_this _topic data]
(bytes->clj data))
(deserialize [_this ^String _topic ^Headers _headers ^bytes data]
(bytes->clj data))))))

(defn json-serde
"JSON serializer/deserializer configurable through jsonista/object-mapper."
(^Serde [] (json-serde (json/object-mapper {:encode-key-fn true :decode-key-fn true})))
(^Serde [object-mapper]
(serde
(fn [obj]
(json/write-value-as-bytes obj object-mapper))
(fn [^bytes bs]
(json/read-value bs object-mapper)))))

(defn consumer [properties]
(let [^Serde key-serde (Serdes$StringSerde.)
value-serde (json-serde)
^Deserializer key-deserializer (.deserializer key-serde)
^Deserializer value-deserializer (.deserializer value-serde)]
(KafkaConsumer. (doto (Properties.) (.putAll properties))
key-deserializer value-deserializer)))

(defn cr->map [^ConsumerRecord cr]
(let [value (.value cr)]
value))
37 changes: 37 additions & 0 deletions src/test/clojure/threatgrid/integration_test/kafka_connect.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
(ns threatgrid.integration-test.kafka-connect
(:require
[hato.client :as http]
[jsonista.core :as json]))

(defn start-connector [name index topic]
(http/post "http://localhost:8083/connectors"
{:body (json/write-value-as-string
{:name name
:config {"connector.class" "threatgrid.kafka.connect.ElasticsearchSourceConnector"
"es.compatibility" true
"es.host" "elasticsearch.local"
"es.password" "elastic"
"es.port" 9200
"es.scheme" "http"
"es.user" "elastic"
"index" index
"key.converter" "org.apache.kafka.connect.storage.StringConverter"
"query" "{\"match_all\": {}}"
"sort" "[{\"@timestamp\": {\"order\": \"asc\"}}, \"id\"]"
"topic" topic
"value.converter" "org.apache.kafka.connect.storage.StringConverter"}})
:content-type :json}))

(defn stop-connector [name]
(http/delete (format "http://localhost:8083/connectors/%s/" name)))

(defn running? [x]
(= "RUNNING" (get-in x ["connector" "state"])))

(defn check-status [name]
(try
(-> (http/get (format "http://localhost:8083/connectors/%s/status" name))
(:body)
(json/read-value)
(running?))
(catch Exception _)))

0 comments on commit 62fb7c7

Please sign in to comment.