From f035ebf076dc2d9fdf967e735843b9fd1dd54705 Mon Sep 17 00:00:00 2001 From: Silas Della Contrada Date: Sat, 10 Aug 2024 15:55:14 +0200 Subject: [PATCH] feat: allow to pass a pad selector to connect for filtering and mapping source pads to different sink pads --- .../silenium/libs/flows/examples/Simple.kt | 4 +- .../dev/silenium/libs/flows/api/Extensions.kt | 7 +++- .../silenium/libs/flows/impl/FlowGraphImpl.kt | 40 +++++++++++++------ .../libs/flows/impl/FlowGraphImplTest.kt | 10 +++-- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/examples/src/main/kotlin/dev/silenium/libs/flows/examples/Simple.kt b/examples/src/main/kotlin/dev/silenium/libs/flows/examples/Simple.kt index 21e693b..d1716f4 100644 --- a/examples/src/main/kotlin/dev/silenium/libs/flows/examples/Simple.kt +++ b/examples/src/main/kotlin/dev/silenium/libs/flows/examples/Simple.kt @@ -73,8 +73,8 @@ fun main() = runBlocking { val processor = transformer(MyTransformer(), "transformer") val sink = sink(MySink(), "sink") - source.connectTo(processor).getOrThrow() - processor.connectTo(sink).getOrThrow() + connect(source to processor) + connect(processor to sink) } graph.source("source")!!.impl.run() graph.close() diff --git a/src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt b/src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt index 32ae02a..7e3570d 100644 --- a/src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt +++ b/src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt @@ -10,12 +10,15 @@ interface FlowGraphConfigScope : FlowGraph { * Creates a connection job in the [kotlinx.coroutines.CoroutineScope] of the [FlowGraph]. * The job is started immediately. */ - infix fun Source.connectTo(sink: Sink): Result + fun connect( + pair: Pair, Sink>, + padSelector: (sourceSinkMap: Map, sourcePads: Map, sourcePad: UInt, metadata: P) -> UInt? = { _, _, pad, _ -> pad }, + ): Job /** * Configures the [FlowGraph]. * Currently, it does: * - wait for all connection jobs to be started */ - suspend fun configure(): Result + suspend fun configure(): Result } diff --git a/src/main/kotlin/dev/silenium/libs/flows/impl/FlowGraphImpl.kt b/src/main/kotlin/dev/silenium/libs/flows/impl/FlowGraphImpl.kt index 676b0ae..236bf11 100644 --- a/src/main/kotlin/dev/silenium/libs/flows/impl/FlowGraphImpl.kt +++ b/src/main/kotlin/dev/silenium/libs/flows/impl/FlowGraphImpl.kt @@ -2,6 +2,7 @@ package dev.silenium.libs.flows.impl import dev.silenium.libs.flows.api.* import kotlinx.coroutines.* +import kotlinx.coroutines.flow.map import kotlin.coroutines.CoroutineContext import kotlin.reflect.KClass @@ -98,25 +99,38 @@ internal class FlowGraphImpl(private val coroutineScope: CoroutineScope) : internal class FlowGraphConfigScopeImpl(private val flowGraph: FlowGraph) : FlowGraphConfigScope, FlowGraph by flowGraph { - private val connectionStarted = mutableSetOf() - - override fun Source.connectTo(sink: Sink): Result { - outputMetadata.forEach { (pad, metadata) -> - sink.configure(pad, metadata).onFailure { - return Result.failure(IllegalStateException("Unable to configure input pad $pad of sink $sink", it)) + private val configurationJobs = mutableSetOf() + + override fun connect( + pair: Pair, Sink>, + padSelector: (sourceSinkMap: Map, sourcePads: Map, sourcePad: UInt, metadata: P) -> UInt?, + ): Job { + val (source, sink) = pair + val padMap = mutableMapOf() + for ((sourcePad, metadata) in source.outputMetadata) { + val sinkPad = padSelector(padMap, source.outputMetadata, sourcePad, metadata) ?: continue + padMap[sourcePad] = sinkPad + } + padMap.forEach { (sourcePad, sinkPad) -> + val metadata = source.outputMetadata.getValue(sourcePad) + sink.configure(sinkPad, metadata).onFailure { + throw IllegalStateException("Unable to configure $sink:$sinkPad from $source:$sourcePad", it) } } val started = CompletableDeferred() return launch { started.complete(Unit) - flow.collect(sink) + source.flow + .map { it.copy(pad = padMap.getValue(it.pad)) } + .collect(sink) }.also { - connectionStarted.add(started) - }.let { Result.success(it) } + configurationJobs.add(started) + } } - override suspend fun configure(): Result = runCatching { - connectionStarted.joinAll() + override suspend fun configure(): Result = runCatching { + configurationJobs.joinAll() + flowGraph } } @@ -136,7 +150,7 @@ internal fun FlowGraph.builder() = FlowGraphConfigScopeImpl(this) suspend fun FlowGraph( coroutineContext: CoroutineContext = Dispatchers.Default, block: FlowGraphConfigScope.() -> Unit, -): FlowGraph = FlowGraphImpl(coroutineContext).builder().apply(block).apply { configure() } +): FlowGraph = FlowGraphImpl(coroutineContext).builder().apply(block).configure().getOrThrow() /** * Creates a new [FlowGraph] with the given [coroutineScope] and [block] configuration. @@ -152,4 +166,4 @@ suspend fun FlowGraph( suspend fun FlowGraph( coroutineScope: CoroutineScope, block: FlowGraphConfigScope.() -> Unit, -): FlowGraph = FlowGraphImpl(coroutineScope).builder().apply(block).apply { configure() } +): FlowGraph = FlowGraphImpl(coroutineScope).builder().apply(block).configure().getOrThrow() diff --git a/src/test/kotlin/dev/silenium/libs/flows/impl/FlowGraphImplTest.kt b/src/test/kotlin/dev/silenium/libs/flows/impl/FlowGraphImplTest.kt index 4d94909..f981723 100644 --- a/src/test/kotlin/dev/silenium/libs/flows/impl/FlowGraphImplTest.kt +++ b/src/test/kotlin/dev/silenium/libs/flows/impl/FlowGraphImplTest.kt @@ -20,8 +20,10 @@ class FlowGraphImplTest : FunSpec({ val source = source(BufferSource(0u to DataType.BASE64), "buffer-source") val sink = sink(BufferSink(), "buffer-sink") val decoder = transformer(Base64Decoder(), "base64-decoder") - source.connectTo(decoder) - decoder.connectTo(sink) + connect(source to decoder) + connect(decoder to sink) { _, _, sourcePad, _ -> + sourcePad + 1u + } } val source = graph.source>("buffer-source")!! val sink = graph.sink>("buffer-sink")!! @@ -30,9 +32,9 @@ class FlowGraphImplTest : FunSpec({ val inputBuffer = input.encodeBase64() source.impl.submit(0u, inputBuffer) inputBuffer.close() - val result = sink.impl.flow.firstOrNull { 0u in it && it[0u]!!.isNotEmpty() } + val result = sink.impl.flow.firstOrNull { 1u in it && it[1u]!!.isNotEmpty() } graph.close() - result.shouldNotBeNull()[0u]!!.shouldNotBeEmpty().first().value.decodeToString() shouldBe input + result.shouldNotBeNull()[1u]!!.shouldNotBeEmpty().first().value.decodeToString() shouldBe input } })