Skip to content

Commit

Permalink
Publisher alignment (#236)
Browse files Browse the repository at this point in the history
* Adding KDoc to SetIntersectionLevel

* alignment(publisher): replacing CC and Priority attributes with functions

* alignment(publisher): adding encoding argument to declarePublisher
  • Loading branch information
DariusIMP authored Sep 19, 2024
1 parent 4b9f659 commit 1a836f5
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 11 deletions.
8 changes: 5 additions & 3 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,17 @@ class Session private constructor(private val config: Config) : AutoCloseable {
*
* @param keyExpr The [KeyExpr] the publisher will be associated to.
* @param qos The [QoS] configuration of the publisher.
* @param encoding The default [Encoding] for the publications.
* @param reliability The [Reliability] the publisher wishes to obtain from the network.
* @return The result of the declaration, returning the publisher in case of success.
*/
fun declarePublisher(
keyExpr: KeyExpr,
qos: QoS = QoS.default(),
encoding: Encoding = Encoding.default(),
reliability: Reliability = Reliability.RELIABLE
): Result<Publisher> {
return resolvePublisher(keyExpr, qos, reliability)
return resolvePublisher(keyExpr, qos, encoding, reliability)
}

/**
Expand Down Expand Up @@ -807,9 +809,9 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return SessionInfo(this)
}

private fun resolvePublisher(keyExpr: KeyExpr, qos: QoS, reliability: Reliability): Result<Publisher> {
private fun resolvePublisher(keyExpr: KeyExpr, qos: QoS, encoding: Encoding, reliability: Reliability): Result<Publisher> {
return jniSession?.run {
declarePublisher(keyExpr, qos, reliability).onSuccess { declarations.add(it) }
declarePublisher(keyExpr, qos, encoding, reliability).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ internal class JNISession {
closeSessionViaJNI(sessionPtr.get())
}

fun declarePublisher(keyExpr: KeyExpr, qos: QoS, reliability: Reliability): Result<Publisher> = runCatching {
fun declarePublisher(keyExpr: KeyExpr, qos: QoS, encoding: Encoding, reliability: Reliability): Result<Publisher> = runCatching {
val publisherRawPtr = declarePublisherViaJNI(
keyExpr.jniKeyExpr?.ptr ?: 0,
keyExpr.keyExpr,
Expand All @@ -76,6 +76,7 @@ internal class JNISession {
Publisher(
keyExpr,
qos,
encoding,
JNIPublisher(publisherRawPtr),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

package io.zenoh.keyexpr

/**
* The possible relations between two sets.
*
* Note that [EQUALS] implies [INCLUDES], which itself implies [INTERSECTS].
*/
enum class SetIntersectionLevel(internal val value: Int) {
DISJOINT(0),
INTERSECTS(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,25 @@ import io.zenoh.protocol.into
class Publisher internal constructor(
val keyExpr: KeyExpr,
val qos: QoS,
val encoding: Encoding,
private var jniPublisher: JNIPublisher?,
) : SessionDeclaration, AutoCloseable {

companion object {
private val InvalidPublisherResult = Result.failure<Unit>(ZError("Publisher is not valid."))
}

val congestionControl = qos.congestionControl
val priority = qos.priority
val express = qos.express
/** Get the congestion control applied when routing the data. */
fun congestionControl() = qos.congestionControl

/** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */
fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(payload, encoding, attachment) ?: InvalidPublisherResult
/** Get the priority of the written data. */
fun priority() = qos.priority

/** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */
fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult

/** Performs a PUT operation on the specified [keyExpr] with the specified string [message]. */
fun put(message: String, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(message.into(), encoding, attachment) ?: InvalidPublisherResult
fun put(message: String, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(message.into(), encoding ?: this.encoding, attachment) ?: InvalidPublisherResult

/**
* Performs a DELETE operation on the specified [keyExpr]
Expand Down
9 changes: 8 additions & 1 deletion zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PublisherTest {
fun setUp() {
session = Session.open(Config.default()).getOrThrow()
keyExpr = "example/testing/keyexpr".intoKeyExpr().getOrThrow()
publisher = session.declarePublisher(keyExpr).getOrThrow()
publisher = session.declarePublisher(keyExpr, encoding = Encoding.ZENOH_STRING).getOrThrow()
subscriber = session.declareSubscriber(keyExpr, callback = { sample ->
receivedSamples.add(sample)
}).getOrThrow()
Expand Down Expand Up @@ -75,4 +75,11 @@ class PublisherTest {
assertEquals(1, receivedSamples.size)
assertEquals(SampleKind.DELETE, receivedSamples[0].kind)
}

@Test
fun `when encoding is not provided a put should fallback to the publisher encoding`() {
publisher.put("Test")
assertEquals(1, receivedSamples.size)
assertEquals(Encoding.ZENOH_STRING, receivedSamples[0].encoding)
}
}

0 comments on commit 1a836f5

Please sign in to comment.