Skip to content

Commit

Permalink
Add lru cache (#841)
Browse files Browse the repository at this point in the history
* feat: add lru cache module and tests

* feat: add keyexpr_compare function

* fix: clang format

* feat: add rx cache size config token

* refactor: pass keyexpr as pointer

* fix: remove debug functions

* feat: use lru cache for subscription cache

* feat: use lru cache for queryable cache

* feat: add lru bench and tree config token

* fix: improve _z_string_compare

Co-authored-by: Alexander Bushnev <[email protected]>

* fix: remove obsolete code

* feat: switch tree to sorted array

* fix: windows doesn't have ssize_t

* feat: add rx cache invalidation

* fix: modular test error

---------

Co-authored-by: Alexander Bushnev <[email protected]>
  • Loading branch information
jean-roland and sashacmc authored Dec 20, 2024
1 parent 31aaa7a commit 8dc7d13
Show file tree
Hide file tree
Showing 28 changed files with 823 additions and 194 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ if(UNIX OR MSVC)
add_executable(z_api_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_api_bytes_test.c)
add_executable(z_api_encoding_test ${PROJECT_SOURCE_DIR}/tests/z_api_encoding_test.c)
add_executable(z_refcount_test ${PROJECT_SOURCE_DIR}/tests/z_refcount_test.c)
add_executable(z_lru_cache_test ${PROJECT_SOURCE_DIR}/tests/z_lru_cache_test.c)

target_link_libraries(z_data_struct_test zenohpico::lib)
target_link_libraries(z_channels_test zenohpico::lib)
Expand All @@ -521,6 +522,7 @@ if(UNIX OR MSVC)
target_link_libraries(z_api_bytes_test zenohpico::lib)
target_link_libraries(z_api_encoding_test zenohpico::lib)
target_link_libraries(z_refcount_test zenohpico::lib)
target_link_libraries(z_lru_cache_test zenohpico::lib)

configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY)
Expand All @@ -545,6 +547,7 @@ if(UNIX OR MSVC)
add_test(z_api_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_bytes_test)
add_test(z_api_encoding_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_encoding_test)
add_test(z_refcount_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_refcount_test)
add_test(z_lru_cache_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_lru_cache_test)
endif()

if(BUILD_MULTICAST)
Expand Down
67 changes: 67 additions & 0 deletions include/zenoh-pico/collections/lru_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_COLLECTIONS_LRUCACHE_H
#define ZENOH_PICO_COLLECTIONS_LRUCACHE_H

#include <stddef.h>
#include <stdint.h>

#include "zenoh-pico/utils/result.h"

#ifdef __cplusplus
extern "C" {
#endif

// Three way comparison function pointer
typedef int (*_z_lru_val_cmp_f)(const void *first, const void *second);

// Node struct: {node_data; generic type}
typedef void _z_lru_cache_node_t;

/*-------- Dynamically allocated vector --------*/
/**
* A least recently used cache implementation
*/
typedef struct _z_lru_cache_t {
size_t capacity; // Max number of node
size_t len; // Number of node
_z_lru_cache_node_t *head; // List head
_z_lru_cache_node_t *tail; // List tail
_z_lru_cache_node_t **slist; // Sorted node list
} _z_lru_cache_t;

_z_lru_cache_t _z_lru_cache_init(size_t capacity);
void *_z_lru_cache_get(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f compare);
z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_size, _z_lru_val_cmp_f compare);
void _z_lru_cache_clear(_z_lru_cache_t *cache);
void _z_lru_cache_delete(_z_lru_cache_t *cache);

#define _Z_LRU_CACHE_DEFINE(name, type, compare_f) \
typedef _z_lru_cache_t name##_lru_cache_t; \
static inline name##_lru_cache_t name##_lru_cache_init(size_t capacity) { return _z_lru_cache_init(capacity); } \
static inline type *name##_lru_cache_get(name##_lru_cache_t *cache, type *val) { \
return (type *)_z_lru_cache_get(cache, (void *)val, compare_f); \
} \
static inline z_result_t name##_lru_cache_insert(name##_lru_cache_t *cache, type *val) { \
return _z_lru_cache_insert(cache, (void *)val, sizeof(type), compare_f); \
} \
static inline void name##_lru_cache_clear(name##_lru_cache_t *cache) { _z_lru_cache_clear(cache); } \
static inline void name##_lru_cache_delete(name##_lru_cache_t *cache) { _z_lru_cache_delete(cache); }

