Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transducers are Coming #200

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/simple-ledger/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
log/
3 changes: 3 additions & 0 deletions examples/simple-ledger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Simple Ledger

This example creates a simple accounting ledger using transducers.
41 changes: 25 additions & 16 deletions examples/simple-ledger/deps.edn
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
{:deps
{danlentz/clj-uuid {:mvn/version "0.1.7"}
fundingcircle/jackdaw {:mvn/version "0.6.4"}
org.apache.kafka/kafka-streams {:mvn/version "2.1.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"}
org.clojure/clojure {:mvn/version "1.10.0"}
org.clojure/tools.logging {:mvn/version "0.4.1"}}
{:paths
["src" "resources"]

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}

:paths
["src" "test" "dev" "../dev"]
:deps
{fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT"
:exclusions [org.apache.zookeeper/zookeeper]}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/tools.logging {:mvn/version "0.4.1"}
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}
integrant {:mvn/version "0.7.0"}}

:aliases
{:test {:extra-deps {com.cognitect/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}}
{:dev
{:extra-paths ["dev" "../../dev"]
:extra-deps {integrant/repl {:mvn/version "0.3.1"}
danlentz/clj-uuid {:mvn/version "0.1.7"
:exclusions [primitive-math]}}}

:test
{:extra-paths ["test"]
:extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}}
68 changes: 0 additions & 68 deletions examples/simple-ledger/dev/system.clj

This file was deleted.

90 changes: 90 additions & 0 deletions examples/simple-ledger/dev/user.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
(ns user
"Use this namespace for interactive development.

This namespace requires libs needed to reset the app and helpers
from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not* typo

equivalent) to clean this namespace since these tools cannot tell
which libs are actually required."
(:gen-class)
(:require [clojure.string :as str]
[clojure.tools.logging :refer [info]]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[jackdaw.admin :as ja]
[jackdaw.serdes :as js]
[jackdaw.repl :refer :all]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf]
[simple-ledger :as sl]))


(def repl-config
"The development config.
When the 'dev' alias is active, this config will be used."
{:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"])
:topic-metadata {:entry-added
{:topic-name "entry-pending"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:transaction-pending
{:topic-name "transaction-pending"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:transaction-added
{:topic-name "transaction-added"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}}}

:topology {:topology-builder sl/topology-builder
:xforms [#'sl/split-entries #'sl/running-balances]
:swap-fn jxf/kv-store-swap-fn}

:app {:streams-config sl/streams-config
:topology (ig/ref :topology)
:topics (ig/ref :topics)}})


(integrant.repl/set-prep! (constantly repl-config))


(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}]
(with-open [client (ja/->AdminClient client-config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata))

(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}]
(let [xform-map (reduce-kv (fn [m k v]
(let [k (keyword (str (:ns (meta v)))
(str (:name (meta v))))]
(assoc m k #(v % jxf/kv-store-swap-fn))))
{}
xforms)
streams-builder (j/streams-builder)]
((topology-builder topic-metadata xform-map) streams-builder)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
(let [streams-app (j/kafka-streams topology streams-config)]
(j/start streams-app)
(assoc opts :streams-app streams-app)))

(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}]
(let [re (re-pattern (str "(" (->> topic-metadata
keys
(map name)
(str/join "|"))
")"))]
(re-delete-topics client-config re)))

(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}]
(j/close streams-app)
(destroy-state-stores streams-config)
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
(re-delete-topics (:client-config topics) re)))
1 change: 1 addition & 0 deletions examples/simple-ledger/resources/logback.xml
Loading