Skip to content

Commit

Permalink
docs: add docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
silenium-dev committed Aug 9, 2024
1 parent da1786a commit 03b182a
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 38 deletions.
16 changes: 14 additions & 2 deletions src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,20 @@ package dev.silenium.libs.flows.api

import kotlinx.coroutines.Job

interface FlowGraphBuilder : FlowGraph {
/**
* ConfigScope for creating connections between [Source]s and [Sink]s in a [FlowGraph].
*/
interface FlowGraphConfigScope : FlowGraph {
/**
* Creates a connection job in the [kotlinx.coroutines.CoroutineScope] of the [FlowGraph].
* The job is started immediately.
*/
infix fun <T, P> Source<T, P>.connectTo(sink: Sink<T, P>): Result<Job>

suspend fun finalize(): Result<Unit>
/**
* Configures the [FlowGraph].
* Currently, it does:
* - wait for all connection jobs to be started
*/
suspend fun configure(): Result<Unit>
}
82 changes: 79 additions & 3 deletions src/main/kotlin/dev/silenium/libs/flows/api/FlowGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,102 @@ package dev.silenium.libs.flows.api

import kotlinx.coroutines.CoroutineScope

/**
* A [FlowGraph] is a directed graph of [FlowGraphElement]s.
* It is used to create a flow of data between [Source]s, [Sink]s, and [Transformer]s.
* The [FlowGraph] is a [CoroutineScope] which will contain all connections between the elements.
* It is also an [AutoCloseable] to close all elements and cancel all connection jobs when the [FlowGraph] is closed.
*
* @see FlowGraphElement
* @see Source
* @see Sink
* @see Transformer
* @see CoroutineScope
* @see AutoCloseable
* @see SourceFlowGraphElement
* @see SinkFlowGraphElement
* @see TransformerFlowGraphElement
* @see FlowGraphConfigScope
*/
interface FlowGraph : AutoCloseable, CoroutineScope {
/**
* The list of [FlowGraphElement]s in the [FlowGraph].
*/
val elements: List<FlowGraphElement<*>>

/**
* The number of [FlowGraphElement]s in the [FlowGraph].
*/
val size: Int

/**
* Adds a source to the [FlowGraph].
* Don't use this for a [Transformer] which is a combined [Source] and [Sink].
*
* @param source The [Source] to add to the [FlowGraph].
* @param name The name of the [SourceFlowGraphElement].
* @return The [SourceFlowGraphElement] which was added to the [FlowGraph].
* @see SourceFlowGraphElement
* @see Source
*/
fun <T, P, E : Source<T, P>> source(
source: E,
name: String = "${source.javaClass.name}-${size}",
): SourceFlowGraphElement<T, P, E>

/**
* Adds a sink to the [FlowGraph].
* Don't use this for a [Transformer] which is a combined [Source] and [Sink].
*
* @param sink The [Sink] to add to the [FlowGraph].
* @param name The name of the [SinkFlowGraphElement].
* @return The [SinkFlowGraphElement] which was added to the [FlowGraph].
* @see SinkFlowGraphElement
* @see Sink
*/
fun <T, P, E : Sink<T, P>> sink(
sink: E,
name: String = "${sink.javaClass.name}-${size}",
): SinkFlowGraphElement<T, P, E>

/**
* Adds a transformer to the [FlowGraph].
*
* @param transform The [Transformer] to add to the [FlowGraph].
* @param name The name of the [TransformerFlowGraphElement].
* @return The [TransformerFlowGraphElement] which was added to the [FlowGraph].
* @see TransformerFlowGraphElement
* @see Transformer
*/
fun <IT, IP, OT, OP, E : Transformer<IT, IP, OT, OP>> transformer(
transform: E,
name: String = "${transform.javaClass.name}-${size}",
): TransformerFlowGraphElement<IT, IP, OT, OP, E>

fun <E : Source<*, *>> source(name: String): FlowGraphElement<E>?
fun <E : Sink<*, *>> sink(name: String): FlowGraphElement<E>?
fun <E : Transformer<*, *, *, *>> transformer(name: String): FlowGraphElement<E>?
/**
* Gets a [SourceFlowGraphElement] by its name.
*
* @param name The name of the [SourceFlowGraphElement].
* @return The [SourceFlowGraphElement] with the given name or `null` if it doesn't exist.
* @see SourceFlowGraphElement
*/
fun <E : Source<*, *>> source(name: String): SourceFlowGraphElement<*, *, E>?

/**
* Gets a [SinkFlowGraphElement] by its name.
*
* @param name The name of the [SinkFlowGraphElement].
* @return The [SinkFlowGraphElement] with the given name or `null` if it doesn't exist.
* @see SinkFlowGraphElement
*/
fun <E : Sink<*, *>> sink(name: String): SinkFlowGraphElement<*, *, E>?

/**
* Gets a [TransformerFlowGraphElement] by its name.
*
* @param name The name of the [TransformerFlowGraphElement].
* @return The [TransformerFlowGraphElement] with the given name or `null` if it doesn't exist.
* @see TransformerFlowGraphElement
*/
fun <E : Transformer<*, *, *, *>> transformer(name: String): TransformerFlowGraphElement<*, *, *, *, E>?
}
15 changes: 15 additions & 0 deletions src/main/kotlin/dev/silenium/libs/flows/api/FlowGraphElement.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package dev.silenium.libs.flows.api

import kotlin.reflect.KClass

/**
* A [FlowGraphElement] is a single element in a [FlowGraph].
* It wraps the concrete implementation of a [Source], [Sink], or [Transformer].
*/
interface FlowGraphElement<T : Any> : AutoCloseable {
val name: String
val type: KClass<T>
Expand All @@ -10,7 +14,18 @@ interface FlowGraphElement<T : Any> : AutoCloseable {
override fun close()
}

/**
* A [FlowGraphElement] which is a [Source].
*/
interface SourceFlowGraphElement<T, P, S : Source<T, P>> : FlowGraphElement<S>, Source<T, P>

/**
* A [FlowGraphElement] which is a [Sink].
*/
interface SinkFlowGraphElement<T, P, S : Sink<T, P>> : FlowGraphElement<S>, Sink<T, P>

/**
* A [FlowGraphElement] which is a [Transformer].
*/
interface TransformerFlowGraphElement<IT, IP, OT, OP, T : Transformer<IT, IP, OT, OP>> :
FlowGraphElement<T>, Transformer<IT, IP, OT, OP>
35 changes: 31 additions & 4 deletions src/main/kotlin/dev/silenium/libs/flows/api/FlowItem.kt
Original file line number Diff line number Diff line change
@@ -1,17 +1,44 @@
package dev.silenium.libs.flows.api

data class FlowItem<T, P>(val pad: UInt, val metadata: P, val value: T) : ReferenceCounted<FlowItem<T, P>> {
/**
* Wraps the actual data of a flow item and contains its metadata and pad id.
*
* @param T The type of the data.
* @param P The type of the metadata.
* @property pad The pad id of the flow item.
* @property metadata The metadata of the flow item.
* @property value The actual data of the flow item.
* @see Reference
* @see AutoCloseable
*/
data class FlowItem<T, P>(val pad: UInt, val metadata: P, val value: T) : Reference<FlowItem<T, P>> {
/**
* Clones the flow item.
* If value and/or metadata are [Reference], they are cloned as well.
*/
@Suppress("UNCHECKED_CAST")
override fun clone(): Result<FlowItem<T, P>> {
return when (value) {
is ReferenceCounted<*> -> value.clone().map { FlowItem(pad, metadata, it as T) }
else -> Result.success(this)
val clonedValue = when (value) {
is Reference<*> -> value.clone() as T
else -> value
}
val clonedMetadata = when (metadata) {
is Reference<*> -> metadata.clone() as P
else -> metadata
}
return Result.success(FlowItem(pad, clonedMetadata, clonedValue))
}

/**
* Closes the flow item.
* If value and/or metadata are [AutoCloseable], they are closed as well.
*/
override fun close() {
if (value is AutoCloseable) {
(value as AutoCloseable).close()
}
if (metadata is AutoCloseable) {
(metadata as AutoCloseable).close()
}
}
}
21 changes: 21 additions & 0 deletions src/main/kotlin/dev/silenium/libs/flows/api/Reference.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dev.silenium.libs.flows.api

/**
* A reference to a resource.
* The resource is closed when the reference count reaches zero and it implements [AutoCloseable].
*
* @param T The type of the underlying resource.
* @see AutoCloseable
*/
interface Reference<T : Reference<T>> : AutoCloseable {
/**
* Creates a new reference to the underlying resource.
*/
fun clone(): Result<T>

/**
* Destroys the reference to the underlying resource.
* If the reference count reaches zero, the resource is closed.
*/
override fun close()
}

This file was deleted.

11 changes: 11 additions & 0 deletions src/main/kotlin/dev/silenium/libs/flows/api/Sink.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ package dev.silenium.libs.flows.api

import kotlinx.coroutines.flow.FlowCollector

/**
* A [Sink] is a flow element that consumes flow items.
* It can be configured with metadata for each input pad.
* It can be closed to release resources.
*
* @param T The type of the data.
* @param P The type of the metadata.
* @see FlowCollector
* @see AutoCloseable
* @see FlowItem
*/
interface Sink<T, P> : FlowCollector<FlowItem<T, P>>, AutoCloseable {
val inputMetadata: Map<UInt, P?>

Expand Down
11 changes: 11 additions & 0 deletions src/main/kotlin/dev/silenium/libs/flows/api/Source.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ package dev.silenium.libs.flows.api

import kotlinx.coroutines.flow.Flow

/**
* A [Source] is a flow element that produces flow items.
* It can be queried for metadata for each output pad.
* It can be closed to release resources.
*
* @param T The type of the data.
* @param P The type of the metadata.
* @see AutoCloseable
* @see FlowItem
* @see Flow
*/
interface Source<T, P> : AutoCloseable {
val outputMetadata: Map<UInt, P>
val flow: Flow<FlowItem<T, P>>
Expand Down
11 changes: 11 additions & 0 deletions src/main/kotlin/dev/silenium/libs/flows/api/Transformer.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
package dev.silenium.libs.flows.api

/**
* A [Transformer] is a flow element that transforms flow items.
* It is simply both a [Sink] and a [Source].
*
* @param IT The type of the input data.
* @param IP The type of the input metadata.
* @param OT The type of the output data.
* @param OP The type of the output metadata.
* @see Sink
* @see Source
*/
interface Transformer<IT, IP, OT, OP> : Sink<IT, IP>, Source<OT, OP>
7 changes: 7 additions & 0 deletions src/main/kotlin/dev/silenium/libs/flows/base/SourceBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import dev.silenium.libs.flows.api.Source
import dev.silenium.libs.flows.impl.CloningFlow
import java.util.*

/**
* A base class for [Source] implementations.
* It provides a [CloningFlow] to publish flow items.
* It also provides a [metadata] map to store metadata for each output pad.
* It implements the [Source] interface.
* It is an [AutoCloseable] resource.
*/
abstract class SourceBase<T, P> : Source<T, P> {
override val outputMetadata: Map<UInt, P> get() = metadata.toMap()
override val flow = CloningFlow<FlowItem<T, P>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

/**
* Executes the given [block] with this mutex locked.
* If the mutex is already locked in the current context, the [block] is executed immediately.
* Otherwise, this function suspends until this mutex is unlocked and then locks it.
*/
@OptIn(ExperimentalContracts::class)
suspend inline fun <T> Mutex.withReentrantLock(crossinline block: suspend () -> T): T {
contract {
Expand All @@ -22,10 +27,12 @@ suspend inline fun <T> Mutex.withReentrantLock(crossinline block: suspend () ->
}
}

class ReentrantMutexContextElement(
@PublishedApi
internal class ReentrantMutexContextElement(
override val key: ReentrantMutexContextKey
) : CoroutineContext.Element

data class ReentrantMutexContextKey(
@PublishedApi
internal data class ReentrantMutexContextKey(
val mutex: Mutex
) : CoroutineContext.Key<ReentrantMutexContextElement>
19 changes: 16 additions & 3 deletions src/main/kotlin/dev/silenium/libs/flows/impl/CloningFlow.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dev.silenium.libs.flows.impl

import dev.silenium.libs.flows.api.ReferenceCounted
import dev.silenium.libs.flows.api.Reference
import dev.silenium.libs.flows.concurrent.withReentrantLock
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
Expand All @@ -11,10 +11,17 @@ import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.resume

/**
* A [CloningFlow] is a flow that clones each item before publishing it to collectors.
* Every collector must close its items when it's done with them,
* as the flow will pass individual instances to each collector.
*
* @param T The type of the data.
* @param wrapped (Optional) the wrapped flow to clone from, can be null to only use manual publishing.
* @see Flow
* @see AutoCloseable
* @see Reference
*/
class CloningFlow<T : ReferenceCounted<T>>(private val wrapped: Flow<T>? = null) : Flow<T>, AutoCloseable {
class CloningFlow<T : Reference<T>>(private val wrapped: Flow<T>? = null) : Flow<T>, AutoCloseable {
private val idCounter = AtomicLong(0L)
private val collectors = ConcurrentHashMap<Long, FlowCollector<T>>()
private val finished = CompletableDeferred<Unit>()
Expand All @@ -31,7 +38,13 @@ class CloningFlow<T : ReferenceCounted<T>>(private val wrapped: Flow<T>? = null)
}

/**
* *Note: thread-safe*
* Publishes a value to all collectors.
* Each collector will receive a clone of the value, which it must close when it's done with it.
* *Note: this method is thread-safe*
*
* @param value The value to publish.
* @return A [Unit] result.
* @see Reference
*/
suspend fun publish(value: T): Unit = publishLock.withReentrantLock {
coroutineScope {
Expand Down
Loading

0 comments on commit 03b182a

Please sign in to comment.