Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Querier (unstable) to Zenoh-Kotlin #316

Merged
merged 5 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tasks {
"ZPub",
"ZPubThr",
"ZPut",
"ZQuerier",
"ZQueryable",
"ZScout",
"ZSub",
Expand Down
91 changes: 91 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZQuerier.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.bytes.ZBytes
import io.zenoh.query.QueryTarget
import io.zenoh.query.intoSelector
import java.time.Duration

class ZQuerier(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Querier example"
) {

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")

val session = Zenoh.open(config).getOrThrow()
val selector = selector.intoSelector().getOrThrow()

val target = target ?.let{ QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING
val timeout = Duration.ofMillis(timeout)
val querier = session.declareQuerier(selector.keyExpr, target, timeout = timeout).getOrThrow()

for (idx in 0..Int.MAX_VALUE) {
Thread.sleep(1000)
val payload = "[${idx.toString().padStart(4, ' ')}] ${payload ?: ""}"
println("Querying '$selector' with payload: '$payload'...")
querier.get(callback = {
it.result.onSuccess { sample ->
println(">> Received ('${sample.keyExpr}': '${sample.payload}')")
}.onFailure { error ->
println(">> Received (ERROR: '${error.message}')")
}
}, payload = ZBytes.from(payload), parameters = selector.parameters)
}
}

private val selector by option(
"-s",
"--selector",
help = "The selection of resources to query [default: demo/example/**]",
metavar = "selector"
).default("demo/example/**")
private val payload by option(
"-p", "--payload", help = "An optional payload to put in the query.", metavar = "payload"
)
private val target by option(
"-t",
"--target",
help = "The target queryables of the query. Default: BEST_MATCHING. " + "[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]",
metavar = "target"
)
private val timeout by option(
"-o", "--timeout", help = "The query timeout in milliseconds [default: 10000]", metavar = "timeout"
).long().default(10000)
private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
private val mode by option(
"-m",
"--mode",
help = "The session mode. Default: peer. Possible values: [peer, client, router]",
metavar = "mode"
).default("peer")
private val connect: List<String> by option(
"-e", "--connect", help = "Endpoints to connect to.", metavar = "connect"
).multiple()
private val listen: List<String> by option(
"-l", "--listen", help = "Endpoints to listen on.", metavar = "listen"
).multiple()
private val noMulticastScouting: Boolean by option(
"--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism."
).flag(default = false)
}

fun main(args: Array<String>) = ZQuerier(args.isEmpty()).main(args)
1 change: 1 addition & 0 deletions zenoh-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod key_expr;
mod liveliness;
mod logger;
mod publisher;
mod querier;
mod query;
mod queryable;
mod scouting;
Expand Down
137 changes: 137 additions & 0 deletions zenoh-jni/src/querier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::Arc;

use jni::{
objects::{JByteArray, JClass, JObject, JString},
sys::jint,
JNIEnv,
};
use zenoh::{key_expr::KeyExpr, query::Querier, Wait};

use crate::{
errors::ZResult,
key_expr::process_kotlin_key_expr,
session::{on_reply_error, on_reply_success},
throw_exception,
utils::{
decode_byte_array, decode_encoding, decode_string, get_callback_global_ref, get_java_vm,
load_on_close,
},
zerror,
};

/// Perform a Zenoh GET through a querier.
///
/// This function is meant to be called from Java/Kotlin code through JNI.
///
/// Parameters:
/// - `env`: The JNI environment.
/// - `_class`: The JNI class.
/// - `querier_ptr`: The raw pointer to the querier.
/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] provided to the kotlin querier. May be null in case of using an
/// undeclared key expression.
/// - `key_expr_str`: String representation of the key expression used during the querier declaration.
/// It won't be considered in case a key_expr_ptr to a declared key expression is provided.
/// - `selector_params`: Optional selector parameters for the query.
/// - `callback`: Reference to the Kotlin callback to be run upon receiving a reply.
/// - `on_close`: Reference to a kotlin callback to be run upon finishing the get operation, mostly used for closing a provided channel.
/// - `attachment`: Optional attachment.
/// - `payload`: Optional payload for the query.
/// - `encoding_id`: Encoding id of the payload provided.
/// - `encoding_schema`: Encoding schema of the payload provided.
///
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_getViaJNI(
mut env: JNIEnv,
_class: JClass,
querier_ptr: *const Querier,
key_expr_ptr: /*nullable*/ *const KeyExpr<'static>,
key_expr_str: JString,
selector_params: /*nullable*/ JString,
callback: JObject,
on_close: JObject,
attachment: /*nullable*/ JByteArray,
payload: /*nullable*/ JByteArray,
encoding_id: jint,
encoding_schema: /*nullable*/ JString,
) {
let querier = Arc::from_raw(querier_ptr);
let _ = || -> ZResult<()> {
let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?;
let java_vm = Arc::new(get_java_vm(&mut env)?);
let callback_global_ref = get_callback_global_ref(&mut env, callback)?;
let on_close_global_ref = get_callback_global_ref(&mut env, on_close)?;
let on_close = load_on_close(&java_vm, on_close_global_ref);
let mut get_builder = querier.get().callback(move |reply| {
|| -> ZResult<()> {
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure
tracing::debug!("Receiving reply through JNI: {:?}", reply);
let mut env = java_vm.attach_current_thread_as_daemon().map_err(|err| {
zerror!("Unable to attach thread for GET query callback: {}.", err)
})?;

match reply.result() {
Ok(sample) => {
on_reply_success(&mut env, reply.replier_id(), sample, &callback_global_ref)
}
Err(error) => {
on_reply_error(&mut env, reply.replier_id(), error, &callback_global_ref)
}
}
}()
.unwrap_or_else(|err| tracing::error!("Error on get callback: {err}"));
});

if !selector_params.is_null() {
let params = decode_string(&mut env, &selector_params)?;
get_builder = get_builder.parameters(params)
};

if !payload.is_null() {
let encoding = decode_encoding(&mut env, encoding_id, &encoding_schema)?;
get_builder = get_builder.encoding(encoding);
get_builder = get_builder.payload(decode_byte_array(&env, payload)?);
}

if !attachment.is_null() {
let attachment = decode_byte_array(&env, attachment)?;
get_builder = get_builder.attachment::<Vec<u8>>(attachment);
}

get_builder
.wait()
.map(|_| tracing::trace!("Performing get on '{key_expr}'.",))
.map_err(|err| zerror!(err))
}()
.map_err(|err| throw_exception!(env, err));
std::mem::forget(querier);
}

