Skip to content

Commit

Permalink
feat: allow to pass a pad selector to connect for filtering and mappi…
Browse files Browse the repository at this point in the history
…ng source pads to different sink pads
  • Loading branch information
silenium-dev committed Aug 10, 2024
1 parent e413a6f commit f035ebf
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ fun main() = runBlocking {
val processor = transformer(MyTransformer(), "transformer")
val sink = sink(MySink(), "sink")

source.connectTo(processor).getOrThrow()
processor.connectTo(sink).getOrThrow()
connect(source to processor)
connect(processor to sink)
}
graph.source<MySource>("source")!!.impl.run()
graph.close()
Expand Down
7 changes: 5 additions & 2 deletions src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ interface FlowGraphConfigScope : FlowGraph {
* Creates a connection job in the [kotlinx.coroutines.CoroutineScope] of the [FlowGraph].
* The job is started immediately.
*/
infix fun <T, P> Source<T, P>.connectTo(sink: Sink<T, P>): Result<Job>
fun <T, P> connect(
pair: Pair<Source<T, P>, Sink<T, P>>,
padSelector: (sourceSinkMap: Map<UInt, UInt>, sourcePads: Map<UInt, P>, sourcePad: UInt, metadata: P) -> UInt? = { _, _, pad, _ -> pad },
): Job

/**
* Configures the [FlowGraph].
* Currently, it does:
* - wait for all connection jobs to be started
*/
suspend fun configure(): Result<Unit>
suspend fun configure(): Result<FlowGraph>
}
40 changes: 27 additions & 13 deletions src/main/kotlin/dev/silenium/libs/flows/impl/FlowGraphImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dev.silenium.libs.flows.impl

import dev.silenium.libs.flows.api.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KClass

Expand Down Expand Up @@ -98,25 +99,38 @@ internal class FlowGraphImpl(private val coroutineScope: CoroutineScope) :

internal class FlowGraphConfigScopeImpl(private val flowGraph: FlowGraph) : FlowGraphConfigScope,
FlowGraph by flowGraph {
private val connectionStarted = mutableSetOf<Job>()

override fun <T, P> Source<T, P>.connectTo(sink: Sink<T, P>): Result<Job> {
outputMetadata.forEach { (pad, metadata) ->
sink.configure(pad, metadata).onFailure {
return Result.failure(IllegalStateException("Unable to configure input pad $pad of sink $sink", it))
private val configurationJobs = mutableSetOf<Job>()

override fun <T, P> connect(
pair: Pair<Source<T, P>, Sink<T, P>>,
padSelector: (sourceSinkMap: Map<UInt, UInt>, sourcePads: Map<UInt, P>, sourcePad: UInt, metadata: P) -> UInt?,
): Job {
val (source, sink) = pair
val padMap = mutableMapOf<UInt, UInt>()
for ((sourcePad, metadata) in source.outputMetadata) {
val sinkPad = padSelector(padMap, source.outputMetadata, sourcePad, metadata) ?: continue
padMap[sourcePad] = sinkPad
}
padMap.forEach { (sourcePad, sinkPad) ->
val metadata = source.outputMetadata.getValue(sourcePad)
sink.configure(sinkPad, metadata).onFailure {
throw IllegalStateException("Unable to configure $sink:$sinkPad from $source:$sourcePad", it)
}
}
val started = CompletableDeferred<Unit>()
return launch {
started.complete(Unit)
flow.collect(sink)
source.flow
.map { it.copy(pad = padMap.getValue(it.pad)) }
.collect(sink)
}.also {
connectionStarted.add(started)
}.let { Result.success(it) }
configurationJobs.add(started)
}
}

override suspend fun configure(): Result<Unit> = runCatching {
connectionStarted.joinAll()
override suspend fun configure(): Result<FlowGraph> = runCatching {
configurationJobs.joinAll()
flowGraph
}
}

Expand All @@ -136,7 +150,7 @@ internal fun FlowGraph.builder() = FlowGraphConfigScopeImpl(this)
suspend fun FlowGraph(
coroutineContext: CoroutineContext = Dispatchers.Default,
block: FlowGraphConfigScope.() -> Unit,
): FlowGraph = FlowGraphImpl(coroutineContext).builder().apply(block).apply { configure() }
): FlowGraph = FlowGraphImpl(coroutineContext).builder().apply(block).configure().getOrThrow()

/**
* Creates a new [FlowGraph] with the given [coroutineScope] and [block] configuration.
Expand All @@ -152,4 +166,4 @@ suspend fun FlowGraph(
suspend fun FlowGraph(
coroutineScope: CoroutineScope,
block: FlowGraphConfigScope.() -> Unit,
): FlowGraph = FlowGraphImpl(coroutineScope).builder().apply(block).apply { configure() }
): FlowGraph = FlowGraphImpl(coroutineScope).builder().apply(block).configure().getOrThrow()
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ class FlowGraphImplTest : FunSpec({
val source = source(BufferSource<Base64Buffer, DataType>(0u to DataType.BASE64), "buffer-source")
val sink = sink(BufferSink<ByteArray, DataType>(), "buffer-sink")
val decoder = transformer(Base64Decoder(), "base64-decoder")
source.connectTo(decoder)
decoder.connectTo(sink)
connect(source to decoder)
connect(decoder to sink) { _, _, sourcePad, _ ->
sourcePad + 1u
}
}
val source = graph.source<BufferSource<Base64Buffer, DataType>>("buffer-source")!!
val sink = graph.sink<BufferSink<ByteArray, DataType>>("buffer-sink")!!
Expand All @@ -30,9 +32,9 @@ class FlowGraphImplTest : FunSpec({
val inputBuffer = input.encodeBase64()
source.impl.submit(0u, inputBuffer)
inputBuffer.close()
val result = sink.impl.flow.firstOrNull { 0u in it && it[0u]!!.isNotEmpty() }
val result = sink.impl.flow.firstOrNull { 1u in it && it[1u]!!.isNotEmpty() }

graph.close()
result.shouldNotBeNull()[0u]!!.shouldNotBeEmpty().first().value.decodeToString() shouldBe input
result.shouldNotBeNull()[1u]!!.shouldNotBeEmpty().first().value.decodeToString() shouldBe input
}
})

0 comments on commit f035ebf

Please sign in to comment.