diff --git a/src/main/kotlin/dev/silenium/libs/flows/buffer/BufferSink.kt b/src/main/kotlin/dev/silenium/libs/flows/buffer/BufferSink.kt index ca98449..11170c8 100644 --- a/src/main/kotlin/dev/silenium/libs/flows/buffer/BufferSink.kt +++ b/src/main/kotlin/dev/silenium/libs/flows/buffer/BufferSink.kt @@ -2,9 +2,9 @@ package dev.silenium.libs.flows.buffer import dev.silenium.libs.flows.api.FlowItem import dev.silenium.libs.flows.api.Sink -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow class BufferSink(vararg pads: Pair) : Sink { private val inputMetadata_: MutableMap = pads.toMap().toMutableMap() @@ -12,8 +12,8 @@ class BufferSink(vararg pads: Pair) : Sink { private val buffer_: MutableMap>> = mutableMapOf() val buffer: Map>> by ::buffer_ - private val flow_ = MutableStateFlow>>>(emptyMap()) - val flow: StateFlow>>> = flow_.asStateFlow() + private val flow_ = MutableSharedFlow>>>(replay = 1) + val flow: SharedFlow>>> = flow_.asSharedFlow() override suspend fun receive(item: FlowItem): Result { buffer_.getOrPut(item.pad, ::mutableListOf).add(item)