///
/// Frees the pointer of the querier.
///
/// After a call to this function, no further jni operations should be performed using the querier associated to the raw pointer provided.
///
#[no_mangle]
#[allow(non_snake_case)]
pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_freePtrViaJNI(
_env: JNIEnv,
_: JClass,
querier_ptr: *const Querier<'static>,
) {
Arc::from_raw(querier_ptr);
}
65 changes: 64 additions & 1 deletion zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::{
config::Config,
key_expr::KeyExpr,
pubsub::{Publisher, Subscriber},
query::{Query, Queryable, ReplyError, Selector},
query::{Querier, Query, Queryable, ReplyError, Selector},
sample::Sample,
session::{Session, ZenohId},
Wait,
Expand Down Expand Up @@ -514,6 +514,69 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareSubscriberViaJNI(
})
}

/// Declare a Zenoh querier via JNI.
///
/// This function is meant to be called from Java/Kotlin code through JNI.
///
/// Parameters:
/// - `env`: The JNI environment.
/// - `_class`: The JNI class.
/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] to be used for the querier. May be null in case of using an
/// undeclared key expression.
/// - `key_expr_str`: String representation of the key expression to be used to declare the querier.
/// It won't be considered in case a key_expr_ptr to a declared key expression is provided.
/// - `target`: The ordinal value of the query target enum value.
/// - `consolidation`: The ordinal value of the consolidation enum value.
/// - `congestion_control`: The ordinal value of the congestion control enum value.
/// - `priority`: The ordinal value of the priority enum value.
/// - `is_express`: The boolean express value of the QoS provided.
/// - `timeout_ms`: The timeout in milliseconds.
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareQuerierViaJNI(
mut env: JNIEnv,
_class: JClass,
key_expr_ptr: /*nullable*/ *const KeyExpr<'static>,
key_expr_str: JString,
session_ptr: *const Session,
target: jint,
consolidation: jint,
congestion_control: jint,
priority: jint,
is_express: jboolean,
timeout_ms: jlong,
) -> *const Querier<'static> {
let session = Arc::from_raw(session_ptr);
|| -> ZResult<*const Querier<'static>> {
let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?;
let query_target = decode_query_target(target)?;
let consolidation = decode_consolidation(consolidation)?;
let congestion_control = decode_congestion_control(congestion_control)?;
let timeout = Duration::from_millis(timeout_ms as u64);
let priority = decode_priority(priority)?;
tracing::debug!("Declaring querier on '{}'...", key_expr);

