From f4f4351ca4aa6c653da191f8a18e681901cf8446 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 21 Feb 2024 21:58:26 +0100 Subject: [PATCH 1/5] add qos support to sample --- examples/src/main/kotlin/io.zenoh/ZPut.kt | 4 +- zenoh-jni/Cargo.toml | 4 +- zenoh-jni/src/put.rs | 4 +- zenoh-jni/src/query.rs | 4 +- zenoh-jni/src/reply.rs | 5 +- zenoh-jni/src/sample.rs | 55 ++++++++++++++++++- zenoh-jni/src/subscriber.rs | 8 ++- .../kotlin/io/zenoh/jni/JNIPublisher.kt | 6 +- .../kotlin/io/zenoh/jni/JNIQuery.kt | 2 + .../kotlin/io/zenoh/jni/JNISession.kt | 11 ++-- .../io/zenoh/jni/callbacks/JNIGetCallback.kt | 1 + .../jni/callbacks/JNISubscriberCallback.kt | 1 + .../CongestionControl.kt | 17 +++--- .../{publication => prelude}/Priority.kt | 6 +- .../commonMain/kotlin/io/zenoh/prelude/QoS.kt | 45 +++++++++++++++ .../kotlin/io/zenoh/publication/Delete.kt | 2 + .../kotlin/io/zenoh/publication/Publisher.kt | 2 + .../kotlin/io/zenoh/publication/Put.kt | 2 + .../commonMain/kotlin/io/zenoh/query/Reply.kt | 3 +- .../kotlin/io/zenoh/sample/Sample.kt | 2 + .../src/commonTest/kotlin/io/zenoh/GetTest.kt | 3 +- .../kotlin/io/zenoh/PublisherTest.kt | 5 +- .../kotlin/io/zenoh/QueryableTest.kt | 5 +- .../kotlin/io/zenoh/SubscriberTest.kt | 14 ++++- 24 files changed, 175 insertions(+), 36 deletions(-) rename zenoh-kotlin/src/commonMain/kotlin/io/zenoh/{publication => prelude}/CongestionControl.kt (80%) rename zenoh-kotlin/src/commonMain/kotlin/io/zenoh/{publication => prelude}/Priority.kt (87%) create mode 100644 zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt diff --git a/examples/src/main/kotlin/io.zenoh/ZPut.kt b/examples/src/main/kotlin/io.zenoh/ZPut.kt index 19f79a87..ee93cda5 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPut.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPut.kt @@ -16,8 +16,8 @@ package io.zenoh import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind -import io.zenoh.publication.CongestionControl -import io.zenoh.publication.Priority +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority fun main() { println("Opening Session") diff --git a/zenoh-jni/Cargo.toml b/zenoh-jni/Cargo.toml index 4e1341ac..1711a8f7 100644 --- a/zenoh-jni/Cargo.toml +++ b/zenoh-jni/Cargo.toml @@ -37,8 +37,8 @@ jni = "0.21.1" flume = "0.10.14" uhlc = "0.6.0" json5 = "0.4.1" -zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false } -zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false } +zenoh = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", default-features = false } +zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", default-features = false } [lib] name = "zenoh_jni" diff --git a/zenoh-jni/src/put.rs b/zenoh-jni/src/put.rs index 0e2cf410..212c4b9e 100644 --- a/zenoh-jni/src/put.rs +++ b/zenoh-jni/src/put.rs @@ -104,8 +104,8 @@ pub(crate) fn decode_priority(priority: jint) -> Result { pub(crate) fn decode_congestion_control(congestion_control: jint) -> Result { match congestion_control { - 0 => Ok(CongestionControl::Block), - 1 => Ok(CongestionControl::Drop), + 1 => Ok(CongestionControl::Block), + 0 => Ok(CongestionControl::Drop), _value => Err(Error::Session(format!( "Unknown congestion control '{_value}'." ))), diff --git a/zenoh-jni/src/query.rs b/zenoh-jni/src/query.rs index 333b6cfc..13eb4c29 100644 --- a/zenoh-jni/src/query.rs +++ b/zenoh-jni/src/query.rs @@ -16,7 +16,7 @@ use std::{mem, ops::Deref, sync::Arc}; use jni::{ objects::{GlobalRef, JByteArray, JClass, JPrimitiveArray, JValue}, - sys::{jboolean, jint, jlong}, + sys::{jboolean, jbyte, jint, jlong}, JNIEnv, }; use zenoh::{ @@ -71,6 +71,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( sample_kind: jint, timestamp_enabled: jboolean, timestamp_ntp_64: jlong, + qos: jbyte, attachment: JByteArray, ) { let key_expr = Arc::from_raw(key_expr_ptr); @@ -84,6 +85,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( sample_kind, timestamp_enabled, timestamp_ntp_64, + qos, ) { Ok(sample) => sample, Err(err) => { diff --git a/zenoh-jni/src/reply.rs b/zenoh-jni/src/reply.rs index f3660a03..1f74a020 100644 --- a/zenoh-jni/src/reply.rs +++ b/zenoh-jni/src/reply.rs @@ -26,8 +26,8 @@ use zenoh::{ value::Value, }; -use crate::errors::Result; use crate::{errors::Error, utils::attachment_to_vec}; +use crate::{errors::Result, sample::qos_into_jbyte}; pub(crate) fn on_reply( mut env: JNIEnv, @@ -74,7 +74,7 @@ fn on_reply_success( let result = match env.call_method( callback_global_ref, "run", - "(Ljava/lang/String;ZJ[BIIJZ[B)V", + "(Ljava/lang/String;ZJ[BIIJZB[B)V", &[ JValue::from(&zenoh_id), JValue::from(true), @@ -84,6 +84,7 @@ fn on_reply_success( JValue::from(kind), JValue::from(timestamp as i64), JValue::from(is_valid), + JValue::from(qos_into_jbyte(sample.qos)), JValue::from(&attachment_bytes), ], ) { diff --git a/zenoh-jni/src/sample.rs b/zenoh-jni/src/sample.rs index 1d8dd23f..0c6aa83d 100644 --- a/zenoh-jni/src/sample.rs +++ b/zenoh-jni/src/sample.rs @@ -17,14 +17,14 @@ use crate::{ value::decode_value, }; use jni::{ - objects::JByteArray, - sys::{jboolean, jint, jlong}, + objects::{JByteArray, JClass}, + sys::{jboolean, jbyte, jint, jlong}, JNIEnv, }; use uhlc::{Timestamp, ID, NTP64}; use zenoh::{ prelude::{KeyExpr, SampleKind}, - sample::Sample, + sample::{QoS, Sample}, }; /// Attempts to reconstruct a Zenoh [Sample] from the Java/Kotlin fields specified. @@ -36,6 +36,7 @@ pub(crate) fn decode_sample( sample_kind: jint, timestamp_enabled: jboolean, timestamp_ntp_64: jlong, + qos: jbyte, ) -> Result { let value = decode_value(env, payload, encoding)?; let mut sample = Sample::new(key_expr, value); @@ -45,6 +46,7 @@ pub(crate) fn decode_sample( } else { None }; + sample.qos = qos_from_jbyte(qos); Ok(sample) } @@ -58,3 +60,50 @@ pub(crate) fn decode_sample_kind(sample_kind: jint) -> Result { ))), } } + +pub fn qos_from_jbyte(qos: jbyte) -> QoS { + unsafe { std::mem::transmute::(qos) } +} + +pub fn qos_into_jbyte(qos: QoS) -> jbyte { + unsafe { std::mem::transmute::(qos) } +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_prelude_QoS_getPriorityViaJNI( + _env: JNIEnv, + _class: JClass, + qos: jbyte, +) -> jint { + qos_from_jbyte(qos).priority() as jint +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_prelude_QoS_getCongestionControlViaJNI( + _env: JNIEnv, + _class: JClass, + qos: jbyte, +) -> jint { + qos_from_jbyte(qos).congestion_control() as jint +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_prelude_QoS_getExpressdViaJNI( + _env: JNIEnv, + _class: JClass, + qos: jbyte, +) -> jboolean { + qos_from_jbyte(qos).express() as jboolean +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn Java_io_zenoh_prelude_QoS_00024Companion_getDefaultViaJNI( + _env: JNIEnv, + _class: JClass, +) -> jbyte { + qos_into_jbyte(QoS::default()) +} diff --git a/zenoh-jni/src/subscriber.rs b/zenoh-jni/src/subscriber.rs index ae99f351..5dee8cfd 100644 --- a/zenoh-jni/src/subscriber.rs +++ b/zenoh-jni/src/subscriber.rs @@ -22,11 +22,14 @@ use jni::{ use zenoh::prelude::r#sync::*; use zenoh::subscriber::Subscriber; -use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close}; use crate::{ errors::{Error, Result}, utils::attachment_to_vec, }; +use crate::{ + sample::qos_into_jbyte, + utils::{get_callback_global_ref, get_java_vm, load_on_close}, +}; /// Frees the memory associated with a Zenoh subscriber raw pointer via JNI. /// @@ -134,7 +137,7 @@ pub(crate) unsafe fn declare_subscriber( match env.call_method( &callback_global_ref, "run", - "(J[BIIJZ[B)V", + "(J[BIIJZB[B)V", &[ JValue::from(key_expr_ptr as jlong), JValue::from(&byte_array), @@ -142,6 +145,7 @@ pub(crate) unsafe fn declare_subscriber( JValue::from(kind), JValue::from(timestamp as i64), JValue::from(is_valid), + JValue::from(qos_into_jbyte(sample.qos)), JValue::from(&attachment_bytes), ], ) { diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt index 9182952b..28aa1da3 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt @@ -16,8 +16,8 @@ package io.zenoh.jni import io.zenoh.* import io.zenoh.prelude.SampleKind -import io.zenoh.publication.CongestionControl -import io.zenoh.publication.Priority +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import io.zenoh.sample.Attachment import io.zenoh.value.Value @@ -82,7 +82,7 @@ internal class JNIPublisher(private val ptr: Long) { * @return A [Result] with the status of the operation. */ fun setCongestionControl(congestionControl: CongestionControl): Result = runCatching { - setCongestionControlViaJNI(congestionControl.ordinal, ptr) + setCongestionControlViaJNI(congestionControl.value, ptr) } /** diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt index 9ce20202..ce6bd7ff 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt @@ -36,6 +36,7 @@ internal class JNIQuery(private val ptr: Long) { sample.kind.ordinal, timestampEnabled, if (timestampEnabled) sample.timestamp!!.ntpValue() else 0, + sample.qos.qos, sample.attachment?.let { encodeAttachment(it) }, ) } @@ -57,6 +58,7 @@ internal class JNIQuery(private val ptr: Long) { sampleKind: Int, timestampEnabled: Boolean, timestampNtp64: Long, + qos: Byte, attachment: ByteArray?, ) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt index 96a81f16..20602a10 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt @@ -24,6 +24,7 @@ import io.zenoh.jni.callbacks.JNISubscriberCallback import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.publication.Publisher import io.zenoh.publication.Put import io.zenoh.query.* @@ -61,7 +62,7 @@ internal class JNISession { val publisherRawPtr = declarePublisherViaJNI( builder.keyExpr.jniKeyExpr!!.ptr, sessionPtr.get(), - builder.congestionControl.ordinal, + builder.congestionControl.value, builder.priority.value, ) Publisher( @@ -76,7 +77,7 @@ internal class JNISession { keyExpr: KeyExpr, callback: Callback, onClose: () -> Unit, receiver: R?, reliability: Reliability ): Result> = runCatching { val subCallback = - JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, attachmentBytes -> + JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, qos, attachmentBytes -> val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null val attachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } val sample = Sample( @@ -84,6 +85,7 @@ internal class JNISession { Value(payload, Encoding(KnownEncoding.fromInt(encoding))), SampleKind.fromInt(kind), timestamp, + QoS(qos), attachment ) callback.run(sample) @@ -124,7 +126,7 @@ internal class JNISession { attachment: Attachment? ): Result = runCatching { val getCallback = - JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, attachmentBytes: ByteArray -> + JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, qos: Byte, attachmentBytes: ByteArray -> if (success) { val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } @@ -133,6 +135,7 @@ internal class JNISession { Value(payload, Encoding(KnownEncoding.fromInt(encoding))), SampleKind.fromInt(kind), timestamp, + QoS(qos), decodedAttachment ) val reply = Reply.Success(replierId, sample) @@ -192,7 +195,7 @@ internal class JNISession { sessionPtr.get(), put.value.payload, put.value.encoding.knownEncoding.ordinal, - put.congestionControl.ordinal, + put.congestionControl.value, put.priority.value, put.kind.ordinal, put.attachment?.let { encodeAttachment(it) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt index 621eaf98..72ce911e 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt @@ -25,6 +25,7 @@ internal fun interface JNIGetCallback { kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, + qos: Byte, attachment: ByteArray, ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt index 750888eb..5d57641e 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt @@ -22,6 +22,7 @@ internal fun interface JNISubscriberCallback { kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, + qos: Byte, attachment: ByteArray, ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/CongestionControl.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt similarity index 80% rename from zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/CongestionControl.kt rename to zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt index 3662e542..d4f58ed9 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/CongestionControl.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt @@ -12,19 +12,22 @@ // ZettaScale Zenoh Team, // -package io.zenoh.publication +package io.zenoh.prelude /** The congestion control to be applied when routing the data. */ -enum class CongestionControl { +enum class CongestionControl (val value: Int) { + /** + * Allows the message to be dropped if all buffers are full. + */ + DROP(0), /** * Prevents the message from being dropped at all cost. * In the face of heavy congestion on a part of the network, this could result in your publisher node blocking. */ - BLOCK, + BLOCK(1); - /** - * Allows the message to be dropped if all buffers are full. - */ - DROP; + companion object { + fun fromInt(value: Int) = entries.first { it.value == value } + } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Priority.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/Priority.kt similarity index 87% rename from zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Priority.kt rename to zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/Priority.kt index a8821fca..820c871f 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Priority.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/Priority.kt @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -package io.zenoh.publication +package io.zenoh.prelude /** * The Priority of Zenoh messages. @@ -30,5 +30,9 @@ enum class Priority(val value: Int) { DATA(5), DATA_LOW(6), BACKGROUND(7); + + companion object { + fun fromInt(value: Int) = entries.first { it.value == value } + } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt new file mode 100644 index 00000000..b87a9647 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt @@ -0,0 +1,45 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.prelude + +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority + +class QoS internal constructor(internal val qos: Byte) { + + fun priority(): Priority { + return Priority.fromInt(getPriorityViaJNI(qos)) + } + + fun congestionControl(): CongestionControl { + return CongestionControl.fromInt(getCongestionControlViaJNI(qos)) + } + + fun express(): Boolean { + return getExpressViaJNI(qos); + } + + companion object { + fun default(): QoS { + return QoS(getDefaultViaJNI()) + } + + private external fun getDefaultViaJNI(): Byte + } + + private external fun getPriorityViaJNI(_qos: Byte): Int + private external fun getCongestionControlViaJNI(_qos: Byte): Int + private external fun getExpressViaJNI(_qos:Byte): Boolean +} \ No newline at end of file diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt index 22e34e33..65386ada 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Delete.kt @@ -15,6 +15,8 @@ package io.zenoh.publication import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import io.zenoh.Session import io.zenoh.value.Value import io.zenoh.keyexpr.KeyExpr diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt index 50a9da2a..94d9b824 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt @@ -19,6 +19,8 @@ import io.zenoh.exceptions.SessionException import io.zenoh.jni.JNIPublisher import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.Priority +import io.zenoh.prelude.CongestionControl import io.zenoh.sample.Attachment import io.zenoh.value.Value diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt index f0a62c22..a54c9f5b 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Put.kt @@ -19,6 +19,8 @@ import io.zenoh.Session import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import io.zenoh.sample.Attachment import io.zenoh.value.Value diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt index b13e9bfb..97c5c7a2 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Reply.kt @@ -18,6 +18,7 @@ import io.zenoh.Resolvable import io.zenoh.ZenohType import io.zenoh.sample.Sample import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.value.Value import io.zenoh.keyexpr.KeyExpr import io.zenoh.sample.Attachment @@ -130,7 +131,7 @@ abstract class Reply private constructor(val replierId: String) : ZenohType { * Constructs the reply sample with the provided parameters and triggers the reply to the query. */ override fun res(): Result { - val sample = Sample(keyExpr, value, kind, timeStamp, attachment) + val sample = Sample(keyExpr, value, kind, timeStamp, QoS.default(), attachment) return query.reply(Success("", sample)).res() } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt index 5762d4cd..86e451b5 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt @@ -16,6 +16,7 @@ package io.zenoh.sample import io.zenoh.ZenohType import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.keyexpr.KeyExpr import io.zenoh.value.Value import org.apache.commons.net.ntp.TimeStamp @@ -37,6 +38,7 @@ class Sample( val value: Value, val kind: SampleKind, val timestamp: TimeStamp?, + val qos: QoS, val attachment: Attachment? = null ): ZenohType { override fun toString(): String { diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt index f8b87644..9ccdf34a 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/GetTest.kt @@ -18,6 +18,7 @@ import io.zenoh.handlers.Handler import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.query.Reply import io.zenoh.queryable.Queryable import io.zenoh.sample.Sample @@ -118,7 +119,7 @@ class GetTest { .res() .getOrThrow() declaredQueryables.add(queryable) - repliedSamples.add(Sample(keyExpr, value, kind, timestamp)) + repliedSamples.add(Sample(keyExpr, value, kind, timestamp, QoS.default())) } val sessionB = Session.open().getOrThrow() diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt index 16c6b403..0c9c64b5 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/PublisherTest.kt @@ -18,6 +18,7 @@ import io.zenoh.prelude.KnownEncoding import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.sample.Sample import io.zenoh.value.Value import kotlin.test.Test @@ -63,8 +64,8 @@ class PublisherTest { }.res() val testSamples = arrayListOf( - Sample(TEST_KEY_EXP, Value("Test PUT"), SampleKind.PUT, null), - Sample(TEST_KEY_EXP, Value("Test DELETE"), SampleKind.DELETE, null), + Sample(TEST_KEY_EXP, Value("Test PUT"), SampleKind.PUT, null, QoS.default()), + Sample(TEST_KEY_EXP, Value("Test DELETE"), SampleKind.DELETE, null, QoS.default()), ) session.declarePublisher(TEST_KEY_EXP).res().onSuccess { diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt index ce182fbf..e2c3a71b 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QueryableTest.kt @@ -17,6 +17,7 @@ package io.zenoh import io.zenoh.handlers.Handler import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.prelude.QoS import io.zenoh.query.Reply import io.zenoh.queryable.Query import io.zenoh.sample.Sample @@ -46,7 +47,7 @@ class QueryableTest { val sessionA = Session.open().getOrThrow() val sample = Sample( - TEST_KEY_EXP, Value(TEST_PAYLOAD), SampleKind.PUT, TimeStamp(Date.from(Instant.now())) + TEST_KEY_EXP, Value(TEST_PAYLOAD), SampleKind.PUT, TimeStamp(Date.from(Instant.now())), QoS.default() ) val queryable = sessionA.declareQueryable(TEST_KEY_EXP).with { query -> query.reply(TEST_KEY_EXP).success(sample.value).withTimeStamp(sample.timestamp!!).res() @@ -160,7 +161,7 @@ private class QueryHandler : Handler { val payload = "Hello queryable $counter!" counter++ val sample = Sample( - query.keyExpr, Value(payload), SampleKind.PUT, TimeStamp(Date.from(Instant.now())) + query.keyExpr, Value(payload), SampleKind.PUT, TimeStamp(Date.from(Instant.now())), QoS.default() ) performedReplies.add(sample) query.reply(query.keyExpr).success(sample.value).withTimeStamp(sample.timestamp!!).res() diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt index 7f1c392c..fe681630 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SubscriberTest.kt @@ -20,6 +20,8 @@ import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.Encoding import io.zenoh.sample.Sample import io.zenoh.value.Value +import io.zenoh.prelude.CongestionControl +import io.zenoh.prelude.Priority import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlin.collections.ArrayList @@ -31,6 +33,8 @@ class SubscriberTest { companion object { val TEST_KEY_EXP = "example/testing/keyexpr".intoKeyExpr().getOrThrow() + val TEST_PRIORITY = Priority.DATA_HIGH; + val TEST_CONGESTION_CONTROL = CongestionControl.BLOCK; val testValues = arrayListOf( Value("Test 1".encodeToByteArray(), Encoding(KnownEncoding.TEXT_PLAIN)), @@ -52,6 +56,8 @@ class SubscriberTest { for ((index, sample) in receivedSamples.withIndex()) { assertEquals(sample.value, testValues[index]) + assertEquals(sample.qos.priority(), TEST_PRIORITY); + assertEquals(sample.qos.congestionControl(), TEST_CONGESTION_CONTROL); } } @@ -67,6 +73,8 @@ class SubscriberTest { assertEquals(queue.size, testValues.size) for ((index, sample) in queue.withIndex()) { assertEquals(sample.value, testValues[index]) + assertEquals(sample.qos.priority(), TEST_PRIORITY); + assertEquals(sample.qos.congestionControl(), TEST_CONGESTION_CONTROL); } } @@ -90,7 +98,11 @@ class SubscriberTest { } private fun publishTestValues(session: Session): ArrayList { - val publisher = session.declarePublisher(TEST_KEY_EXP).res().getOrThrow() + val publisher = session + .declarePublisher(TEST_KEY_EXP) + .congestionControl(TEST_CONGESTION_CONTROL) + .priority(TEST_PRIORITY) + .res().getOrThrow() testValues.forEach { value -> publisher.put(value).res() } return testValues } From 196b5d547a1bf4506b54d668a29f8f6a775cc5f0 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Thu, 22 Feb 2024 11:12:45 +0100 Subject: [PATCH 2/5] add docs --- .../commonMain/kotlin/io/zenoh/prelude/QoS.kt | 17 ++++++++++++++++- .../commonMain/kotlin/io/zenoh/sample/Sample.kt | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt index b87a9647..3c42e164 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt @@ -17,21 +17,36 @@ package io.zenoh.prelude import io.zenoh.prelude.CongestionControl import io.zenoh.prelude.Priority +/** + * Quality of service settings used to send zenoh message. + */ class QoS internal constructor(internal val qos: Byte) { - + + /** + * Returns priority of the message. + */ fun priority(): Priority { return Priority.fromInt(getPriorityViaJNI(qos)) } + /** + * Returns congestion control setting of the message. + */ fun congestionControl(): CongestionControl { return CongestionControl.fromInt(getCongestionControlViaJNI(qos)) } + /** + * Returns express flag. If it is true, the message is not batched to reduce the latency. + */ fun express(): Boolean { return getExpressViaJNI(qos); } companion object { + /** + * Returns default QoS settings. + */ fun default(): QoS { return QoS(getDefaultViaJNI()) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt index 86e451b5..1eec3a22 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/sample/Sample.kt @@ -31,6 +31,7 @@ import org.apache.commons.net.ntp.TimeStamp * @property value The [Value] of the sample. * @property kind The [SampleKind] of the sample. * @property timestamp Optional [TimeStamp]. + * @property qos The Quality of Service settings used to deliver the sample. * @property attachment Optional [Attachment]. */ class Sample( From 5ecfd1853ca0d21022472fdef2adf9f733da7b18 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 4 Mar 2024 12:04:13 +0100 Subject: [PATCH 3/5] move internal qos functionality into JNIQoS class --- zenoh-jni/Cargo.toml | 4 +- zenoh-jni/src/sample.rs | 8 ++-- .../commonMain/kotlin/io/zenoh/jni/JNIQoS.kt | 43 +++++++++++++++++++ .../kotlin/io/zenoh/jni/JNIQuery.kt | 2 +- .../commonMain/kotlin/io/zenoh/prelude/QoS.kt | 29 ++++--------- 5 files changed, 58 insertions(+), 28 deletions(-) create mode 100644 zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt diff --git a/zenoh-jni/Cargo.toml b/zenoh-jni/Cargo.toml index 1711a8f7..4e1341ac 100644 --- a/zenoh-jni/Cargo.toml +++ b/zenoh-jni/Cargo.toml @@ -37,8 +37,8 @@ jni = "0.21.1" flume = "0.10.14" uhlc = "0.6.0" json5 = "0.4.1" -zenoh = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", default-features = false } -zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/DenisBiryukov91/zenoh.git", branch = "feature/priority-in-sample", default-features = false } +zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false } +zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false } [lib] name = "zenoh_jni" diff --git a/zenoh-jni/src/sample.rs b/zenoh-jni/src/sample.rs index 0c6aa83d..4c56c6ac 100644 --- a/zenoh-jni/src/sample.rs +++ b/zenoh-jni/src/sample.rs @@ -71,7 +71,7 @@ pub fn qos_into_jbyte(qos: QoS) -> jbyte { #[no_mangle] #[allow(non_snake_case)] -pub extern "C" fn Java_io_zenoh_prelude_QoS_getPriorityViaJNI( +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getPriorityViaJNI( _env: JNIEnv, _class: JClass, qos: jbyte, @@ -81,7 +81,7 @@ pub extern "C" fn Java_io_zenoh_prelude_QoS_getPriorityViaJNI( #[no_mangle] #[allow(non_snake_case)] -pub extern "C" fn Java_io_zenoh_prelude_QoS_getCongestionControlViaJNI( +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getCongestionControlViaJNI( _env: JNIEnv, _class: JClass, qos: jbyte, @@ -91,7 +91,7 @@ pub extern "C" fn Java_io_zenoh_prelude_QoS_getCongestionControlViaJNI( #[no_mangle] #[allow(non_snake_case)] -pub extern "C" fn Java_io_zenoh_prelude_QoS_getExpressdViaJNI( +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getExpressdViaJNI( _env: JNIEnv, _class: JClass, qos: jbyte, @@ -101,7 +101,7 @@ pub extern "C" fn Java_io_zenoh_prelude_QoS_getExpressdViaJNI( #[no_mangle] #[allow(non_snake_case)] -pub extern "C" fn Java_io_zenoh_prelude_QoS_00024Companion_getDefaultViaJNI( +pub extern "C" fn Java_io_zenoh_jni_JNIQoS_00024Companion_getDefaultQoSViaJNI( _env: JNIEnv, _class: JClass, ) -> jbyte { diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt new file mode 100644 index 00000000..b49b06ac --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt @@ -0,0 +1,43 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni + +import io.zenoh.prelude.CongestionControl; +import io.zenoh.prelude.Priority; + +internal class JNIQoS internal constructor(internal val qos: Byte) { + + internal constructor(): this(getDefaultQoSViaJNI()) + + fun getExpress(): Boolean { + return getExpressViaJNI(qos) + } + + fun getCongestionControl(): CongestionControl { + return CongestionControl.fromInt(getCongestionControlViaJNI(qos)) + } + + fun getPriority(): Priority { + return Priority.fromInt(getPriorityViaJNI(qos)) + } + + companion object { + private external fun getDefaultQoSViaJNI(): Byte + } + + private external fun getPriorityViaJNI(_qos: Byte): Int + private external fun getCongestionControlViaJNI(_qos: Byte): Int + private external fun getExpressViaJNI(_qos:Byte): Boolean +} \ No newline at end of file diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt index ce6bd7ff..71d1e06d 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt @@ -36,7 +36,7 @@ internal class JNIQuery(private val ptr: Long) { sample.kind.ordinal, timestampEnabled, if (timestampEnabled) sample.timestamp!!.ntpValue() else 0, - sample.qos.qos, + sample.qos.jni().qos, sample.attachment?.let { encodeAttachment(it) }, ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt index 3c42e164..75037d4e 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt @@ -13,48 +13,35 @@ // package io.zenoh.prelude +import io.zenoh.jni.JNIQoS; import io.zenoh.prelude.CongestionControl import io.zenoh.prelude.Priority - /** * Quality of service settings used to send zenoh message. */ -class QoS internal constructor(internal val qos: Byte) { - +class QoS internal constructor(private val jniQoS: JNIQoS) { + internal constructor(qos: Byte): this(JNIQoS(qos)) /** * Returns priority of the message. */ - fun priority(): Priority { - return Priority.fromInt(getPriorityViaJNI(qos)) - } - + fun priority(): Priority = jniQoS.getPriority() /** * Returns congestion control setting of the message. */ - fun congestionControl(): CongestionControl { - return CongestionControl.fromInt(getCongestionControlViaJNI(qos)) - } - + fun congestionControl(): CongestionControl = jniQoS.getCongestionControl() /** * Returns express flag. If it is true, the message is not batched to reduce the latency. */ - fun express(): Boolean { - return getExpressViaJNI(qos); - } + fun express(): Boolean = jniQoS.getExpress() + internal fun jni(): JNIQoS = this.jniQoS companion object { /** * Returns default QoS settings. */ fun default(): QoS { - return QoS(getDefaultViaJNI()) + return QoS(JNIQoS()) } - - private external fun getDefaultViaJNI(): Byte } - - private external fun getPriorityViaJNI(_qos: Byte): Int - private external fun getCongestionControlViaJNI(_qos: Byte): Int - private external fun getExpressViaJNI(_qos:Byte): Boolean } \ No newline at end of file From ee589a7f9b5eb47bcedd93aa235f69e7025b2c3c Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 14 May 2024 15:28:13 +0200 Subject: [PATCH 4/5] suppress clippy too_many_arguments warning on decode_sample --- zenoh-jni/src/sample.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zenoh-jni/src/sample.rs b/zenoh-jni/src/sample.rs index 4c56c6ac..43495c19 100644 --- a/zenoh-jni/src/sample.rs +++ b/zenoh-jni/src/sample.rs @@ -28,6 +28,7 @@ use zenoh::{ }; /// Attempts to reconstruct a Zenoh [Sample] from the Java/Kotlin fields specified. +#[allow(clippy::too_many_arguments)] pub(crate) fn decode_sample( env: &mut JNIEnv, key_expr: KeyExpr<'static>, From ffdf3f758a6ed0e9a1d3639dea8e2c6e595c0c71 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 15 May 2024 10:49:46 +0200 Subject: [PATCH 5/5] code clean up --- .../src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt | 2 +- .../src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt | 2 +- .../kotlin/io/zenoh/prelude/CongestionControl.kt | 1 + .../src/commonMain/kotlin/io/zenoh/prelude/QoS.kt | 9 ++++++--- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt index b49b06ac..102ce446 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt @@ -40,4 +40,4 @@ internal class JNIQoS internal constructor(internal val qos: Byte) { private external fun getPriorityViaJNI(_qos: Byte): Int private external fun getCongestionControlViaJNI(_qos: Byte): Int private external fun getExpressViaJNI(_qos:Byte): Boolean -} \ No newline at end of file +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt index 71d1e06d..f0943584 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt @@ -36,7 +36,7 @@ internal class JNIQuery(private val ptr: Long) { sample.kind.ordinal, timestampEnabled, if (timestampEnabled) sample.timestamp!!.ntpValue() else 0, - sample.qos.jni().qos, + sample.qos.jniQoS.qos, sample.attachment?.let { encodeAttachment(it) }, ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt index d4f58ed9..26ab9fca 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/CongestionControl.kt @@ -16,6 +16,7 @@ package io.zenoh.prelude /** The congestion control to be applied when routing the data. */ enum class CongestionControl (val value: Int) { + /** * Allows the message to be dropped if all buffers are full. */ diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt index 75037d4e..ae0de029 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/prelude/QoS.kt @@ -20,23 +20,26 @@ import io.zenoh.prelude.Priority /** * Quality of service settings used to send zenoh message. */ -class QoS internal constructor(private val jniQoS: JNIQoS) { +class QoS internal constructor(internal val jniQoS: JNIQoS) { internal constructor(qos: Byte): this(JNIQoS(qos)) + /** * Returns priority of the message. */ fun priority(): Priority = jniQoS.getPriority() + /** * Returns congestion control setting of the message. */ fun congestionControl(): CongestionControl = jniQoS.getCongestionControl() + /** * Returns express flag. If it is true, the message is not batched to reduce the latency. */ fun express(): Boolean = jniQoS.getExpress() - internal fun jni(): JNIQoS = this.jniQoS companion object { + /** * Returns default QoS settings. */ @@ -44,4 +47,4 @@ class QoS internal constructor(private val jniQoS: JNIQoS) { return QoS(JNIQoS()) } } -} \ No newline at end of file +}