Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Yet another example app #312

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/streams-examples/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pom.xml
pom.xml.asc
*.jar
*.class
/lib/
/classes/
/target/
/checkouts/
/test-results/
/logs/
.lein-deps-sum
.lein-repl-history
.lein-plugins/
.lein-failures
.lein-env
.nrepl-port
.cpcache/
2 changes: 2 additions & 0 deletions examples/streams-examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Kafka Streams Jackdaw
A typical example of the use of Kafka Streams, Jackdaw, Test Machine et al
31 changes: 31 additions & 0 deletions examples/streams-examples/deps.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{:paths ["resources" "src"]

:deps {org.clojure/clojure {:mvn/version "1.10.3"}

danlentz/clj-uuid {:mvn/version "0.1.9"}
org.clojure/tools.logging {:mvn/version "1.1.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.7"}

;; Kafka Helpers
fundingcircle/topology-grapher {:mvn/version "0.1.3"
:exclusions [org.clojure/data.priority-map]}

;; Kafka
;; Explicitly bring in Kafka, rather than transitively
org.apache.kafka/kafka-clients {:mvn/version "2.8.1"}
org.apache.kafka/kafka-streams {:mvn/version "2.8.1"}
fundingcircle/jackdaw {:mvn/version "0.9.1"
:exclusions [com.fasterxml.jackson.core/jackson-annotations
com.fasterxml.jackson.core/jackson-databind
com.thoughtworks.paranamer/paranamer
joda-time/joda-time
org.apache.kafka/kafka-clients
org.apache.kafka/kafka-streams
org.apache.zookeeper/zookeeper
org.slf4j/slf4j-log4j12]}}

:aliases {:test {:extra-paths ["test"]}
:dev {:extra-paths ["test"]}}

:mvn/repos {"confluent" {:url "https://packages.confluent.io/maven/"}
"mulesoft" {:url "https://repository.mulesoft.org/nexus/content/repositories/public/"}}}
44 changes: 44 additions & 0 deletions examples/streams-examples/project.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
(defproject streams "0.1.0-SNAPSHOT"
:description "streams-examples"

:target-path "target/%s"
:uberjar-name "streams-examples.jar"
:main streams.main

:url "https://github.com/FundingCircle/jackdaw"

:dependencies [[org.clojure/clojure "1.10.3"]

[danlentz/clj-uuid "0.1.9"]
[org.clojure/tools.logging "1.1.0"]
[ch.qos.logback/logback-classic "1.2.7"]

;; Kafka Helpers
[fundingcircle/topology-grapher "0.1.3"
:exclusions [org.clojure/data.priority-map]]

;; Kafka
;; Explicitly bring in Kafka, rather than transitively
[org.apache.kafka/kafka-clients "2.8.1"]
[org.apache.kafka/kafka-streams "2.8.1"]
[fundingcircle/jackdaw "0.9.1"
:exclusions [com.fasterxml.jackson.core/jackson-annotations
com.fasterxml.jackson.core/jackson-databind
com.thoughtworks.paranamer/paranamer
joda-time
org.apache.kafka/kafka-clients
org.apache.kafka/kafka-streams
org.apache.zookeeper/zookeeper
org.slf4j/slf4j-log4j12]]]

:resource-paths ["resources" "resources/avro-schemas"]

:repl-options {:init-ns streams.main}

:profiles
{:dev {:resource-paths ["test/resources"]
:dependencies [[org.apache.kafka/kafka-streams-test-utils "2.8.1"]
[org.apache.kafka/kafka_2.13 "2.8.1"]]}}

:repositories
{"confluent" {:url "https://packages.confluent.io/maven/"}})
13 changes: 13 additions & 0 deletions examples/streams-examples/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<configuration scan="true" scanPeriod="5 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d %level %logger: %msg%n</pattern>
<outputPatternAsHeader>true</outputPatternAsHeader>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

40 changes: 40 additions & 0 deletions examples/streams-examples/src/streams/config.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
(ns streams.config)

(def ^:private base-event-topic-config
{:partition-count 3
:key-serde {:serde-keyword :jackdaw.serdes/string-serde}
:value-serde {:serde-keyword :jackdaw.serdes.json/serde}})

