diff --git a/build.gradle.kts b/build.gradle.kts index a94aed9..2f1d49e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -12,6 +12,12 @@ repositories { dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + // HTTP dependencies. + implementation("io.ktor:ktor-client-core:2.3.10") + implementation("io.ktor:ktor-client-cio:2.3.10") + implementation("io.ktor:ktor-server-core:2.3.10") + implementation("io.ktor:ktor-server-netty:2.3.10") + // RDF dependencies. implementation("org.apache.jena:apache-jena-libs:5.0.0") implementation("org.apache.jena:jena-arq:5.0.0") diff --git a/src/main/kotlin/bridge/HttpReader.kt b/src/main/kotlin/bridge/HttpReader.kt new file mode 100644 index 0000000..18ca796 --- /dev/null +++ b/src/main/kotlin/bridge/HttpReader.kt @@ -0,0 +1,52 @@ +package bridge + +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.engine.* +import io.ktor.server.netty.* +import io.ktor.server.request.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.runBlocking +import technology.idlab.logging.Log + +class HttpReader: Reader { + private val buffer = Channel(Channel.Factory.UNLIMITED); + + private val embeddedServer = embeddedServer(Netty, port = 8080) { + routing { + post("/") { + Log.shared.debug("Incoming request") + val body = call.receive() + Log.shared.debug("Received ${body.size} bytes") + buffer.send(body) + + Log.shared.debug("Responding") + call.response.status(HttpStatusCode.OK) + call.respondText("OK") + Log.shared.debug("Response sent") + } + } + }.start(wait = false) + + override suspend fun read(): Reader.Result { + try { + val result = buffer.receive() + return Reader.Result.success(result) + } catch (e: ClosedReceiveChannelException) { + return Reader.Result.closed() + } catch (e: Exception) { + Log.shared.fatal(e) + } + } + + override fun readSync(): Reader.Result { + return runBlocking { read() } + } + + override fun isClosed(): Boolean { + return false; + } +} diff --git a/src/main/kotlin/bridge/HttpWriter.kt b/src/main/kotlin/bridge/HttpWriter.kt new file mode 100644 index 0000000..28b9a3a --- /dev/null +++ b/src/main/kotlin/bridge/HttpWriter.kt @@ -0,0 +1,35 @@ +package technology.idlab.bridge + +import bridge.Writer +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.request.* +import io.ktor.client.statement.* +import kotlinx.coroutines.runBlocking +import technology.idlab.logging.Log + +class HttpWriter(private val endpoint: String): Writer { + private val client = HttpClient(CIO) + + override suspend fun push(value: ByteArray) { + // Create request. + Log.shared.debug("POST $endpoint (${value.size} bytes)") + val res = client.post(endpoint) { + setBody(value) + } + Log.shared.debug("Received response: ${res.status.value} - ${res.bodyAsText()}") + + // Check status code. + if (res.status.value != 200) { + Log.shared.fatal("ERROR: Status code ${res.status.value} received from $endpoint") + } + } + + override fun pushSync(value: ByteArray) { + runBlocking { push(value) } + } + + override fun close() { + client.close() + } +} diff --git a/src/main/resources/pipeline.ttl b/src/main/resources/pipeline.ttl index 2eb1319..fe9eb1c 100644 --- a/src/main/resources/pipeline.ttl +++ b/src/main/resources/pipeline.ttl @@ -75,3 +75,21 @@ jvm:MemoryChannelReader rdfs:subClassOf jvm:ChannelReader. ]; sh:closed true; sh:ignoredProperties ( rdf:type ). + +# HTTP Channels are specific channels. +jvm:HttpChannelReader rdfs:subClassOf jvm:ChannelReader. +jvm:HttpChannelWriter rdfs:subClassOf jvm:ChannelWriter. + +# Define a HTTP reader. +[] + a sh:NodeShape; + sh:targetClass jvm:HttpChannelReader; + sh:closed true; + sh:ignoredProperties ( rdf:type ). + +# Define a HTTP writer. +[] + a sh:NodeShape; + sh:targetClass jvm:HttpChannelWriter; + sh:closed true; + sh:ignoredProperties ( rdf:type ). diff --git a/src/test/kotlin/channels/HttpTest.kt b/src/test/kotlin/channels/HttpTest.kt new file mode 100644 index 0000000..622eac4 --- /dev/null +++ b/src/test/kotlin/channels/HttpTest.kt @@ -0,0 +1,22 @@ +package channels + +import bridge.HttpReader +import technology.idlab.bridge.HttpWriter +import kotlin.test.Test +import kotlin.test.assertEquals + +class HttpTest { + @Test + fun exec() { + // Create a writer and a reader. + val writer = HttpWriter("http://localhost:8080") + val reader = HttpReader() + + // Push a value to the writer and read it from the reader. + writer.pushSync("Hello, World!".toByteArray()) + val res = reader.readSync() + + // Check if result is the same. + assertEquals("Hello, World!", String(res.value)) + } +}