#ifdef __cplusplus
}
#endif

#endif /* ZENOH_PICO_COLLECTIONS_LRUCACHE_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/string.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void _z_string_move_str(_z_string_t *dst, char *src);
void _z_string_clear(_z_string_t *s);
void _z_string_free(_z_string_t **s);
void _z_string_reset(_z_string_t *s);
int _z_string_compare(const _z_string_t *left, const _z_string_t *right);
bool _z_string_equals(const _z_string_t *left, const _z_string_t *right);
_z_string_t _z_string_convert_bytes_le(const _z_slice_t *bs);
_z_string_t _z_string_preallocate(const size_t len);
Expand Down
5 changes: 5 additions & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@
*/
#define Z_IOSLICE_SIZE 128

/**
* Default size for the rx cache size (if activated).
*/
#define Z_RX_CACHE_SIZE 10

/**
* Default get timeout in milliseconds.
*/
Expand Down
5 changes: 5 additions & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@
*/
#define Z_IOSLICE_SIZE 128

/**
* Default size for the rx cache size (if activated).
*/
#define Z_RX_CACHE_SIZE 10

/**
* Default get timeout in milliseconds.
*/
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void _z_scout(const z_what_t what, const _z_id_t zid, _z_string_t *locator, cons
* Returns:
* A numerical id of the declared resource.
*/
uint16_t _z_declare_resource(_z_session_t *zn, _z_keyexpr_t keyexpr);
uint16_t _z_declare_resource(_z_session_t *zn, const _z_keyexpr_t *keyexpr);

/**
* Associate a numerical id with the given resource key.
Expand Down Expand Up @@ -218,7 +218,7 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle);
* kind: The type of operation.
* attachment: An optional attachment to the reply.
*/
z_result_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_keyexpr_t keyexpr,
z_result_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_keyexpr_t *keyexpr,
const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority, bool is_express, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ static inline _z_query_t _z_query_alias(_z_value_t *value, _z_keyexpr_t *key, co
_z_session_rc_t *zn, uint32_t request_id, const _z_bytes_t *attachment,
bool anyke) {
_z_query_t ret;
ret._key = _z_keyexpr_alias(*key);
ret._value = _z_value_alias(*value);
ret._key = _z_keyexpr_alias(key);
ret._value = _z_value_alias(value);
ret._request_id = request_id;
ret._zn = _z_session_rc_clone_as_weak(zn);
ret._attachment = _z_bytes_alias(*attachment);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ typedef struct _z_session_t {
_z_subscription_rc_list_t *_subscriptions;
_z_subscription_rc_list_t *_liveliness_subscriptions;
#if Z_FEATURE_RX_CACHE == 1
_z_subscription_cache_t _subscription_cache;
_z_subscription_lru_cache_t _subscription_cache;
#endif
#endif

Expand All @@ -86,7 +86,7 @@ typedef struct _z_session_t {
#if Z_FEATURE_QUERYABLE == 1
_z_session_queryable_rc_list_t *_local_queryable;
#if Z_FEATURE_RX_CACHE == 1
_z_queryable_cache_t _queryable_cache;
_z_queryable_lru_cache_t _queryable_cache;
#endif
#endif
#if Z_FEATURE_QUERY == 1
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ static inline bool _z_value_check(const _z_value_t *value) {
}
_z_value_t _z_value_steal(_z_value_t *value);
z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src);
_z_value_t _z_value_alias(_z_value_t src);
_z_value_t _z_value_alias(_z_value_t *src);
void _z_value_move(_z_value_t *dst, _z_value_t *src);
void _z_value_clear(_z_value_t *src);
void _z_value_free(_z_value_t **hello);
Expand Down
11 changes: 5 additions & 6 deletions include/zenoh-pico/protocol/keyexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,20 @@ bool _z_keyexpr_suffix_equals(const _z_keyexpr_t *left, const _z_keyexpr_t *righ
/*------------------ clone/Copy/Free helpers ------------------*/
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_keyexpr_t _z_keyexpr_null(void) { return (_z_keyexpr_t){0}; }
static inline _z_keyexpr_t _z_keyexpr_alias(const _z_keyexpr_t src) {
static inline _z_keyexpr_t _z_keyexpr_alias(const _z_keyexpr_t *src) {
_z_keyexpr_t ret;
ret._id = src._id;
ret._mapping = src._mapping;
ret._suffix = _z_string_alias(src._suffix);
ret._id = src->_id;
ret._mapping = src->_mapping;
ret._suffix = _z_string_alias(src->_suffix);
return ret;
}

int _z_keyexpr_compare(_z_keyexpr_t *first, _z_keyexpr_t *second);
_z_keyexpr_t _z_keyexpr_from_string(uint16_t rid, _z_string_t *str);
_z_keyexpr_t _z_keyexpr_from_substr(uint16_t rid, const char *str, size_t len);
size_t _z_keyexpr_size(_z_keyexpr_t *p);
z_result_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src);
_z_keyexpr_t _z_keyexpr_duplicate(const _z_keyexpr_t *src);
_z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src);
_z_keyexpr_t _z_keyexpr_alias(_z_keyexpr_t src);
/// Returns either keyexpr defined by id + mapping with null suffix if try_declared is true and id is non-zero,
/// or keyexpr defined by its suffix only, with 0 id and no mapping. This is to be used only when forwarding
/// keyexpr in user api to properly separate declared keyexpr from its suffix.
Expand Down
12 changes: 10 additions & 2 deletions include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <stdbool.h>
#include <zenoh-pico/session/session.h>

#include "zenoh-pico/collections/lru_cache.h"

// Forward declaration to avoid cyclical include
typedef struct _z_session_t _z_session_t;
typedef struct _z_session_rc_t _z_session_rc_t;
Expand All @@ -36,14 +38,20 @@ typedef struct {
_z_keyexpr_t ke_out;
_z_queryable_infos_svec_t infos;
size_t qle_nb;
} _z_queryable_cache_t;
} _z_queryable_cache_data_t;

void _z_queryable_cache_invalidate(_z_session_t *zn);
int _z_queryable_cache_data_compare(const void *first, const void *second);

#if Z_FEATURE_QUERYABLE == 1
#define _Z_QUERYABLE_COMPLETE_DEFAULT false
#define _Z_QUERYABLE_DISTANCE_DEFAULT 0

#if Z_FEATURE_RX_CACHE == 1
_Z_LRU_CACHE_DEFINE(_z_queryable, _z_queryable_cache_data_t, _z_queryable_cache_data_compare)
#endif

/*------------------ Queryable ------------------*/
void _z_queryable_cache_clear(_z_queryable_cache_t *cache);
_z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
z_result_t _z_trigger_queryables(_z_session_rc_t *zn, _z_msg_query_t *query, _z_keyexpr_t *q_key, uint32_t qid);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ uint16_t _z_get_resource_id(_z_session_t *zn);
_z_resource_t *_z_get_resource_by_id(_z_session_t *zn, uint16_t mapping, _z_zint_t rid);
_z_resource_t *_z_get_resource_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr);
_z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr);
uint16_t _z_register_resource(_z_session_t *zn, const _z_keyexpr_t key, uint16_t id, uint16_t register_to_mapping);
uint16_t _z_register_resource(_z_session_t *zn, const _z_keyexpr_t *key, uint16_t id, uint16_t register_to_mapping);
void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping);
void _z_unregister_resources_for_peer(_z_session_t *zn, uint16_t mapping);
void _z_flush_resources(_z_session_t *zn);
Expand Down
8 changes: 6 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H
#define INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H

