diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/BranchedStream.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/BranchedStream.kt index 7a6db2ce..11d3cc98 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/BranchedStream.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/BranchedStream.kt @@ -14,7 +14,7 @@ class BranchedKStream internal constructor( fun branch( predicate: (T) -> Boolean, - consumed: (ConsumedStream) -> Unit, + consumed: ConsumedStream.() -> Unit, ): BranchedKStream { val namedBranch = "-branch-$nextBranchNumber" val internalPredicate = internalPredicate(predicate) @@ -23,7 +23,7 @@ class BranchedKStream internal constructor( return this } - fun default(consumed: (ConsumedStream) -> Unit) { + fun default(consumed: ConsumedStream.() -> Unit) { val namedBranch = "-branch-default" val internalBranch = internalBranch(consumed, namedBranch) { "via$namedBranch-${namedSupplier()}" } stream.defaultBranch(internalBranch) @@ -49,7 +49,7 @@ class BranchedMappedKStream internal constructor( fun branch( predicate: (T) -> Boolean, - consumed: (MappedStream) -> Unit, + consumed: MappedStream.() -> Unit, ): BranchedMappedKStream { val namedBranch = "-branch-$nextBranchNumber" val internalPredicate = internalPredicate(predicate) @@ -58,7 +58,7 @@ class BranchedMappedKStream internal constructor( return this } - fun default(consumed: (MappedStream) -> Unit) { + fun default(consumed: MappedStream.() -> Unit) { val namedBranch = "-branch-default" val internalBranch = internalBranch(consumed, namedBranch) { "via$namedBranch-${namedSupplier()}" } stream.defaultBranch(internalBranch) diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/ConsumedStream.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/ConsumedStream.kt index 25348c81..dc2bafe0 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/ConsumedStream.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/ConsumedStream.kt @@ -189,7 +189,7 @@ class ConsumedStream internal constructor( return JoinedStream(topic.name, joinedStream, named) } - fun branch(predicate: (T) -> Boolean, consumed: (ConsumedStream) -> Unit): BranchedKStream { + fun branch(predicate: (T) -> Boolean, consumed: ConsumedStream.() -> Unit): BranchedKStream { val splittedStream = stream.split(Named.`as`("split-${namedSupplier()}")) return BranchedKStream(topic, splittedStream, namedSupplier).branch(predicate, consumed) } diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/JoinedStream.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/JoinedStream.kt index 164ed690..903e0ed3 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/JoinedStream.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/JoinedStream.kt @@ -51,7 +51,7 @@ class JoinedStream internal constructor( fun branch( predicate: (StreamsPair) -> Boolean, - consumed: (MappedStream>) -> Unit, + consumed: MappedStream>.() -> Unit, ): BranchedMappedKStream> { val branchedStream = stream.split(Named.`as`("split-${namedSupplier()}")) return BranchedMappedKStream(sourceTopicName, branchedStream, namedSupplier).branch(predicate, consumed) diff --git a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/MappedStream.kt b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/MappedStream.kt index cd197f9d..b89ae160 100644 --- a/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/MappedStream.kt +++ b/kafka-2/main/no/nav/aap/kafka/streams/v2/stream/MappedStream.kt @@ -63,7 +63,7 @@ class MappedStream internal constructor( return MappedStream(sourceTopicName, filteredStream, namedSupplier) } - fun branch(predicate: (T) -> Boolean, consumed: (MappedStream) -> Unit): BranchedMappedKStream { + fun branch(predicate: (T) -> Boolean, consumed: MappedStream.() -> Unit): BranchedMappedKStream { val named = Named.`as`("split-${namedSupplier()}") val branchedStream = stream.split(named) return BranchedMappedKStream(sourceTopicName, branchedStream, namedSupplier).branch(predicate, consumed) diff --git a/kafka-2/test/no/nav/aap/kafka/streams/v2/stream/BranchedStreamTest.kt b/kafka-2/test/no/nav/aap/kafka/streams/v2/stream/BranchedStreamTest.kt index eb23080a..2062d864 100644 --- a/kafka-2/test/no/nav/aap/kafka/streams/v2/stream/BranchedStreamTest.kt +++ b/kafka-2/test/no/nav/aap/kafka/streams/v2/stream/BranchedStreamTest.kt @@ -14,10 +14,10 @@ internal class BranchedStreamTest { val kafka = StreamsMock.withTopology { consume(Topics.A) .branch({ v -> v == "lol" }, { - it.produce(Topics.C) + produce(Topics.C) }) .branch({ v -> v != "lol" }, { - it.produce(Topics.B) + produce(Topics.B) }) } @@ -36,10 +36,10 @@ internal class BranchedStreamTest { val kafka = StreamsMock.withTopology { consume(Topics.A) .branch({ v -> v == "lol" }, { - it.produce(Topics.C) + produce(Topics.C) }) .default { - it.produce(Topics.B) + produce(Topics.B) } } @@ -59,10 +59,10 @@ internal class BranchedStreamTest { consume(Topics.A) .map { i -> i } .branch({ v -> v == "lol" }, { - it.produce(Topics.C) + produce(Topics.C) }) .branch({ v -> v != "lol" }, { - it.produce(Topics.B) + produce(Topics.B) }) } @@ -82,12 +82,12 @@ internal class BranchedStreamTest { consume(Topics.A) .map { i -> i } .branch({ v -> v == "lol" }, { - it - .branch({ true }) { b -> b.produce(Topics.C) } - .branch({ false }) { b -> b.produce(Topics.B) } + this + .branch({ true }) { produce(Topics.C) } + .branch({ false }) { produce(Topics.B) } }) .branch({ v -> v != "lol" }, { - it.produce(Topics.B) + produce(Topics.B) }) } @@ -107,10 +107,10 @@ internal class BranchedStreamTest { consume(Topics.A) .map { i -> i } .branch({ v -> v == "lol" }, { - it.produce(Topics.C) + produce(Topics.C) }) .default { - it.produce(Topics.B) + produce(Topics.B) } } @@ -132,13 +132,13 @@ internal class BranchedStreamTest { .joinWith(tableB) .branch({ (left, _) -> left == "lol" }, { - it.map { (left, right) -> left + right } + map { (left, right) -> left + right } .produce(Topics.C) }) .branch({ (_, right) -> right == "lol" }, { - it.map { (_, right) -> right + right } + map { (_, right) -> right + right } .produce(Topics.D) }) @@ -163,11 +163,11 @@ internal class BranchedStreamTest { consume(Topics.A) .joinWith(tableB) .branch({ (left, _) -> left == "lol" }, { - it.map { (left, right) -> left + right }.produce(Topics.C) + map { (left, right) -> left + right }.produce(Topics.C) }) .default { - it.map { (_, right) -> right + right }.produce(Topics.D) + map { (_, right) -> right + right }.produce(Topics.D) } } @@ -190,16 +190,10 @@ internal class BranchedStreamTest { consume(Topics.A) .leftJoinWith(tableB) .branch({ (left, _) -> left == "lol" }, { - - it.map { (left, right) -> left + right } - .produce(Topics.C) - + map { (left, right) -> left + right }.produce(Topics.C) }) .branch({ (_, right) -> right == "lol" }, { - - it.map { (_, right) -> right + right } - .produce(Topics.D) - + map { (_, right) -> right + right }.produce(Topics.D) }) } @@ -222,11 +216,10 @@ internal class BranchedStreamTest { consume(Topics.A) .leftJoinWith(tableB) .branch({ (left, _) -> left == "lol" }, { - it.map { (left, right) -> left + right }.produce(Topics.C) - + map { (left, right) -> left + right }.produce(Topics.C) }) .default { - it.map { (_, right) -> right + right }.produce(Topics.D) + map { (_, right) -> right + right }.produce(Topics.D) } }