Transducers are Coming #200

Open
wants to merge 8 commits into
base: master

Contributor

@creese creese commented Sep 23, 2019

This has been a long time coming but I think we’re finally here. This proposal is composable with the existing Jackdaw Streams DSL. Just define your transducers and use transduce-kstream:

(defn transduce-kstream
  [kstream xf]
  "Takes a kstream and xf and transduces the stream."
  (-> kstream
      (j/transform (fn [] (transformer xf)) ["transducer"])
      (j/flat-map (fn [[_ v]] v))))

It turns out that KStream::transform followed by KStream::flatMap is equivalent to transduce with concat. We can use the latter to test our business logic with pure Clojure (no Kafka Streams). This approach was pioneered by Matthias a while ago. The difference is that now we're adding state.
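A minimal sketch of that equivalence in plain Clojure, with no Kafka Streams involved. The name `xf-explode` and the sign convention are hypothetical and chosen only for illustration; they are not taken from this PR. Each input record yields a collection of output records, and `concat` flattens them much as KStream::flatMap would.

```clojure
;; Hypothetical transducer: every input record [k v] becomes a collection
;; of output [k v] pairs (the sign convention here is arbitrary).
(def xf-explode
  (map (fn [[_ v]]
         [[(:debit-account v) (:amount v)]
          [(:credit-account v) (- (:amount v))]])))

(def records
  [[nil {:debit-account "tech" :credit-account "cash" :amount 1000}]])

;; concat is the reducing function: it flattens the per-record collections
;; into one sequence of [k v] pairs.
(def result (transduce xf-explode concat records))
;; => (["tech" 1000] ["cash" -1000])
```

Because the transducer itself knows nothing about Kafka, the same `xf-explode` could in principle be handed to transduce-kstream unchanged.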

Here is how to test your transducers:

(def coll
  [[nil {:debit-account "tech"
         :credit-account "cash"
         :amount 1000}]
   [nil {:debit-account "cash"
         :credit-account "sales"
         :amount 2000}]])

(->> coll
     (transduce (xf-split-entries nil nil) concat)
     (transduce (xf-running-balances (atom {}) swap!) concat))

The function xf-running-balances takes two arguments, a "store" and a function that "behaves like clojure.core/swap!", and returns a transducer. When developing your transducers, you can use an atom and swap!.
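As a rough illustration of that shape, here is a sketch of a stateful transducer parameterized by a store and a swap-like function. `xf-running-total` is a hypothetical stand-in and not the PR's xf-running-balances; it assumes the swap-like function is called as `(swap-fn store f m)` and returns the updated map, which is how `swap!` behaves on an atom.

```clojure
(defn xf-running-total
  "Hypothetical stateful transducer. `state` is whatever `swap-fn` knows how
  to update; `swap-fn` is called like clojure.core/swap! and is assumed to
  return the new state."
  [state swap-fn]
  (map (fn [[k amount]]
         (let [totals (swap-fn state (partial merge-with +) {k amount})]
           ;; Emit a collection of records so that concat can flatten it.
           [[k (get totals k)]]))))

;; During development the store is just an atom and the swap-fn is swap!.
(def totals
  (transduce (xf-running-total (atom {}) swap!)
             concat
             [["cash" 1000] ["tech" -1000] ["cash" 500]]))
;; => (["cash" 1000] ["tech" -1000] ["cash" 1500])
```

Passing the state and the update function in as arguments is what lets the same transducer run against an atom in tests and, presumably, against a state store in a topology.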

When using your transducers from Kafka Streams, no changes to the transducers are needed; you just supply different arguments. The examples show how to provide a state store and a helper function defined in jackdaw.streams.xform. However, if this doesn't work for you, you can write your own.

Here is the topology:

(require '[jackdaw.streams :as j])
(require '[jackdaw.streams.xform :as jxf])

(defn topology-builder
  [{:keys [entry-requested transaction-pending transaction-added] :as topics} xforms]
  (fn [builder]
    (jxf/add-state-store! builder)
    (-> (j/kstream builder entry-requested)
        (jxf/transduce-kstream (::xf-split-entries xforms))
        (j/through transaction-pending)
        (jxf/transduce-kstream (::xf-running-balances xforms))
        (j/to transaction-added))
    builder))

This PR contains examples for Word Count and the Simple Ledger.

@creese creese requested a review from a team as a code owner September 23, 2019 02:44
@codecov

codecov bot commented Sep 23, 2019

Codecov Report

❗ No coverage uploaded for pull request base (master@cee3aba).
The diff coverage is 15.62%.

@@            Coverage Diff            @@
##             master     #200   +/-   ##
=========================================
  Coverage          ?   78.18%           
=========================================
  Files             ?       43           
  Lines             ?     2530           
  Branches          ?      151           
=========================================
  Hits              ?     1978           
  Misses            ?      401           
  Partials          ?      151
Impacted Files Coverage Δ
src/jackdaw/streams/xform.clj 11.53% <11.53%> (ø)
src/jackdaw/streams/xform/fakes.clj 33.33% <33.33%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update cee3aba...9efe7b7.

@creese creese changed the title from "Transducers" to "Transducers are Coming" Sep 25, 2019
@kidpollo
Contributor

Looking good! I would add examples of unit tests of the actual word count transducer. Also what happens to the simple ledger tests?

"Takes a builder and adds a state store."
(doto ^StreamsBuilder (j/streams-builder* builder)
  (.addStateStore (Stores/keyValueStoreBuilder
                   (Stores/persistentKeyValueStore "transducer")

Contributor

What if you want >1 stateful mapping? Won't all the mappers get the same state store? We should perhaps add an arity to pass in the backing store name.

Contributor Author

I agree with this. I was planning to add this in a separate PR.

Contributor
@cddr cddr Oct 2, 2019

I think that being able to configure the name of the state store should be part of the MVP.

Contributor Author
@creese creese Oct 2, 2019

Okay, let me think on that. Each transduce-kstream can have many stateful transducers. I need to generalize the interface.

@vijumathew vijumathew Dec 2, 2019

yeah i think generalizing this to handle multiple transducers is good. i think that would handle the implicit coupling with add-state-store and transduce-kstream.

but i imagine there isn't a nice way around the user calling add-state-store for each transducer added. we could keep one store for all transducer state and key them by "transducer-id" like [transducer-id k] to allow us to only need 1 store
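The single-store idea can be sketched in plain Clojure with an atom standing in for the store. Everything here is hypothetical (`xf-count-by-key`, the `transducer-id` argument, and the assumption that the swap-like function returns the updated map); it only shows how composite keys of the form [transducer-id k] keep two transducers from clobbering each other's state.

```clojure
(defn xf-count-by-key
  "Hypothetical: counts records per key, namespacing its state under
  `transducer-id` so that several transducers can share one store."
  [transducer-id state swap-fn]
  (map (fn [[k _]]
         (let [store-key [transducer-id k]
               m (swap-fn state (partial merge-with +) {store-key 1})]
           [[k (get m store-key)]]))))

;; One shared atom plays the role of the single backing store.
(def shared (atom {}))

(def counts-a
  (transduce (xf-count-by-key :a shared swap!) concat [["x" 1] ["x" 2]]))
(def counts-b
  (transduce (xf-count-by-key :b shared swap!) concat [["x" 3]]))
;; @shared => {[:a "x"] 2, [:b "x"] 1}
```

Whether composite keys would serialize cleanly in a real KeyValueStore depends on the configured serdes, which this sketch does not address.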

(init [_ context]
  (reset! ctx context))
(transform [_ k v]
  (let [^KeyValueStore store (.getStateStore @ctx "transducer")

Contributor

I would add an arity to pass the backing store name

Contributor

I kind of think this function is the only thing that should really be in this PR (plus a test in xform_test.clj that proves it works). Everything else feels like arbitrary policy decisions that shouldn't be made at the library level.

Contributor Author
@creese creese Oct 2, 2019

I disagree. fake-kv-store and kv-store-swap-fn are what make transducers reusable. We can test our business logic without the TopologyTestDriver and Kafka Streams. The implementation of kv-store-swap-fn allows you to treat your state stores like Clojure atoms. You don't have to use it, but if you want to, it's there. The examples show how to use all of this to solve non-trivial problems. I think we need them. I could see adding a few tests for this namespace though.

[kstream xf]
"Takes a kstream and xf and transduces the stream."
(-> kstream
    (j/transform (fn [] (transformer xf)) ["transducer"])

Contributor

Me again with the backing store name :)

(transform [_ k v]
  (let [^KeyValueStore store (.getStateStore @ctx "transducer")
        v (first (into [] (xf store) [[k v]]))]
    (KeyValue/pair k v)))

Contributor

This transform does not mess with the key. For clarity we should consider providing a transformer (which can affect the key) and a value-transformer which won't (i.e., calls to the underlying transform and transformValues on the streams DSL).

Similarly for transduce-kstream vs transduce-kstream-values

This separation is useful when reading the high-level code, as it telegraphs to the reader whether a repartition may be occurring (à la map vs mapValues)

Contributor Author

Each KStream::transform is always followed by a KStream::flatMap. The final key or keys are obtained from the resulting value of KStream::transform. We don't care about the resulting key.

Suppose the input to KStream::transform is [input-key input-value]. The output might be [input-key [[k1 v1] [k2 v2]]]. KStream::flatMap discards the key from the previous step and publishes two records with keys k1 and k2.
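The same step can be mimicked on plain data, using `mapcat` as a stand-in for the flat-map stage. The literal keys and values below are placeholders; only the function `(fn [[_ v]] v)` comes from the PR.

```clojure
;; Shape of a record after the transform step: the original key paired with
;; a collection of the records that should actually be published.
(def transform-output
  [["input-key" [["k1" "v1"] ["k2" "v2"]]]])

;; The flat-map function from transduce-kstream ignores the outer key and
;; returns the inner collection, so each inner pair becomes its own record.
(def published
  (mapcat (fn [[_ v]] v) transform-output))
;; => (["k1" "v1"] ["k2" "v2"])
```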

@vijumathew vijumathew Dec 2, 2019

that makes sense. but the link between transformer and flatmap might not be clear to someone. if we decide not to replace it with .forward and .commit, i think this dependency should be documented. or transformer becomes a private function so it is clear transduce-kstream is the API to use

"Takes an instance of KeyValueStore, a function f, and map m, and
updates the store in a manner similar to `clojure.core/swap!`."
[^KeyValueStore store f m]
(let [ks (keys (f {} m))

Contributor

This (I think) means that the transform must create data when there is none, or else you will get no keys in this step for the subsequent reduction (i.e. it cannot be update-only). Something which should be documented so it's explicit, as the result of (f {} m) is important. (An update-only transform doesn't really make sense anyway, but worth a note on the doc string IMO)

Contributor Author

Can you provide a working example?

could this just be (keys m)?
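Whether `(keys (f {} m))` and `(keys m)` agree depends on f. The sketch below uses two hypothetical choices of f to show one case where they coincide and one where they do not; it makes no claim about which kinds of f the PR intends to support.

```clojure
;; Case 1: f merges the provided map into the old one, so the keys written
;; are exactly the keys of m.
(def merge-sum (partial merge-with +))
(def m {"cash" 50 "tech" 10})
(def same-keys (keys (merge-sum {} m)))

;; Case 2: f derives the key from the contents of the provided map, so the
;; keys of the result differ from the keys of the argument.
(defn index-by-id [old entry]
  (assoc old (:id entry) entry))
(def entry {:id 7 :amount 50})
(def derived-keys (keys (index-by-id {} entry)))
;; derived-keys => (7), whereas (keys entry) => (:id :amount)
```

An update-only f such as `(fn [old m] (select-keys m (keys old)))` would return no keys at all when applied to an empty map, which is the situation the earlier comment in this thread describes.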

"Takes a kstream and xf and transduces the stream."
(-> kstream
    (j/transform (fn [] (transformer xf)) ["transducer"])
    (j/flat-map (fn [[_ v]] v))))

Contributor

Why is the flat-map necessary here? What if someone wants to keep the keys?

Contributor Author
@creese creese Oct 2, 2019

Flat map corresponds to the reducing function in transduce. In this function, we always transduce with concat. This allows you to ingest a record and transform it into zero or more records. Without this step, you wouldn't be able to implement either of the examples.

Contributor

I think you could achieve the same functionality using the forward and commit methods on ctx (which is a ProcessorContext). There's an example here. Not saying that way's better, just that there is an alternative way.

Contributor

I was going to make the same point! I used ProcessorContext in the example @DaveWM linked because it felt closer to a transducible context than anything in the high-level KStreams API. ProcessorContext also gives a bit more fine-grained control over state stores.

(defn transduce-kstream
  [kstream xf]
  "Takes a kstream and xf and transduces the stream."

Contributor

I'd add a bit more information around stateful transducers here. It needs to be clear to the user that this won't work with the stateful transducers from the Clojure core lib, and that if they need a stateful transducer they need to write their own.
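The caveat can be reproduced without Kafka. The sketch below assumes, based on the transformer snippet quoted in this review, that a fresh transducer instance is applied to each record through a per-record `into`. Under that assumption a core stateful transducer such as `dedupe` never sees more than one record, so its internal state is effectively reset every time.

```clojure
(def records [["a" 1] ["a" 1] ["b" 2]])

;; One transducer instance over the whole collection: the consecutive
;; duplicate is dropped.
(def once (into [] (dedupe) records))
;; => [["a" 1] ["b" 2]]

;; A fresh instance per record (assumed to mirror the per-record `into` in
;; the transformer): nothing is ever dropped.
(def per-record
  (into [] (mapcat (fn [record] (into [] (dedupe) [record]))) records))
;; => [["a" 1] ["a" 1] ["b" 2]]
```

Even if a single instance were reused across records, the state of core transducers lives in memory rather than in a state store, so it would not survive a restart.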

also docstring and args are in the wrong order

[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))

(defn kv-store-swap-fn

Contributor

from the -fn suffix I would have thought it would return a function, but it's actually doing the swapping from what I understand.
Maybe something like default-kv-store-swap would be more clear?
Do we need the -swap! as well if it's a write?

Contributor Author

"Maybe something like default-kv-store-swap would be more clear?"

I like this!

"Do we need the -swap! as well if it's a write?"

According to the style guide:
"The names of functions/macros that are not safe in STM transactions should end with an exclamation mark (e.g. reset!)."

So yeah... I can get behind that.

@DaveWM
Contributor

DaveWM commented Nov 12, 2019

I spoke to @blak3mill3r (the author of Noah) yesterday about how he's implemented stateful transducers in Noah. He came up with a broadly similar solution to what we have here; it seems like the use of volatile! within the transducer code is a real sticking point. The main difference between our solutions is that in Noah, all the transducers use a single state store rather than each transducer having its own. We weren't sure what the performance implications of that would be, but it's worth bearing in mind in case we run into perf issues in the future.

We also discussed starting a shared library for all the core transducers, rewritten to support persisting their state, so that they can be used with Jackdaw and Noah. Blake's going to set this up, then I thought we could potentially pull this into Jackdaw. There are some open questions around this though, such as what we do about other popular transducer libraries like xforms.

@blak3mill3r

Here is that shared library which reimplements (the transducer arity of) all of the functions in clojure.core that return a stateful transducer:

https://github.com/blak3mill3r/coddled-super-centaurs

That function is then bound twice by noah to instrument the transducer state and tie it into a StateStore:

https://github.com/blak3mill3r/noah/blob/5803dd5/src/noah/transduce.clj#L34-L35
https://github.com/blak3mill3r/noah/blob/5803dd5/src/noah/transduce.clj#L85-L86

Also, @DaveWM ... I checked, and as far as I can tell, there aren't any stateful transducers in xforms or kixi.stats. They have interesting higher-order transducers and reducing fns, and (I think...) these should work fine composed with these instrumented stateful transducers.

@blak3mill3r

Also I want to clarify regarding: "all the transducers use a single state store rather than each transducer having its own"

Each time you transduce a KStream, if that transduction needs state, you must provide a store. That transducer can of course be a composition of several transducers, any of which can be stateful, and all of the states for these composed transducers will be stored together in a clojure vector as the record in the state store. To transduce multiple KStreams, you would use multiple state stores.

"Use this namespace for interactive development.

This namespace requires libs needed to reset the app and helpers
from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or

do not* typo


  (reset! ctx context))
(transform [_ k v]
  (let [^KeyValueStore store (.getStateStore @ctx "transducer")
        v (first (into [] (xf store) [[k v]]))]

if the goal of this line is to get an updated value for v, could instead do (first (sequence xf [[k v]])).

i could be missing what (xf store) is needed for

(defn kv-store-swap-fn
  "Takes an instance of KeyValueStore, a function f, and map m, and
  updates the store in a manner similar to `clojure.core/swap!`."

could we add something that explains the shape of f? maybe like:

f is a function of 2 args that takes the old map (store) and the provided map and combines them together.

@kidpollo
Contributor

Plz merge this already 😛 !!

@kidpollo
Contributor

kidpollo commented May 9, 2024

Bump!
