Skip to content

Commit

Permalink
feat: (buffersink) use shared flow
Browse files Browse the repository at this point in the history
  • Loading branch information
silenium-dev committed Aug 10, 2024
1 parent de0e439 commit c75c7c9
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions src/main/kotlin/dev/silenium/libs/flows/buffer/BufferSink.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package dev.silenium.libs.flows.buffer

import dev.silenium.libs.flows.api.FlowItem
import dev.silenium.libs.flows.api.Sink
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow

class BufferSink<T, P>(vararg pads: Pair<UInt, P>) : Sink<T, P> {
private val inputMetadata_: MutableMap<UInt, P?> = pads.toMap().toMutableMap()
override val inputMetadata: Map<UInt, P?> by ::inputMetadata_

private val buffer_: MutableMap<UInt, MutableList<FlowItem<T, P>>> = mutableMapOf()
val buffer: Map<UInt, List<FlowItem<T, P>>> by ::buffer_
private val flow_ = MutableStateFlow<Map<UInt, List<FlowItem<T, P>>>>(emptyMap())
val flow: StateFlow<Map<UInt, List<FlowItem<T, P>>>> = flow_.asStateFlow()
private val flow_ = MutableSharedFlow<Map<UInt, List<FlowItem<T, P>>>>(replay = 1)
val flow: SharedFlow<Map<UInt, List<FlowItem<T, P>>>> = flow_.asSharedFlow()

override suspend fun receive(item: FlowItem<T, P>): Result<Unit> {
buffer_.getOrPut(item.pad, ::mutableListOf).add(item)
Expand Down

0 comments on commit c75c7c9

Please sign in to comment.