diff --git a/src/main/kotlin/extensions/Channel.kt b/src/main/kotlin/extensions/Channel.kt new file mode 100644 index 0000000..c6a0d73 --- /dev/null +++ b/src/main/kotlin/extensions/Channel.kt @@ -0,0 +1,24 @@ +package technology.idlab.extensions + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.launch + +internal fun Channel.map(scope: CoroutineScope, func: (R) -> E): SendChannel { + // Create the new channel. + val result = Channel() + + // Pipe the data through the function and into the new channel. + scope.launch { + for (data in result) { + this@map.send(func(data)) + } + } + + // Close the new channel if required. + this.invokeOnClose { result.close() } + + // Return the new channel. + return result +} diff --git a/src/main/kotlin/runner/impl/jvm/JVMRunner.kt b/src/main/kotlin/runner/impl/jvm/JVMRunner.kt index 5179c7f..84f04eb 100644 --- a/src/main/kotlin/runner/impl/jvm/JVMRunner.kt +++ b/src/main/kotlin/runner/impl/jvm/JVMRunner.kt @@ -1,12 +1,16 @@ package technology.idlab.runner.impl.jvm import arrow.core.zip +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout +import technology.idlab.extensions.map import technology.idlab.intermediate.IRArgument import technology.idlab.intermediate.IRParameter import technology.idlab.intermediate.IRProcessor @@ -20,6 +24,10 @@ private const val REQUIRES_PROCESSOR_BASE_CLASS = "Class does not extend Process class JVMRunner( fromProcessors: Channel, ) : Runner(fromProcessors) { + /** A shared coroutine scope. */ + private val job = Job() + private val scope = CoroutineScope(Dispatchers.Default + job) + /** Map of all stages in the runner. */ private val stages = mutableMapOf() @@ -48,6 +56,8 @@ class JVMRunner( override suspend fun exec() = coroutineScope { Log.shared.info("Executing all stages.") + job.start() + // Create a new job which routes the messages. val router = launch { Log.shared.debug("Begin routing messages in JVMRunner.") @@ -109,11 +119,13 @@ class JVMRunner( IRParameter.Type.INT -> value.toInt() IRParameter.Type.LONG -> value.toLong() IRParameter.Type.STRING -> value - IRParameter.Type.WRITER -> return Writer(this.fromProcessors, value) + IRParameter.Type.WRITER -> { + this.fromProcessors.map(scope) { Payload(value, it) } + } IRParameter.Type.READER -> { val channel = this.readers[value] ?: Channel() this.readers[value] = channel - return Reader(channel, value) + return channel } } } diff --git a/src/main/kotlin/runner/impl/jvm/Reader.kt b/src/main/kotlin/runner/impl/jvm/Reader.kt deleted file mode 100644 index 8881f16..0000000 --- a/src/main/kotlin/runner/impl/jvm/Reader.kt +++ /dev/null @@ -1,12 +0,0 @@ -package technology.idlab.runner.impl.jvm - -import kotlinx.coroutines.channels.ReceiveChannel -import technology.idlab.util.Log - -class Reader(private val channel: ReceiveChannel, private val channelURI: String) { - suspend fun read(): ByteArray { - val result = channel.receive() - Log.shared.debug("[$channelURI] -> '${result.decodeToString()}'") - return result - } -} diff --git a/src/main/kotlin/runner/impl/jvm/Writer.kt b/src/main/kotlin/runner/impl/jvm/Writer.kt deleted file mode 100644 index 4f1e2b8..0000000 --- a/src/main/kotlin/runner/impl/jvm/Writer.kt +++ /dev/null @@ -1,12 +0,0 @@ -package technology.idlab.runner.impl.jvm - -import kotlinx.coroutines.channels.Channel -import technology.idlab.runner.Runner -import technology.idlab.util.Log - -class Writer(private val channel: Channel, private val channelURI: String) { - suspend fun push(value: ByteArray) { - Log.shared.debug("'${value.decodeToString()}' -> [$channelURI]") - channel.send(Runner.Payload(channelURI, value)) - } -} diff --git a/src/main/kotlin/std/FileReader.kt b/src/main/kotlin/std/FileReader.kt index 8b25a11..26ce3e7 100644 --- a/src/main/kotlin/std/FileReader.kt +++ b/src/main/kotlin/std/FileReader.kt @@ -1,19 +1,19 @@ package technology.idlab.std import java.io.File +import kotlinx.coroutines.channels.SendChannel import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Writer class FileReader(args: Arguments) : Processor(args) { /** Arguments */ private val path: String = arguments["path"] - private val output: Writer = arguments["output"] + private val output: SendChannel = arguments["output"] /** Read the file as a single byte array and push it down the pipeline. */ override suspend fun exec() { val file = File(path) val bytes = file.readBytes() - output.push(bytes) + output.send(bytes) } } diff --git a/src/main/kotlin/std/FileWriter.kt b/src/main/kotlin/std/FileWriter.kt index 0f93eda..97505ce 100644 --- a/src/main/kotlin/std/FileWriter.kt +++ b/src/main/kotlin/std/FileWriter.kt @@ -1,9 +1,9 @@ package technology.idlab.std import java.io.File +import kotlinx.coroutines.channels.ReceiveChannel import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Reader class FileWriter(args: Arguments) : Processor(args) { /** Processor default values. */ @@ -13,7 +13,7 @@ class FileWriter(args: Arguments) : Processor(args) { /** Arguments */ private val path: String = arguments["path"] private val file = File(path) - private val input: Reader = arguments["input"] + private val input: ReceiveChannel = arguments["input"] private val overwrite: Boolean? = arguments["overwrite"] private val append: Boolean? = arguments["append"] @@ -36,9 +36,8 @@ class FileWriter(args: Arguments) : Processor(args) { /** All incoming values are parsed as byte and appended onto the file. */ override suspend fun exec() { - while (true) { - val result = input.read() - file.appendBytes(result) + for (data in input) { + file.appendBytes(data) } } } diff --git a/src/main/kotlin/std/HttpFetch.kt b/src/main/kotlin/std/HttpFetch.kt index 1a71654..4e12da6 100644 --- a/src/main/kotlin/std/HttpFetch.kt +++ b/src/main/kotlin/std/HttpFetch.kt @@ -6,9 +6,9 @@ import io.ktor.client.engine.cio.* import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* +import kotlinx.coroutines.channels.SendChannel import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Writer class HttpFetch(args: Arguments) : Processor(args) { /** Meta configuration. */ @@ -16,7 +16,7 @@ class HttpFetch(args: Arguments) : Processor(args) { /** Parameters. */ private val endpoint: String = arguments["endpoint"] - private val output: Writer = arguments["output"] + private val output: SendChannel = arguments["output"] private val headers: Array = arguments["headers"] private val method: String = arguments.get("method") ?: "GET" @@ -45,7 +45,7 @@ class HttpFetch(args: Arguments) : Processor(args) { // Push the result to the output. val bytes = res.readBytes() - output.push(bytes) + output.send(bytes) } internal fun overwriteEngine(engine: HttpClientEngine) { diff --git a/src/main/kotlin/std/RDFValidator.kt b/src/main/kotlin/std/RDFValidator.kt index 93ec47d..3f749a5 100644 --- a/src/main/kotlin/std/RDFValidator.kt +++ b/src/main/kotlin/std/RDFValidator.kt @@ -2,6 +2,8 @@ package technology.idlab.std import java.io.ByteArrayOutputStream import java.io.File +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import org.apache.jena.graph.Graph import org.apache.jena.ontology.OntModelSpec import org.apache.jena.rdf.model.ModelFactory @@ -9,8 +11,6 @@ import org.apache.jena.riot.RiotException import org.apache.jena.shacl.ShaclValidator import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Reader -import technology.idlab.runner.impl.jvm.Writer import technology.idlab.util.Log class RDFValidator(args: Arguments) : Processor(args) { @@ -21,8 +21,8 @@ class RDFValidator(args: Arguments) : Processor(args) { /** Arguments. */ private val errorIsFatal: Boolean? = arguments["error_is_fatal"] private val printReport: Boolean? = arguments["print_report"] - private val input: Reader = arguments["input"] - private val output: Writer = arguments["output"] + private val input: ReceiveChannel = arguments["input"] + private val output: SendChannel = arguments["output"] /** Runtime fields. */ private val shapes: Graph @@ -46,13 +46,10 @@ class RDFValidator(args: Arguments) : Processor(args) { /** Read incoming data, validate it, and output it. */ override suspend fun exec() { - while (true) { - // Read incoming data. - val res = input.read() - + for (data in input) { // Parse as a model. try { - model.read(res.inputStream(), null, "TURTLE") + model.read(data.inputStream(), null, "TURTLE") } catch (e: RiotException) { Log.shared.fatal("Failed to read incoming RDF data.") } @@ -62,7 +59,7 @@ class RDFValidator(args: Arguments) : Processor(args) { if (report.conforms()) { // Propagate to the output. - output.push(res) + output.send(data) } else { // Print the report if required. if (printReport ?: printReportDefault) { diff --git a/src/main/kotlin/std/Transparent.kt b/src/main/kotlin/std/Transparent.kt index b25297b..6e657a3 100644 --- a/src/main/kotlin/std/Transparent.kt +++ b/src/main/kotlin/std/Transparent.kt @@ -1,15 +1,22 @@ package technology.idlab.std +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Reader -import technology.idlab.runner.impl.jvm.Writer +import technology.idlab.util.Log class Transparent(args: Arguments) : Processor(args) { - private val input: Reader = arguments["input"] - private val output: Writer = arguments["output"] + private val input: ReceiveChannel = arguments["input"] + private val output: SendChannel = arguments["output"] override suspend fun exec() { - output.push(input.read()) + Log.shared.debug { "Transparent processor started" } + for (data in input) { + Log.shared.debug { "Received ${data.size} bytes" } + output.send(data) + } + output.close() + Log.shared.debug { "Transparent processor finished" } } } diff --git a/src/test/kotlin/processors/TappedReader.kt b/src/test/kotlin/processors/TappedReader.kt index ae99a6f..7a909de 100644 --- a/src/test/kotlin/processors/TappedReader.kt +++ b/src/test/kotlin/processors/TappedReader.kt @@ -1,13 +1,13 @@ package processors import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel import technology.idlab.intermediate.IRArgument import technology.idlab.intermediate.IRParameter import technology.idlab.intermediate.IRProcessor import technology.idlab.intermediate.IRStage import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Reader /** * The TappedReader processor provides a convenient way to read data from the pipeline during @@ -16,11 +16,11 @@ import technology.idlab.runner.impl.jvm.Reader */ class TappedReader(args: Arguments) : Processor(args) { /** The channel which is exposed to the pipeline. */ - private val input: Reader = arguments["input"] + private val input: ReceiveChannel = arguments["input"] /** Continuously read data from the input and write it to the global channel. */ override suspend fun exec() { - output.send(input.read()) + output.send(input.receive()) output.close() } diff --git a/src/test/kotlin/processors/TappedWriter.kt b/src/test/kotlin/processors/TappedWriter.kt index 1e05ace..de42328 100644 --- a/src/test/kotlin/processors/TappedWriter.kt +++ b/src/test/kotlin/processors/TappedWriter.kt @@ -1,13 +1,13 @@ package processors import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel import technology.idlab.intermediate.IRArgument import technology.idlab.intermediate.IRParameter import technology.idlab.intermediate.IRProcessor import technology.idlab.intermediate.IRStage import technology.idlab.runner.impl.jvm.Arguments import technology.idlab.runner.impl.jvm.Processor -import technology.idlab.runner.impl.jvm.Writer /** * The TappedWriter processor provides a convenient way to write data into the pipeline during @@ -16,11 +16,11 @@ import technology.idlab.runner.impl.jvm.Writer */ class TappedWriter(args: Arguments) : Processor(args) { /** Writer which is exposed to the pipeline. */ - private val output: Writer = arguments["output"] + private val output: SendChannel = arguments["output"] /** Continuously read data from the global channel and write it to the output. */ override suspend fun exec() { - output.push(input.receive()) + output.send(input.receive()) input.close() } diff --git a/src/test/resources/e2e/data/invalid.ttl b/src/test/resources/e2e/data/invalid.ttl deleted file mode 100644 index ec9eddc..0000000 --- a/src/test/resources/e2e/data/invalid.ttl +++ /dev/null @@ -1,14 +0,0 @@ -@prefix test: . -@prefix xsd: . - -# The coordinates should be given as floats, but are given as integers instead. - - a test:Coordinate ; - test:latitude "51"^^xsd:int ; - test:longitude "3"^^xsd:int . - -# The coordinates are given by the incorrect URI. - - a test:Coordinate ; - test:lat "41.39"^^xsd:float ; - test:lon "2.14"^^xsd:float . diff --git a/src/test/resources/e2e/data/valid.ttl b/src/test/resources/e2e/data/valid.ttl deleted file mode 100644 index fca41f8..0000000 --- a/src/test/resources/e2e/data/valid.ttl +++ /dev/null @@ -1,14 +0,0 @@ -@prefix test: . -@prefix xsd: . - -# Correct entry. - - a test:Coordinate ; - test:latitude "51.05"^^xsd:float ; - test:longitude "3.73"^^xsd:float . - -# Correct entry. - - a test:Coordinate ; - test:latitude "41.39"^^xsd:float ; - test:longitude "2.14"^^xsd:float . diff --git a/src/test/resources/e2e/shacl/shapes.ttl b/src/test/resources/e2e/shacl/shapes.ttl deleted file mode 100644 index 7191ef1..0000000 --- a/src/test/resources/e2e/shacl/shapes.ttl +++ /dev/null @@ -1,24 +0,0 @@ -@prefix rdf: . -@prefix sh: . -@prefix test: . -@prefix xsd: . - -# Definition of a simple shape which requires a shape to have it's coordinates -# within the range of -90 to 90 for latitude and -180 to 180 for longitude as -# floats. -[] - a sh:NodeShape ; - sh:closed true ; - sh:targetClass test:Coordinate ; - sh:ignoredProperties ( rdf:type ) ; - sh:property [ - sh:path test:latitude ; - sh:datatype xsd:float ; - sh:minInclusive -90 ; - sh:maxInclusive 90 ; - ], [ - sh:path test:longitude ; - sh:datatype xsd:float ; - sh:minInclusive -180 ; - sh:maxInclusive 180 ; - ] .