Skip to content

Commit

Permalink
feat: abstract reply and push functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Oct 13, 2023
1 parent 5838832 commit 8a5ee92
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 15 deletions.
26 changes: 26 additions & 0 deletions include/zenoh-pico/session/push.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include <stdint.h>

#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/message.h"

#ifndef ZENOH_PICO_SESSION_PUSH_H
#define ZENOH_PICO_SESSION_PUSH_H

int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) ;

#endif /* ZENOH_PICO_SESSION_PUSH_H */
29 changes: 29 additions & 0 deletions include/zenoh-pico/session/reply.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include <stdint.h>

#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/message.h"
#include "zenoh-pico/protocol/definitions/network.h"

#ifndef ZENOH_PICO_SESSION_REPLY_H
#define ZENOH_PICO_SESSION_REPLY_H

int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply);

int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final);

#endif /* ZENOH_PICO_SESSION_REPLY_H */
35 changes: 35 additions & 0 deletions src/session/push.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include "zenoh-pico/session/push.h"

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/session/subscription.h"
#include "zenoh-pico/utils/logging.h"

int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
int8_t ret = _Z_RES_OK;

// TODO check body to know where to dispatch
_z_bytes_t payload = push->_body._is_put ? push->_body._body._put._payload : _z_bytes_empty();
_z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : z_encoding_default();
int kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE;

ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp);

return ret;
}
44 changes: 44 additions & 0 deletions src/session/reply.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include "zenoh-pico/session/reply.h"

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/session/query.h"
#include "zenoh-pico/utils/logging.h"

int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply) {
int8_t ret = _Z_RES_OK;

// TODO check id to know where to dispatch

#if Z_FEATURE_QUERY == 1
ret = _z_trigger_query_reply_partial(zn, id, key, reply->_value.payload, reply->_value.encoding, Z_SAMPLE_KIND_PUT,
reply->_timestamp);
#endif
return ret;
}

int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final) {
int8_t ret = _Z_RES_OK;

// TODO check id to know where to dispatch
_z_zint_t id = final->_request_id;

#if Z_FEATURE_QUERY == 1
_z_trigger_query_reply_final(zn, id);
#endif
return ret;
}
22 changes: 7 additions & 15 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
#include "zenoh-pico/protocol/definitions/message.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/query.h"
#include "zenoh-pico/session/push.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/reply.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/session/subscription.h"
Expand Down Expand Up @@ -81,11 +82,8 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_N_PUSH: {
_Z_DEBUG("Handling _Z_N_PUSH\n");
_z_n_msg_push_t push = msg->_body._push;
_z_bytes_t payload = push._body._is_put ? push._body._body._put._payload : _z_bytes_empty();
_z_encoding_t encoding = push._body._is_put ? push._body._body._put._encoding : z_encoding_default();
int kind = push._body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE;
ret = _z_trigger_subscriptions(zn, push._key, payload, encoding, kind, push._timestamp);
_z_n_msg_push_t *push = &msg->_body._push;
ret = _z_trigger_push(zn, push);
} break;
case _Z_N_REQUEST: {
_Z_DEBUG("Handling _Z_N_REQUEST\n");
Expand Down Expand Up @@ -131,11 +129,8 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_n_msg_response_t response = msg->_body._response;
switch (response._tag) {
case _Z_RESPONSE_BODY_REPLY: {
_z_msg_reply_t reply = response._body._reply;
#if Z_FEATURE_QUERY == 1
ret = _z_trigger_query_reply_partial(zn, response._request_id, response._key, reply._value.payload,
reply._value.encoding, Z_SAMPLE_KIND_PUT, reply._timestamp);
#endif
_z_msg_reply_t *reply = &response._body._reply;
ret = _z_trigger_reply_partial(zn, response._request_id, response._key, reply);
} break;
case _Z_RESPONSE_BODY_ERR: {
// @TODO: expose errors to the user
Expand All @@ -161,10 +156,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_N_RESPONSE_FINAL: {
_Z_DEBUG("Handling _Z_N_RESPONSE_FINAL\n");
_z_zint_t id = msg->_body._response_final._request_id;
#if Z_FEATURE_QUERY == 1
_z_trigger_query_reply_final(zn, id);
#endif
ret = _z_trigger_reply_final(zn, &msg->_body._response_final);
} break;
}
_z_msg_clear(msg);
Expand Down

0 comments on commit 8a5ee92

Please sign in to comment.