Skip to content

Commit

Permalink
added support for writer/reader append/append_bounded functions
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Aug 9, 2024
1 parent 31adb5e commit 83f7a66
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/commons.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ Sample
Logging
-------

.. doxygenfunction:: init_logger
.. doxygenfunction:: init_logging
25 changes: 24 additions & 1 deletion examples/simple/universal/z_simple.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct CustomStruct {
// Example of codec for a custom class / struct
// We need to define corresponding serialize and deserialize methods
struct CustomCodec {
/// @brief Serialize by copying.
static Bytes serialize(const CustomStruct& s) {
Bytes b;
auto writer = b.writer();
Expand All @@ -45,6 +46,26 @@ struct CustomCodec {
return b;
}

/// @brief Serialize by consuming (only applies to string field).
static Bytes serialize(CustomStruct&& s) {
Bytes b;
auto writer = b.writer();
writer.write_all(serialize_arithmetic(s.u).data(), 4);
writer.write_all(serialize_arithmetic(s.d).data(), 8);
writer.append(std::move(s.s));
return b;
}

/// @brief Serialize by aliasing (only applies to string field).
static Bytes serialize(std::shared_ptr<CustomStruct> s) {
Bytes b;
auto writer = b.writer();
writer.write_all(serialize_arithmetic(s->u).data(), 4);
writer.write_all(serialize_arithmetic(s->d).data(), 8);
writer.append(Bytes::serialize(s->s, ZenohCodec(std::move(s))));
return b;
}

// deserialize should be a template method
template <class T>
static T deserialize(const Bytes& b, ZResult* err = nullptr);
Expand Down Expand Up @@ -114,6 +135,8 @@ class CustomPublisher {
}

void put(const CustomStruct& s) { _pub.put(Bytes::serialize(s, CustomCodec())); }
void put(CustomStruct&& s) { _pub.put(Bytes::serialize(std::move(s), CustomCodec())); }
void put(std::shared_ptr<CustomStruct> s) { _pub.put(Bytes::serialize(std::move(s), CustomCodec())); }

private:
Publisher _pub;
Expand Down Expand Up @@ -145,7 +168,7 @@ int main(int, char**) {
CustomPublisher pub(session, keyexpr);
CustomSubscriber sub(session, keyexpr);

pub.put({0, 0.5, "abc"});
pub.put(CustomStruct{0, 0.5, "abc"});
std::this_thread::sleep_for(1s); /// wait a bit to receive the message

} catch (std::exception& e) {
Expand Down
58 changes: 49 additions & 9 deletions include/zenoh/api/bytes.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,30 @@ class Bytes : public Owned<::z_owned_bytes_t> {
void seek_from_end(int64_t offset, ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(::z_bytes_reader_seek(&this->_0, offset, SEEK_END), err, "seek_from_end failed");
}

/// @brief Read bounded data previously written by ``Bytes::Writer::write_bounded``.
///
/// @param res if not null, the result code will be written to this location, otherwise ZException exception
/// will be thrown in case of error.
Bytes read_bounded(ZResult* err = nullptr) {
Bytes b;
__ZENOH_RESULT_CHECK(
::z_bytes_reader_read_bounded(&this->_0, &b._0),
err,
"Failed to append data with boundaries"
);
return b;
}
};

/// @brief Create data reader.
/// @return reader instance.
Reader reader() const { return Reader(::z_bytes_get_reader(this->loan())); }

/// @brief A writer for Zenoh-serialized data.
class Writer : public Owned<::z_owned_bytes_writer_t> {
class Writer : public Copyable<::z_bytes_writer_t> {
public:
using Owned::Owned;
using Copyable::Copyable;

/// @name Methods

Expand All @@ -215,7 +229,32 @@ class Bytes : public Owned<::z_owned_bytes_t> {
/// @param res if not null, the result code will be written to this location, otherwise ZException exception
/// will be thrown in case of error.
void write_all(const uint8_t* src, size_t len, ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(::z_bytes_writer_write_all(this->loan(), src, len), err, "Failed to write data");
__ZENOH_RESULT_CHECK(::z_bytes_writer_write_all(&this->_0, src, len), err, "Failed to write data");
}


/// @brief Appends another `Bytes` instance.
/// This allows to compose a serialized data out of multiple `Bytes` that may point to different memory regions.
/// Said in other terms, it allows to create a linear view on different memory regions without copy.
///
/// @param data data to append.
/// @param res if not null, the result code will be written to this location, otherwise ZException exception
/// will be thrown in case of error.
void append(Bytes&& data, ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(::z_bytes_writer_append(&this->_0, z_move(data._0)), err, "Failed to append data");
}

/// @brief Append another `Bytes` instance, with boundaries information. It would allow to read the same piece of data using ``Bytes::reader::read_bounded``.
///
/// @param data data to append.
/// @param res if not null, the result code will be written to this location, otherwise ZException exception
/// will be thrown in case of error.
void append_bounded(Bytes&& data, ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(
::z_bytes_writer_append_bounded(&this->_0, z_move(data._0)),
err,
"Failed to append data with boundaries"
);
}
};

Expand All @@ -225,9 +264,7 @@ class Bytes : public Owned<::z_owned_bytes_t> {
/// a given moment of time for a given ``Bytes`` instance.
/// @return writer instance.
Writer writer() {
Writer w(nullptr);
::z_bytes_get_writer(this->loan(), detail::as_owned_c_ptr(w));
return w;
return Writer(::z_bytes_get_writer(this->loan()));
}
};

Expand Down Expand Up @@ -427,12 +464,15 @@ auto make_owned_slice(uint8_t* data, size_t len, Deleter&& d) {
return OwnedSlice<std::remove_reference_t<Deleter>>{data, len, std::forward<Deleter>(d)};
};

/// @brief Default codec for Zenoh serialization / deserialziation
class ZenohCodec {
std::shared_ptr<void> _alias_guard;

ZenohCodec(std::shared_ptr<void> alias_guard) :_alias_guard(std::move(alias_guard)) {}
public:
ZenohCodec() :_alias_guard(nullptr) {}
/// @brief Consturctor
/// @param alias_guard optional alias guard. If null the data passed by const reference will copied,
/// otherwise it will be aliased instead and a copy if alias_guard will be added to all serialized
/// ``Bytes`` instances, to ensure that aliased data outlives them.
ZenohCodec(std::shared_ptr<void> alias_guard = nullptr) :_alias_guard(std::move(alias_guard)) {}

/// @brief Serialize pointer and length by copying.
Bytes serialize(const Slice& s) const {
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh/api/logging.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace zenoh {
/// User may set environment variable RUST_LOG to values *debug* | *info* | *warn* | *error* to show diagnostic output.
///
/// @note zenoh-c only
inline void init_logger() { ::zc_init_logger(); }
inline void init_logging() { ::zc_init_logging(); }
#endif

}
88 changes: 88 additions & 0 deletions tests/universal/bytes.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,59 @@ void serde_shared() {
assert(mu_ptr.use_count() == 1);
}

void reader_writer_append() {
std::cout << "running reader_writer_append\n";
std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
std::vector<uint8_t> data2 = {11, 12, 13, 14};
Bytes b;
{
auto writer = b.writer();
writer.write_all(data.data(), 5);
writer.write_all(data.data() + 5, 5);
writer.append(data2);
}

auto reader = b.reader();
std::vector<uint8_t> out(3);
assert(reader.read(out.data(), 3) == 3);
assert(out == std::vector<uint8_t>(data.begin(), data.begin() + 3));
out = std::vector<uint8_t>(7);
assert(reader.read(out.data(), 7) == 7);
assert(out == std::vector<uint8_t>(data.begin() + 3, data.end()));

out = std::vector<uint8_t>(4);
assert(reader.read(out.data(), 4) == 4);
assert(out == data2);
assert(reader.read(out.data(), 1) == 0); // reached the end of the payload
}

void reader_writer_append_bounded() {
std::cout << "running reader_writer_append_bounded\n";
std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
std::string s = "abcd";
float f = 0.5f;
Bytes b;
{
auto writer = b.writer();
writer.write_all(data.data(), 5);
writer.write_all(data.data() + 5, 5);
writer.append_bounded(s);
writer.append_bounded(f);
}

auto reader = b.reader();
std::vector<uint8_t> out(3);
assert(reader.read(out.data(), 3) == 3);
assert(out == std::vector<uint8_t>(data.begin(), data.begin() + 3));
out = std::vector<uint8_t>(7);
assert(reader.read(out.data(), 7) == 7);
assert(out == std::vector<uint8_t>(data.begin() + 3, data.end()));

assert(reader.read_bounded().deserialize<std::string>() == s);
assert(reader.read_bounded().deserialize<float>() == f);
assert(reader.read(out.data(), 1) == 0); // reached the end of the payload
}



struct CustomStruct {
Expand All @@ -217,6 +270,24 @@ struct CustomCodec {
return b;
}

static Bytes serialize(CustomStruct&& s) {
Bytes b;
auto writer = b.writer();
writer.write_all(serialize_arithmetic(s.u).data(), 4);
writer.write_all(serialize_arithmetic(s.d).data(), 8);
writer.append(std::move(s.s));
return b;
}

static Bytes serialize(std::shared_ptr<CustomStruct> s) {
Bytes b;
auto writer = b.writer();
writer.write_all(serialize_arithmetic(s->u).data(), 4);
writer.write_all(serialize_arithmetic(s->d).data(), 8);
writer.append(Bytes::serialize(s->s, ZenohCodec(s)));
return b;
}

// deserialize should be a template method
template <class T>
static T deserialize(const Bytes& b, ZResult* err = nullptr);
Expand Down Expand Up @@ -290,6 +361,20 @@ void serde_custom() {
assert(s.d == out.d);
assert(s.u == out.u);
assert(s.s == out.s);

CustomStruct s2 = s;

b = Bytes::serialize(std::move(s2), CustomCodec());
out = b.deserialize<CustomStruct>(CustomCodec());
assert(s.d == out.d);
assert(s.u == out.u);
assert(s.s == out.s);

b = Bytes::serialize(std::make_shared<CustomStruct>(s), CustomCodec());
out = b.deserialize<CustomStruct>(CustomCodec());
assert(s.d == out.d);
assert(s.u == out.u);
assert(s.s == out.s);
}

int main(int argc, char** argv) {
Expand All @@ -299,5 +384,8 @@ int main(int argc, char** argv) {
serde_iter();
serde_advanced();
serde_shared();

reader_writer_append();
reader_writer_append_bounded();
serde_custom();
}
2 changes: 1 addition & 1 deletion tests/zenohc/config.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void test_config_to_string() {
}

int main(int argc, char** argv) {
init_logger();
init_logging();
test_config_client();
test_config_peer();
test_config_to_string();
Expand Down

0 comments on commit 83f7a66

Please sign in to comment.