Skip to content

Commit

Permalink
feat: simple HTTP channels
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Apr 30, 2024
1 parent 416b4db commit ca84ed9
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 0 deletions.
6 changes: 6 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ repositories {
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")

// HTTP dependencies.
implementation("io.ktor:ktor-client-core:2.3.10")
implementation("io.ktor:ktor-client-cio:2.3.10")
implementation("io.ktor:ktor-server-core:2.3.10")
implementation("io.ktor:ktor-server-netty:2.3.10")

// RDF dependencies.
implementation("org.apache.jena:apache-jena-libs:5.0.0")
implementation("org.apache.jena:jena-arq:5.0.0")
Expand Down
52 changes: 52 additions & 0 deletions src/main/kotlin/bridge/HttpReader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package bridge

import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking
import technology.idlab.logging.Log

class HttpReader: Reader {
private val buffer = Channel<ByteArray>(Channel.Factory.UNLIMITED);

private val embeddedServer = embeddedServer(Netty, port = 8080) {
routing {
post("/") {
Log.shared.debug("Incoming request")
val body = call.receive<ByteArray>()
Log.shared.debug("Received ${body.size} bytes")
buffer.send(body)

Log.shared.debug("Responding")
call.response.status(HttpStatusCode.OK)
call.respondText("OK")
Log.shared.debug("Response sent")
}
}
}.start(wait = false)

override suspend fun read(): Reader.Result {
try {
val result = buffer.receive()
return Reader.Result.success(result)
} catch (e: ClosedReceiveChannelException) {
return Reader.Result.closed()
} catch (e: Exception) {
Log.shared.fatal(e)
}
}

override fun readSync(): Reader.Result {
return runBlocking { read() }
}

override fun isClosed(): Boolean {
return false;
}
}
35 changes: 35 additions & 0 deletions src/main/kotlin/bridge/HttpWriter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package technology.idlab.bridge

import bridge.Writer
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.request.*
import io.ktor.client.statement.*
import kotlinx.coroutines.runBlocking
import technology.idlab.logging.Log

class HttpWriter(private val endpoint: String): Writer {
private val client = HttpClient(CIO)

override suspend fun push(value: ByteArray) {
// Create request.
Log.shared.debug("POST $endpoint (${value.size} bytes)")
val res = client.post(endpoint) {
setBody(value)
}
Log.shared.debug("Received response: ${res.status.value} - ${res.bodyAsText()}")

// Check status code.
if (res.status.value != 200) {
Log.shared.fatal("ERROR: Status code ${res.status.value} received from $endpoint")
}
}

override fun pushSync(value: ByteArray) {
runBlocking { push(value) }
}

override fun close() {
client.close()
}
}
18 changes: 18 additions & 0 deletions src/main/resources/pipeline.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,21 @@ jvm:MemoryChannelReader rdfs:subClassOf jvm:ChannelReader.
];
sh:closed true;
sh:ignoredProperties ( rdf:type ).

# HTTP Channels are specific channels.
jvm:HttpChannelReader rdfs:subClassOf jvm:ChannelReader.
jvm:HttpChannelWriter rdfs:subClassOf jvm:ChannelWriter.

# Define a HTTP reader.
[]
a sh:NodeShape;
sh:targetClass jvm:HttpChannelReader;
sh:closed true;
sh:ignoredProperties ( rdf:type ).

# Define a HTTP writer.
[]
a sh:NodeShape;
sh:targetClass jvm:HttpChannelWriter;
sh:closed true;
sh:ignoredProperties ( rdf:type ).
22 changes: 22 additions & 0 deletions src/test/kotlin/channels/HttpTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package channels

import bridge.HttpReader
import technology.idlab.bridge.HttpWriter
import kotlin.test.Test
import kotlin.test.assertEquals

class HttpTest {
@Test
fun exec() {
// Create a writer and a reader.
val writer = HttpWriter("http://localhost:8080")
val reader = HttpReader()

// Push a value to the writer and read it from the reader.
writer.pushSync("Hello, World!".toByteArray())
val res = reader.readSync()

// Check if result is the same.
assertEquals("Hello, World!", String(res.value))
}
}

0 comments on commit ca84ed9

Please sign in to comment.