#include "zenoh-pico/collections/lru_cache.h"
#include "zenoh-pico/net/encoding.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/session.h"
Expand All @@ -40,7 +41,10 @@ typedef struct {
_z_keyexpr_t ke_out;
_z_subscription_infos_svec_t infos;
size_t sub_nb;
} _z_subscription_cache_t;
} _z_subscription_cache_data_t;

void _z_subscription_cache_invalidate(_z_session_t *zn);
int _z_subscription_cache_data_compare(const void *first, const void *second);

/*------------------ Subscription ------------------*/
z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, _z_keyexpr_t *keyexpr, _z_bytes_t *payload,
Expand All @@ -59,7 +63,7 @@ z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, _z_ke
#if Z_FEATURE_SUBSCRIPTION == 1

#if Z_FEATURE_RX_CACHE == 1
void _z_subscription_cache_clear(_z_subscription_cache_t *cache);
_Z_LRU_CACHE_DEFINE(_z_subscription, _z_subscription_cache_data_t, _z_subscription_cache_data_compare)
#endif

_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id);
Expand Down
16 changes: 8 additions & 8 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ z_result_t z_declare_publisher(const z_loaned_session_t *zs, z_owned_publisher_t
if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), keyexpr_aliased);
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
key = _z_keyexpr_from_string(id, &keyexpr_aliased._suffix);
}
}
Expand Down Expand Up @@ -1215,7 +1215,7 @@ z_result_t z_declare_queryable(const z_loaned_session_t *zs, z_owned_queryable_t
if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), keyexpr_aliased);
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
key = _z_rid_with_suffix(id, NULL);
}
}
Expand Down Expand Up @@ -1267,7 +1267,7 @@ z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this),
.encoding = _z_encoding_from_owned(&opts.encoding->_this)};