(def ^:private base-changelog-topic-config
{:partition-count 3
:key-serde {:serde-keyword :jackdaw.serdes/string-serde}
:value-serde {:serde-keyword :jackdaw.serdes.json/serde}
;; Changelog
:config {"cleanup.policy" "compact"
"segment.ms" "86400000"}})

(defn config []
{:app-name "streams"

:streams-settings
{:schema-registry-url "http://localhost:8081"
:bootstrap-servers "localhost:9092"
:commit-interval "1"
:num-stream-threads "3"
:cache-max-bytes-buffering "0"
:replication-factor "1"
:state-dir "/tmp/kafka-streams"
:sidecar-port "9090"
:sidecar-host "0.0.0.0"}

:topics
{:input (merge {:topic-name "input-1"} base-event-topic-config)
:output (merge {:topic-name "output-1"} base-event-topic-config)
:state (merge {:topic-name "state-1"} base-changelog-topic-config)}

:stores
{:state-store {:store-name "state-store-1"
:key-serde {:serde-keyword :jackdaw.serdes/string-serde}
:value-serde {:serde-keyword :jackdaw.serdes.json/serde}}}

:global-stores {}})
173 changes: 173 additions & 0 deletions examples/streams-examples/src/streams/core.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
(ns streams.core
(:require
[clojure.tools.logging :as log]
[clojure.java.io :as io]
[clojure.string :as string]
[jackdaw.serdes.resolver :as jd-resolver]
[jackdaw.streams :as js]
[streams.config :refer [config]]
[topology-grapher.describe :as td]
[topology-grapher.render :as tr])
(:import
(jackdaw.streams.interop CljStreamsBuilder)
(java.time Duration)
(java.util Properties)
(org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler)
(org.apache.kafka.streams KafkaStreams KafkaStreams$State KafkaStreams$StateListener StreamsBuilder Topology)))


;; Config

(defn load-config []
(config))

(defn props-for ^Properties [config-map]
(doto (Properties.)
(.putAll (reduce-kv (fn [m k v]
(assoc m (-> (name k)
(string/replace "-" "."))
(str v)))
{} config-map))))

;; Some Jackdaw serdes sugar

(defn- resolve-serde
[schema-reg-url schema-reg-client serde-config]
(reduce (fn [conf serdes-key]
(assoc conf serdes-key ((jd-resolver/serde-resolver
:schema-registry-url schema-reg-url
:schema-registry-client schema-reg-client)
(serdes-key serde-config))))
serde-config [:key-serde :value-serde]))

(defn reify-serdes-config
"Converts the serdes references in the topic & store configs into
actual jackdaw serdes implementations, so the config can be used
in the jackdaw API calls."
([config]
(reify-serdes-config config nil))
([config schema-reg-client]
;; If no schema-reg-client is provided, Jackdaw will construct
;; one as needed using the :streams-settings :schema-registry-url in the kafka config
;; for the app (see config.edn).
;; Typically a schema-reg-client is passed here for testing (to pass a mock)
(let [schema-reg-url (get-in config [:streams-settings :schema-registry-url])
topics (into {} (map (fn [[k t]]
[k (resolve-serde schema-reg-url schema-reg-client t)])
(:topics config)))
stores (into {} (map (fn [[k s]]
[k (resolve-serde schema-reg-url schema-reg-client s)])
(:stores config)))
global-stores (into {} (map (fn [[k v]]
(if-let [resolved-topic (get topics (:source-topic v))]
[k (assoc v :source-topic resolved-topic)]
(throw (ex-info (str "Source topic not found for global store " k) v))))
(:global-stores config)))]
(assoc config
:topics topics
:stores stores
:global-stores global-stores))))

;; Some Running-a-topology Sugar

(defn exit-on-error-handler ^StreamsUncaughtExceptionHandler []
(reify StreamsUncaughtExceptionHandler
(handle [_ ex]
(try
(future (System/exit 1))
(finally (throw ex))))))

(defn logging-error-handler ^StreamsUncaughtExceptionHandler []
(reify StreamsUncaughtExceptionHandler
(handle [_ ex]
(log/error ex "Kafka Streams error!"))))

(defn startup-listener [startup-result]
(reify KafkaStreams$StateListener
(onChange [_ new-state old-state]
(when-not (realized? startup-result)
(cond
(= KafkaStreams$State/RUNNING new-state)
(deliver startup-result :running)
(or (= KafkaStreams$State/ERROR new-state)
(= KafkaStreams$State/PENDING_SHUTDOWN new-state))
(deliver startup-result :failed)
:else nil)))))

