Skip to content

Commit

Permalink
Add support for QoS in sample. (#46)
Browse files Browse the repository at this point in the history
* add qos support to sample

* add docs

* move internal qos functionality into JNIQoS class

* suppress clippy too_many_arguments warning on decode_sample

* code clean up
  • Loading branch information
DenisBiryukov91 authored May 15, 2024
1 parent f890fb0 commit b884ce2
Show file tree
Hide file tree
Showing 24 changed files with 230 additions and 32 deletions.
4 changes: 2 additions & 2 deletions examples/src/main/kotlin/io.zenoh/ZPut.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.prelude.SampleKind
import io.zenoh.publication.CongestionControl
import io.zenoh.publication.Priority
import io.zenoh.prelude.CongestionControl
import io.zenoh.prelude.Priority

class ZPut(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Put example"
Expand Down
4 changes: 2 additions & 2 deletions zenoh-jni/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ pub(crate) fn decode_priority(priority: jint) -> Result<Priority> {

pub(crate) fn decode_congestion_control(congestion_control: jint) -> Result<CongestionControl> {
match congestion_control {
0 => Ok(CongestionControl::Block),
1 => Ok(CongestionControl::Drop),
1 => Ok(CongestionControl::Block),
0 => Ok(CongestionControl::Drop),
_value => Err(Error::Session(format!(
"Unknown congestion control '{_value}'."
))),
Expand Down
4 changes: 3 additions & 1 deletion zenoh-jni/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{mem, ops::Deref, sync::Arc};

use jni::{
objects::{GlobalRef, JByteArray, JClass, JPrimitiveArray, JValue},
sys::{jboolean, jint, jlong},
sys::{jboolean, jbyte, jint, jlong},
JNIEnv,
};
use zenoh::{
Expand Down Expand Up @@ -71,6 +71,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI(
sample_kind: jint,
timestamp_enabled: jboolean,
timestamp_ntp_64: jlong,
qos: jbyte,
attachment: JByteArray,
) {
let key_expr = Arc::from_raw(key_expr_ptr);
Expand All @@ -84,6 +85,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI(
sample_kind,
timestamp_enabled,
timestamp_ntp_64,
qos,
) {
Ok(sample) => sample,
Err(err) => {
Expand Down
5 changes: 3 additions & 2 deletions zenoh-jni/src/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use zenoh::{
value::Value,
};

use crate::errors::Result;
use crate::{errors::Error, utils::attachment_to_vec};
use crate::{errors::Result, sample::qos_into_jbyte};

pub(crate) fn on_reply(
mut env: JNIEnv,
Expand Down Expand Up @@ -74,7 +74,7 @@ fn on_reply_success(
let result = match env.call_method(
callback_global_ref,
"run",
"(Ljava/lang/String;ZJ[BIIJZ[B)V",
"(Ljava/lang/String;ZJ[BIIJZB[B)V",
&[
JValue::from(&zenoh_id),
JValue::from(true),
Expand All @@ -84,6 +84,7 @@ fn on_reply_success(
JValue::from(kind),
JValue::from(timestamp as i64),
JValue::from(is_valid),
JValue::from(qos_into_jbyte(sample.qos)),
JValue::from(&attachment_bytes),
],
) {
Expand Down
56 changes: 53 additions & 3 deletions zenoh-jni/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ use crate::{
value::decode_value,
};
use jni::{
objects::JByteArray,
sys::{jboolean, jint, jlong},
objects::{JByteArray, JClass},
sys::{jboolean, jbyte, jint, jlong},
JNIEnv,
};
use uhlc::{Timestamp, ID, NTP64};
use zenoh::{
prelude::{KeyExpr, SampleKind},
sample::Sample,
sample::{QoS, Sample},
};

/// Attempts to reconstruct a Zenoh [Sample] from the Java/Kotlin fields specified.
#[allow(clippy::too_many_arguments)]
pub(crate) fn decode_sample(
env: &mut JNIEnv,
key_expr: KeyExpr<'static>,
Expand All @@ -36,6 +37,7 @@ pub(crate) fn decode_sample(
sample_kind: jint,
timestamp_enabled: jboolean,
timestamp_ntp_64: jlong,
qos: jbyte,
) -> Result<Sample> {
let value = decode_value(env, payload, encoding)?;
let mut sample = Sample::new(key_expr, value);
Expand All @@ -45,6 +47,7 @@ pub(crate) fn decode_sample(
} else {
None
};
sample.qos = qos_from_jbyte(qos);
Ok(sample)
}

Expand All @@ -58,3 +61,50 @@ pub(crate) fn decode_sample_kind(sample_kind: jint) -> Result<SampleKind> {
))),
}
}

pub fn qos_from_jbyte(qos: jbyte) -> QoS {
unsafe { std::mem::transmute::<jbyte, QoS>(qos) }
}

pub fn qos_into_jbyte(qos: QoS) -> jbyte {
unsafe { std::mem::transmute::<QoS, jbyte>(qos) }
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getPriorityViaJNI(
_env: JNIEnv,
_class: JClass,
qos: jbyte,
) -> jint {
qos_from_jbyte(qos).priority() as jint
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getCongestionControlViaJNI(
_env: JNIEnv,
_class: JClass,
qos: jbyte,
) -> jint {
qos_from_jbyte(qos).congestion_control() as jint
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_getExpressdViaJNI(
_env: JNIEnv,
_class: JClass,
qos: jbyte,
) -> jboolean {
qos_from_jbyte(qos).express() as jboolean
}

#[no_mangle]
#[allow(non_snake_case)]
pub extern "C" fn Java_io_zenoh_jni_JNIQoS_00024Companion_getDefaultQoSViaJNI(
_env: JNIEnv,
_class: JClass,
) -> jbyte {
qos_into_jbyte(QoS::default())
}
8 changes: 6 additions & 2 deletions zenoh-jni/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use jni::{
use zenoh::prelude::r#sync::*;
use zenoh::subscriber::Subscriber;

use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close};
use crate::{
errors::{Error, Result},
utils::attachment_to_vec,
};
use crate::{
sample::qos_into_jbyte,
utils::{get_callback_global_ref, get_java_vm, load_on_close},
};

/// Frees the memory associated with a Zenoh subscriber raw pointer via JNI.
///
Expand Down Expand Up @@ -134,14 +137,15 @@ pub(crate) unsafe fn declare_subscriber(
match env.call_method(
&callback_global_ref,
"run",
"(J[BIIJZ[B)V",
"(J[BIIJZB[B)V",
&[
JValue::from(key_expr_ptr as jlong),
JValue::from(&byte_array),
JValue::from(encoding),
JValue::from(kind),
JValue::from(timestamp as i64),
JValue::from(is_valid),
JValue::from(qos_into_jbyte(sample.qos)),
JValue::from(&attachment_bytes),
],
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package io.zenoh.jni

import io.zenoh.*
import io.zenoh.publication.CongestionControl
import io.zenoh.publication.Priority
import io.zenoh.prelude.CongestionControl
import io.zenoh.prelude.Priority
import io.zenoh.sample.Attachment
import io.zenoh.value.Value

Expand Down Expand Up @@ -64,7 +64,7 @@ internal class JNIPublisher(private val ptr: Long) {
* @return A [Result] with the status of the operation.
*/
fun setCongestionControl(congestionControl: CongestionControl): Result<Unit> = runCatching {
setCongestionControlViaJNI(congestionControl.ordinal, ptr)
setCongestionControlViaJNI(congestionControl.value, ptr)
}

/**
Expand Down
43 changes: 43 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQoS.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.jni

import io.zenoh.prelude.CongestionControl;
import io.zenoh.prelude.Priority;

internal class JNIQoS internal constructor(internal val qos: Byte) {

internal constructor(): this(getDefaultQoSViaJNI())

fun getExpress(): Boolean {
return getExpressViaJNI(qos)
}

fun getCongestionControl(): CongestionControl {
return CongestionControl.fromInt(getCongestionControlViaJNI(qos))
}

fun getPriority(): Priority {
return Priority.fromInt(getPriorityViaJNI(qos))
}

companion object {
private external fun getDefaultQoSViaJNI(): Byte
}

private external fun getPriorityViaJNI(_qos: Byte): Int
private external fun getCongestionControlViaJNI(_qos: Byte): Int
private external fun getExpressViaJNI(_qos:Byte): Boolean
}
2 changes: 2 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal class JNIQuery(private val ptr: Long) {
sample.kind.ordinal,
timestampEnabled,
if (timestampEnabled) sample.timestamp!!.ntpValue() else 0,
sample.qos.jniQoS.qos,
sample.attachment?.let { encodeAttachment(it) },
)
}
Expand All @@ -57,6 +58,7 @@ internal class JNIQuery(private val ptr: Long) {
sampleKind: Int,
timestampEnabled: Boolean,
timestampNtp64: Long,
qos: Byte,
attachment: ByteArray?,
)

Expand Down
11 changes: 7 additions & 4 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.zenoh.jni.callbacks.JNISubscriberCallback
import io.zenoh.keyexpr.KeyExpr
import io.zenoh.prelude.Encoding
import io.zenoh.prelude.SampleKind
import io.zenoh.prelude.QoS
import io.zenoh.publication.Publisher
import io.zenoh.publication.Put
import io.zenoh.query.*
Expand Down Expand Up @@ -61,7 +62,7 @@ internal class JNISession {
val publisherRawPtr = declarePublisherViaJNI(
builder.keyExpr.jniKeyExpr!!.ptr,
sessionPtr.get(),
builder.congestionControl.ordinal,
builder.congestionControl.value,
builder.priority.value,
)
Publisher(
Expand All @@ -76,14 +77,15 @@ internal class JNISession {
keyExpr: KeyExpr, callback: Callback<Sample>, onClose: () -> Unit, receiver: R?, reliability: Reliability
): Result<Subscriber<R>> = runCatching {
val subCallback =
JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, attachmentBytes ->
JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, qos, attachmentBytes ->
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val attachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
val sample = Sample(
KeyExpr(JNIKeyExpr(keyExprPtr)),
Value(payload, Encoding(KnownEncoding.fromInt(encoding))),
SampleKind.fromInt(kind),
timestamp,
QoS(qos),
attachment
)
callback.run(sample)
Expand Down Expand Up @@ -124,7 +126,7 @@ internal class JNISession {
attachment: Attachment?
): Result<R?> = runCatching {
val getCallback =
JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, attachmentBytes: ByteArray ->
JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, qos: Byte, attachmentBytes: ByteArray ->
if (success) {
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
Expand All @@ -133,6 +135,7 @@ internal class JNISession {
Value(payload, Encoding(KnownEncoding.fromInt(encoding))),
SampleKind.fromInt(kind),
timestamp,
QoS(qos),
decodedAttachment
)
val reply = Reply.Success(replierId, sample)
Expand Down Expand Up @@ -192,7 +195,7 @@ internal class JNISession {
sessionPtr.get(),
put.value.payload,
put.value.encoding.knownEncoding.ordinal,
put.congestionControl.ordinal,
put.congestionControl.value,
put.priority.value,
put.kind.ordinal,
put.attachment?.let { encodeAttachment(it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal fun interface JNIGetCallback {
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean,
qos: Byte,
attachment: ByteArray,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal fun interface JNISubscriberCallback {
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean,
qos: Byte,
attachment: ByteArray,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.publication
package io.zenoh.prelude

/** The congestion control to be applied when routing the data. */
enum class CongestionControl {
enum class CongestionControl (val value: Int) {

/**
* Allows the message to be dropped if all buffers are full.
*/
DROP(0),

/**
* Prevents the message from being dropped at all cost.
* In the face of heavy congestion on a part of the network, this could result in your publisher node blocking.
*/
BLOCK,
BLOCK(1);

/**
* Allows the message to be dropped if all buffers are full.
*/
DROP;
companion object {
fun fromInt(value: Int) = entries.first { it.value == value }
}
}
Loading

0 comments on commit b884ce2

Please sign in to comment.