Skip to content

Commit

Permalink
refactor: do not write wrappers for writer/reader
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Jul 20, 2024
1 parent e465b36 commit d89bb7e
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 110 deletions.
24 changes: 24 additions & 0 deletions src/main/kotlin/extensions/Channel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package technology.idlab.extensions

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch

internal fun <E, R> Channel<E>.map(scope: CoroutineScope, func: (R) -> E): SendChannel<R> {
// Create the new channel.
val result = Channel<R>()

// Pipe the data through the function and into the new channel.
scope.launch {
for (data in result) {
this@map.send(func(data))
}
}

// Close the new channel if required.
this.invokeOnClose { result.close() }

// Return the new channel.
return result
}
16 changes: 14 additions & 2 deletions src/main/kotlin/runner/impl/jvm/JVMRunner.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package technology.idlab.runner.impl.jvm

import arrow.core.zip
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import technology.idlab.extensions.map
import technology.idlab.intermediate.IRArgument
import technology.idlab.intermediate.IRParameter
import technology.idlab.intermediate.IRProcessor
Expand All @@ -20,6 +24,10 @@ private const val REQUIRES_PROCESSOR_BASE_CLASS = "Class does not extend Process
class JVMRunner(
fromProcessors: Channel<Payload>,
) : Runner(fromProcessors) {
/** A shared coroutine scope. */
private val job = Job()
private val scope = CoroutineScope(Dispatchers.Default + job)

/** Map of all stages in the runner. */
private val stages = mutableMapOf<String, Processor>()

Expand Down Expand Up @@ -48,6 +56,8 @@ class JVMRunner(
override suspend fun exec() = coroutineScope {
Log.shared.info("Executing all stages.")

job.start()

// Create a new job which routes the messages.
val router = launch {
Log.shared.debug("Begin routing messages in JVMRunner.")
Expand Down Expand Up @@ -109,11 +119,13 @@ class JVMRunner(
IRParameter.Type.INT -> value.toInt()
IRParameter.Type.LONG -> value.toLong()
IRParameter.Type.STRING -> value
IRParameter.Type.WRITER -> return Writer(this.fromProcessors, value)
IRParameter.Type.WRITER -> {
this.fromProcessors.map<Payload, ByteArray>(scope) { Payload(value, it) }
}
IRParameter.Type.READER -> {
val channel = this.readers[value] ?: Channel()
this.readers[value] = channel
return Reader(channel, value)
return channel
}
}
}
Expand Down
12 changes: 0 additions & 12 deletions src/main/kotlin/runner/impl/jvm/Reader.kt

This file was deleted.

12 changes: 0 additions & 12 deletions src/main/kotlin/runner/impl/jvm/Writer.kt

This file was deleted.

6 changes: 3 additions & 3 deletions src/main/kotlin/std/FileReader.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package technology.idlab.std

import java.io.File
import kotlinx.coroutines.channels.SendChannel
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Writer

class FileReader(args: Arguments) : Processor(args) {
/** Arguments */
private val path: String = arguments["path"]
private val output: Writer = arguments["output"]
private val output: SendChannel<ByteArray> = arguments["output"]

/** Read the file as a single byte array and push it down the pipeline. */
override suspend fun exec() {
val file = File(path)
val bytes = file.readBytes()
output.push(bytes)
output.send(bytes)
}
}
9 changes: 4 additions & 5 deletions src/main/kotlin/std/FileWriter.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package technology.idlab.std

import java.io.File
import kotlinx.coroutines.channels.ReceiveChannel
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Reader

class FileWriter(args: Arguments) : Processor(args) {
/** Processor default values. */
Expand All @@ -13,7 +13,7 @@ class FileWriter(args: Arguments) : Processor(args) {
/** Arguments */
private val path: String = arguments["path"]
private val file = File(path)
private val input: Reader = arguments["input"]
private val input: ReceiveChannel<ByteArray> = arguments["input"]
private val overwrite: Boolean? = arguments["overwrite"]
private val append: Boolean? = arguments["append"]

Expand All @@ -36,9 +36,8 @@ class FileWriter(args: Arguments) : Processor(args) {

/** All incoming values are parsed as byte and appended onto the file. */
override suspend fun exec() {
while (true) {
val result = input.read()
file.appendBytes(result)
for (data in input) {
file.appendBytes(data)
}
}
}
6 changes: 3 additions & 3 deletions src/main/kotlin/std/HttpFetch.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import kotlinx.coroutines.channels.SendChannel
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Writer

class HttpFetch(args: Arguments) : Processor(args) {
/** Meta configuration. */
private var engine: HttpClientEngine = CIO.create()

/** Parameters. */
private val endpoint: String = arguments["endpoint"]
private val output: Writer = arguments["output"]
private val output: SendChannel<ByteArray> = arguments["output"]
private val headers: Array<String> = arguments["headers"]
private val method: String = arguments.get<String?>("method") ?: "GET"

Expand Down Expand Up @@ -45,7 +45,7 @@ class HttpFetch(args: Arguments) : Processor(args) {

// Push the result to the output.
val bytes = res.readBytes()
output.push(bytes)
output.send(bytes)
}

internal fun overwriteEngine(engine: HttpClientEngine) {
Expand Down
17 changes: 7 additions & 10 deletions src/main/kotlin/std/RDFValidator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package technology.idlab.std

import java.io.ByteArrayOutputStream
import java.io.File
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import org.apache.jena.graph.Graph
import org.apache.jena.ontology.OntModelSpec
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.RiotException
import org.apache.jena.shacl.ShaclValidator
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Reader
import technology.idlab.runner.impl.jvm.Writer
import technology.idlab.util.Log

class RDFValidator(args: Arguments) : Processor(args) {
Expand All @@ -21,8 +21,8 @@ class RDFValidator(args: Arguments) : Processor(args) {
/** Arguments. */
private val errorIsFatal: Boolean? = arguments["error_is_fatal"]
private val printReport: Boolean? = arguments["print_report"]
private val input: Reader = arguments["input"]
private val output: Writer = arguments["output"]
private val input: ReceiveChannel<ByteArray> = arguments["input"]
private val output: SendChannel<ByteArray> = arguments["output"]

/** Runtime fields. */
private val shapes: Graph
Expand All @@ -46,13 +46,10 @@ class RDFValidator(args: Arguments) : Processor(args) {

/** Read incoming data, validate it, and output it. */
override suspend fun exec() {
while (true) {
// Read incoming data.
val res = input.read()

for (data in input) {
// Parse as a model.
try {
model.read(res.inputStream(), null, "TURTLE")
model.read(data.inputStream(), null, "TURTLE")
} catch (e: RiotException) {
Log.shared.fatal("Failed to read incoming RDF data.")
}
Expand All @@ -62,7 +59,7 @@ class RDFValidator(args: Arguments) : Processor(args) {

if (report.conforms()) {
// Propagate to the output.
output.push(res)
output.send(data)
} else {
// Print the report if required.
if (printReport ?: printReportDefault) {
Expand Down
17 changes: 12 additions & 5 deletions src/main/kotlin/std/Transparent.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package technology.idlab.std

import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Reader
import technology.idlab.runner.impl.jvm.Writer
import technology.idlab.util.Log

class Transparent(args: Arguments) : Processor(args) {
private val input: Reader = arguments["input"]
private val output: Writer = arguments["output"]
private val input: ReceiveChannel<ByteArray> = arguments["input"]
private val output: SendChannel<ByteArray> = arguments["output"]

override suspend fun exec() {
output.push(input.read())
Log.shared.debug { "Transparent processor started" }
for (data in input) {
Log.shared.debug { "Received ${data.size} bytes" }
output.send(data)
}
output.close()
Log.shared.debug { "Transparent processor finished" }
}
}
6 changes: 3 additions & 3 deletions src/test/kotlin/processors/TappedReader.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package processors

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import technology.idlab.intermediate.IRArgument
import technology.idlab.intermediate.IRParameter
import technology.idlab.intermediate.IRProcessor
import technology.idlab.intermediate.IRStage
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Reader

/**
* The TappedReader processor provides a convenient way to read data from the pipeline during
Expand All @@ -16,11 +16,11 @@ import technology.idlab.runner.impl.jvm.Reader
*/
class TappedReader(args: Arguments) : Processor(args) {
/** The channel which is exposed to the pipeline. */
private val input: Reader = arguments["input"]
private val input: ReceiveChannel<ByteArray> = arguments["input"]

/** Continuously read data from the input and write it to the global channel. */
override suspend fun exec() {
output.send(input.read())
output.send(input.receive())
output.close()
}

Expand Down
6 changes: 3 additions & 3 deletions src/test/kotlin/processors/TappedWriter.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package processors

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import technology.idlab.intermediate.IRArgument
import technology.idlab.intermediate.IRParameter
import technology.idlab.intermediate.IRProcessor
import technology.idlab.intermediate.IRStage
import technology.idlab.runner.impl.jvm.Arguments
import technology.idlab.runner.impl.jvm.Processor
import technology.idlab.runner.impl.jvm.Writer

/**
* The TappedWriter processor provides a convenient way to write data into the pipeline during
Expand All @@ -16,11 +16,11 @@ import technology.idlab.runner.impl.jvm.Writer
*/
class TappedWriter(args: Arguments) : Processor(args) {
/** Writer which is exposed to the pipeline. */
private val output: Writer = arguments["output"]
private val output: SendChannel<ByteArray> = arguments["output"]

/** Continuously read data from the global channel and write it to the output. */
override suspend fun exec() {
output.push(input.receive())
output.send(input.receive())
input.close()
}

Expand Down
14 changes: 0 additions & 14 deletions src/test/resources/e2e/data/invalid.ttl

This file was deleted.

14 changes: 0 additions & 14 deletions src/test/resources/e2e/data/valid.ttl

This file was deleted.

24 changes: 0 additions & 24 deletions src/test/resources/e2e/shacl/shapes.ttl

This file was deleted.

0 comments on commit d89bb7e

Please sign in to comment.