z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT,
z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, &keyexpr_aliased, value, Z_SAMPLE_KIND_PUT,
opts.congestion_control, opts.priority, opts.is_express, opts.timestamp,
_z_bytes_from_owned_bytes(&opts.attachment->_this));
z_bytes_drop(payload);
Expand Down Expand Up @@ -1303,7 +1303,7 @@ z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyex

_z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()};

z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE,
z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, &keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE,
opts.congestion_control, opts.priority, opts.is_express, opts.timestamp,
_z_bytes_from_owned_bytes(&opts.attachment->_this));
// Clean-up
Expand Down Expand Up @@ -1377,7 +1377,7 @@ z_result_t z_keyexpr_from_substr(z_owned_keyexpr_t *key, const char *name, size_

z_result_t z_declare_keyexpr(const z_loaned_session_t *zs, z_owned_keyexpr_t *key, const z_loaned_keyexpr_t *keyexpr) {
_z_keyexpr_t k = _z_keyexpr_alias_from_user_defined(*keyexpr, false);
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), k);
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &k);
key->_val = _z_rid_with_suffix(id, NULL);
// we still need to store the original suffix, for user needs
// (for example to compare keys or perform other operations on their string representation).
Expand Down Expand Up @@ -1426,7 +1426,7 @@ z_result_t z_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber
callback->_this._val.context = NULL;

_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
_z_keyexpr_t key = _z_keyexpr_alias(keyexpr_aliased);
_z_keyexpr_t key = _z_keyexpr_alias(&keyexpr_aliased);

// TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition
// lacks a way to convey them to later-joining nodes. Thus, in the current version automatic
Expand All @@ -1435,7 +1435,7 @@ z_result_t z_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
bool do_keydecl = true;
_z_keyexpr_t resource_key = _z_keyexpr_alias(keyexpr_aliased);
_z_keyexpr_t resource_key = _z_keyexpr_alias(&keyexpr_aliased);
// Remove wild
char *wild = _z_string_pbrk(&keyexpr_aliased._suffix, "*$");
if ((wild != NULL) && _z_keyexpr_has_suffix(&keyexpr_aliased)) {
Expand All @@ -1450,7 +1450,7 @@ z_result_t z_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber
}
// Declare resource
if (do_keydecl) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), resource_key);
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &resource_key);
key = _z_rid_with_suffix(id, wild);
}
_z_keyexpr_clear(&resource_key);
Expand Down
Loading

0 comments on commit 8dc7d13

Please sign in to comment.