diff --git a/src/main/java/runner/Processor.java b/src/main/java/runner/Processor.java index f4e2e67..3d1c81b 100644 --- a/src/main/java/runner/Processor.java +++ b/src/main/java/runner/Processor.java @@ -30,19 +30,23 @@ public Processor() { } protected T getArgument(String name) { + Log.Companion.getShared().debug(name); + Object result = arguments.get(name); + + if (result == null) { + Log.Companion.getShared().fatal("Argument " + name + " is missing."); + } + return (T) result; } protected Optional getOptionalArgument(String name) { + Log.Companion.getShared().debug(name + " (optional)"); return Optional.ofNullable(getArgument(name)); } - public void setup() { - log.info("Setting up processor"); - } + public void setup() {} - public void exec() { - log.info("Executing processor"); - } + public void exec() {} } diff --git a/src/main/kotlin/bridge/Bridge.kt b/src/main/kotlin/bridge/Bridge.kt new file mode 100644 index 0000000..4475d33 --- /dev/null +++ b/src/main/kotlin/bridge/Bridge.kt @@ -0,0 +1,3 @@ +package bridge + +interface Bridge: Reader, Writer diff --git a/src/main/kotlin/bridge/MemoryBridge.kt b/src/main/kotlin/bridge/MemoryBridge.kt new file mode 100644 index 0000000..71b6231 --- /dev/null +++ b/src/main/kotlin/bridge/MemoryBridge.kt @@ -0,0 +1,41 @@ +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import bridge.Bridge +import bridge.Reader +import technology.idlab.logging.Log + +class MemoryBridge : Bridge { + private var channel = Channel(10) + + override fun pushSync(value: ByteArray) { + Log.shared.debug("Pushing ${value.size} bytes") + runBlocking { channel.send(value) } + Log.shared.debug("Done") + } + + override fun readSync(): Reader.Result { + Log.shared.debug("Reading bytes") + val result = runBlocking { channel.receiveCatching() } + + // Check if the channel got closed. + if (result.isClosed) { + return Reader.Result.closed() + } + + // If an error occurred, the runner must handle it itself. + if (result.isFailure) { + Log.shared.fatal("Failed to read bytes") + } + + val bytes = result.getOrThrow() + return Reader.Result.success(bytes) + } + + override fun isClosed(): Boolean { + return channel.isClosedForSend + } + + override fun close() { + channel.close() + } +} diff --git a/src/main/kotlin/bridge/Reader.kt b/src/main/kotlin/bridge/Reader.kt new file mode 100644 index 0000000..f249ad1 --- /dev/null +++ b/src/main/kotlin/bridge/Reader.kt @@ -0,0 +1,43 @@ +package bridge + +import technology.idlab.logging.Log + + +interface Reader { + enum class ResultType { + SUCCESS, CLOSED; + } + + class Result(private val type: ResultType, value: ByteArray) { + val value: ByteArray + get() { + if (type == ResultType.SUCCESS) { + return field + } else { + Log.shared.fatal("Cannot get value from invalid read.") + } + } + + init { + this.value = value + } + + fun isClosed(): Boolean { + return type == ResultType.CLOSED + } + + companion object { + fun success(value: ByteArray): Result { + return Result(ResultType.SUCCESS, value) + } + + fun closed(): Result { + return Result(ResultType.CLOSED, ByteArray(0)) + } + } + } + + fun readSync(): Result + fun isClosed(): Boolean +} + diff --git a/src/main/kotlin/bridge/Writer.kt b/src/main/kotlin/bridge/Writer.kt new file mode 100644 index 0000000..a5b923f --- /dev/null +++ b/src/main/kotlin/bridge/Writer.kt @@ -0,0 +1,6 @@ +package bridge + +interface Writer { + fun pushSync(value: ByteArray) + fun close() +} diff --git a/src/main/kotlin/logging/Log.kt b/src/main/kotlin/logging/Log.kt index 9326f70..c695c8a 100644 --- a/src/main/kotlin/logging/Log.kt +++ b/src/main/kotlin/logging/Log.kt @@ -12,7 +12,7 @@ class Log { "TIME".padEnd(12, ' '), "THREAD".padEnd(6, ' '), "LEVEL".padEnd(7, ' '), - "LOCATION".padEnd(30, ' '), + "LOCATION".padEnd(50, ' '), "MESSAGE", ).joinToString(" ") println(header) @@ -21,7 +21,7 @@ class Log { "----".padEnd(12, ' '), "------".padEnd(6, ' '), "-----".padEnd(7, ' '), - "--------".padEnd(30, ' '), + "--------".padEnd(50, ' '), "-------", ).joinToString(" ") println(separator) @@ -40,7 +40,7 @@ class Log { time.padEnd(12, '0'), "[${Thread.currentThread().id}]".padEnd(6, ' '), level.padEnd(7, ' '), - name.padEnd(30, ' '), + name.padEnd(50, ' '), message, ).joinToString(" ") @@ -70,6 +70,10 @@ class Log { exitProcess(1) } + fun debug(message: String) { + print(message, "DEBUG") + } + companion object { val shared = Log() } diff --git a/src/main/kotlin/runner/Parser.kt b/src/main/kotlin/runner/Parser.kt index 9386571..0189814 100644 --- a/src/main/kotlin/runner/Parser.kt +++ b/src/main/kotlin/runner/Parser.kt @@ -1,5 +1,6 @@ package technology.idlab.runner +import MemoryBridge import io.reactivex.rxjava3.subjects.PublishSubject import org.apache.jena.ontology.OntModelSpec import org.apache.jena.query.QueryExecutionFactory @@ -110,7 +111,7 @@ class Parser(file: File) { Log.shared.info("Parsing stages") // Initialize the channel. - val channel = PublishSubject.create() + val bridge = MemoryBridge() // Initialize the producer. val producerClass = processors[0] @@ -118,14 +119,14 @@ class Parser(file: File) { producerArgs["start"] = 0 producerArgs["end"] = 5 producerArgs["step"] = 1 - producerArgs["outgoing"] = channel + producerArgs["writer"] = bridge val producerConstructor = producerClass.getConstructor(Map::class.java) val producer = producerConstructor.newInstance(producerArgs) // Initialize the consumer. val consumerClass = processors[1] val consumerArgs: MutableMap = mutableMapOf() - consumerArgs["incoming"] = channel + consumerArgs["reader"] = bridge val consumerConstructor = consumerClass.getConstructor(Map::class.java) val consumer = consumerConstructor.newInstance(consumerArgs) diff --git a/src/main/kotlin/runner/Pipeline.kt b/src/main/kotlin/runner/Pipeline.kt index b807de9..39f9b7d 100644 --- a/src/main/kotlin/runner/Pipeline.kt +++ b/src/main/kotlin/runner/Pipeline.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.runBlocking import runner.Processor import technology.idlab.logging.Log import java.io.File +import kotlin.concurrent.thread class Pipeline(config: File) { /** Processors described in the config. */ @@ -27,12 +28,10 @@ class Pipeline(config: File) { // Run execution phase. Log.shared.info("Running execution phase") - runBlocking { - processors.map { - async { it.exec() } - }.map { - it.await() - } + processors.map { + thread { it.exec() } + }.map { + it.join() } Log.shared.info("Pipeline executed successfully") diff --git a/src/main/resources/pipeline.ttl b/src/main/resources/pipeline.ttl index e434078..2eb1319 100644 --- a/src/main/resources/pipeline.ttl +++ b/src/main/resources/pipeline.ttl @@ -23,7 +23,7 @@ sh:maxCount 1; ]. -# A channel takes in a reader and writer. +# A bridge takes in a reader and writer. [] a sh:NodeShape; sh:targetClass jvm:Channel; @@ -44,21 +44,21 @@ jvm:MemoryChannel rdfs:subClassOf jvm:Channel. jvm:MemoryChannelWriter rdfs:subClassOf jvm:ChannelWriter. jvm:MemoryChannelReader rdfs:subClassOf jvm:ChannelReader. -# Define a memory channel reader. +# Define a memory bridge reader. [] a sh:NodeShape; sh:targetClass jvm:MemoryChannelReader; sh:closed true; sh:ignoredProperties ( rdf:type ). -# Define a memory channel writer. +# Define a memory bridge writer. [] a sh:NodeShape; sh:targetClass jvm:MemoryChannelWriter; sh:closed true; sh:ignoredProperties ( rdf:type ). -# A memory channel takes in a single reader and writer. +# A memory bridge takes in a single reader and writer. [] a sh:NodeShape; sh:targetClass jvm:MemoryChannel; diff --git a/src/test/resources/pipelines/range_reporter.ttl b/src/test/resources/pipelines/range_reporter.ttl index ddbcfae..1703813 100644 --- a/src/test/resources/pipelines/range_reporter.ttl +++ b/src/test/resources/pipelines/range_reporter.ttl @@ -13,11 +13,11 @@ <../processors/range.ttl>, <../processors/reporter.ttl>. -# Define a memory channel. +# Define a memory bridge. a jvm:MemoryChannelReader. a jvm:MemoryChannelWriter. - + a jvm:MemoryChannel; jvm:reader ; jvm:writer . diff --git a/src/test/resources/sources/Range.java b/src/test/resources/sources/Range.java index b0e9a6e..78f0ecc 100644 --- a/src/test/resources/sources/Range.java +++ b/src/test/resources/sources/Range.java @@ -1,4 +1,4 @@ -import io.reactivex.rxjava3.subjects.PublishSubject; +import bridge.Writer; import java.util.Map; import runner.Processor; @@ -9,7 +9,7 @@ public class Range extends Processor { private final int step; // Channels - private final PublishSubject outgoing; + private final Writer writer; public Range(Map args) { // Call super constructor. @@ -21,11 +21,7 @@ public Range(Map args) { this.step = this.getArgument("step"); // Channels - this.outgoing = this.getArgument("outgoing"); - } - - public void setup() { - log.info("Binding to outgoing channel."); + this.writer = this.getArgument("writer"); } public void exec() { @@ -33,10 +29,10 @@ public void exec() { for (int i = start; i < end; i += step) { log.info("Emitting " + i); - outgoing.onNext(Integer.toString(i)); + writer.pushSync(Integer.toString(i).getBytes()); } log.info("Closing outgoing channel."); - outgoing.onComplete(); + writer.close(); } } diff --git a/src/test/resources/sources/Reporter.java b/src/test/resources/sources/Reporter.java index 60d6373..9a9d58e 100644 --- a/src/test/resources/sources/Reporter.java +++ b/src/test/resources/sources/Reporter.java @@ -1,25 +1,27 @@ -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.disposables.Disposable; +import bridge.Reader; import java.util.Map; import runner.Processor; public class Reporter extends Processor { - private final Observable incoming; + private final Reader reader; public Reporter(Map args) { // Call super constructor. super(args); // Parameters - this.incoming = this.getArgument("incoming"); + this.reader = this.getArgument("reader"); } - public void setup() { - // Local variables - Disposable disposable = incoming.subscribe( - item -> log.info("Received item: " + item), - error -> log.severe("Error: " + error), - () -> log.info("Channel closed.") - ); + public void exec() { + while (!reader.isClosed()) { + Reader.Result result = reader.readSync(); + + if (result.isClosed()) { + break; + } + + log.info("Received item: " + new String(result.getValue())); + } } }