let querier = session
.declare_querier(key_expr.to_owned())
.congestion_control(congestion_control)
.consolidation(consolidation)
.express(is_express != 0)
.target(query_target)
.priority(priority)
.timeout(timeout)
.wait()
.map_err(|err| zerror!(err))?;

tracing::debug!("Querier declared on '{}'.", key_expr);
std::mem::forget(session);
Ok(Arc::into_raw(Arc::new(querier)))
}()
.unwrap_or_else(|err| {
throw_exception!(env, err);
null()
})
}

/// Declare a Zenoh queryable via JNI.
///
/// This function is meant to be called from Java/Kotlin code through JNI.
Expand Down
45 changes: 45 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,39 @@ class Session private constructor(private val config: Config) : AutoCloseable {
}, handler.receiver(), complete)
}

/**
* Declare a [Querier].
*
* A querier allows to send queries to a queryable.
*
* Queriers are automatically undeclared when dropped.
*
* Example:
* ```kotlin
* val session = Zenoh.open(config).getOrThrow();
* val keyExpr = "a/b/c".intoKeyExpr().getOrThrow();
*
* val querier = session.declareQuerier(keyExpr).getOrThrow();
* querier.get(callback = {
* it.result.onSuccess { sample ->
* println(">> Received ('${sample.keyExpr}': '${sample.payload}')")
* }.onFailure { error ->
* println(">> Received (ERROR: '${error.message}')")
* }
* }
* )
* ```
*/
fun declareQuerier(
keyExpr: KeyExpr,
target: QueryTarget = QueryTarget.BEST_MATCHING,
qos: QoS = QoS.default(),
consolidation: ConsolidationMode = ConsolidationMode.NONE,
DariusIMP marked this conversation as resolved.
Show resolved Hide resolved
timeout: Duration = Duration.ofMillis(10000)
): Result<Querier> {
return resolveQuerier(keyExpr, target, consolidation, qos, timeout)
}

/**
* Declare a [KeyExpr].
*
Expand Down Expand Up @@ -743,6 +776,18 @@ class Session private constructor(private val config: Config) : AutoCloseable {
} ?: Result.failure(sessionClosedException)
}

private fun resolveQuerier(
keyExpr: KeyExpr,
target: QueryTarget,
consolidation: ConsolidationMode,
qos: QoS,
timeout: Duration
): Result<Querier> {
return jniSession?.run {
declareQuerier(keyExpr, target, consolidation, qos, timeout)
} ?: Result.failure(sessionClosedException)
}

private fun <R> resolveGet(
selector: Selector,
callback: Callback<Reply>,
Expand Down
Loading
Loading