;; Main interfaces to build and run a topology

(defn build-topology
"Given one of the various ways of building a topology object for
kafka streams, returns a built Topology."
[stream-build-fn config]
(let [configured-stream (stream-build-fn config)]
(condp instance? configured-stream
;; If we built a Topology already, e.g use of the Processor API, just return it
Topology configured-stream
;; Jackdaw Streams DSL, wrapped streams builder
CljStreamsBuilder (.build ^StreamsBuilder (js/streams-builder* configured-stream))
;; Kafka Streams DSL
StreamsBuilder (.build ^StreamsBuilder configured-stream)
;; Erk
(throw (Exception. (str "Unknown builder type: " (type configured-stream)))))))

(defn start-topology
"Given a topology and the streams config to run it, create and run a
KafkaStreams for ever and ever. Accepts optional state listener and
exception handler."
(^KafkaStreams [^Topology topology config]
(start-topology topology config nil))
(^KafkaStreams [^Topology topology config extra-setup-fn]
(let [kafka-streams (KafkaStreams. ^Topology topology
(props-for (:streams-settings config)))]
(when extra-setup-fn
(extra-setup-fn kafka-streams))
(.start kafka-streams)
(.addShutdownHook (Runtime/getRuntime)
(Thread. (let [main-thread (Thread/currentThread)]
(fn []
(.close kafka-streams (Duration/ofSeconds 60))
(.join main-thread 45000)))
"Shutdown thread"))
kafka-streams)))

;;
;; topology rendering
;;
(def topology-domain-meta {:domain "jackdaw"
:subdomain "examples"
:application "stream"})

(defn render-topology [stream-build-fn stream-config]
(let [topology [{:application-name (get-in stream-config [:streams-settings :application-id])
:topology (build-topology stream-build-fn stream-config)}]
graph (td/gen-topologies topology topology-domain-meta)]
{:topic-level (tr/render-graph (vals graph) {:fmt "pdf" :mode "topics" :cache false})
:detail (tr/render-graph (vals graph) {:fmt "pdf" :mode "detail" :cache false})}))

;; The whole app config is passed through the application here.
;; This config contains:
;; - the kafka streams configuration
;; - the topic configurations, in jackdaw format
;; - the state store configurations, in jackdaw format
;; All serdes should be reified here

(defn run-topology ^KafkaStreams [stream-build-fn stream-config]
(let [startup-result (promise)
error-handler-fn (if (true? (:hard-exit-on-error stream-config))
exit-on-error-handler
logging-error-handler)
stream (-> (build-topology stream-build-fn stream-config)
(start-topology stream-config
(fn [^KafkaStreams kafka-streams]
(doto kafka-streams
(.setUncaughtExceptionHandler
^StreamsUncaughtExceptionHandler (error-handler-fn))
(.setStateListener
(startup-listener startup-result))))))]
(let [startup-state @startup-result]
(if (= :running startup-state)
(log/info (:streams-settings stream-config) "Started topology")
(throw (ex-info "Failed to start topology :(" {:state startup-state
:config stream-config}))))
stream))
34 changes: 34 additions & 0 deletions examples/streams-examples/src/streams/main.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
(ns streams.main
(:require
[clojure.tools.logging :as log]
[streams.stack-calculator :as stack-calculator]
[streams.core :as core]))

(defn execute!
[topology-config-fn stream-build-fn action-fn]
(let [stream-config (-> (core/load-config)
(assoc :hard-exit-on-error true)
(topology-config-fn)
(core/reify-serdes-config))]
(action-fn stream-build-fn stream-config)))

(defn render-basic-stream-dsl [topology-name]
(case topology-name
"stack-calculator"
(execute! stack-calculator/configure-topology
stack-calculator/build-stream
core/render-topology)
(str "Unknown topology " topology-name)))

(defn -main [& args]
(log/info {:args args} "Running application ...")
(let [base-config (assoc (core/load-config)
:hard-exit-on-error true)]
(doseq [topology-name args]
(log/info {:topology-name topology-name} "Starting topology")
(case topology-name
"stack-calculator"
(execute! stack-calculator/configure-topology
stack-calculator/build-stream
core/run-topology)
(throw (ex-info (str "Unknown topology " topology-name) {:args args}))))))
Loading