Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary Serialization Integration #16

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Authors

- Gonzalo Casas <<[email protected]>> [@gonzalocasas](https://github.com/gonzalocasas)
- Eleni Vasiliki Alexi <<[email protected]>> [@elenialex](https://github.com/elenialex)
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

* Add binary serialization support to `Transport` class.
* Add classes `MessageCodec`, `JsonMessageCodec`, `BinaryMessageCodec`.
* Add `msgpack` depedency for binary serialization support.

### Changed

### Removed
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
compas>=1.17.6
paho-mqtt >=1, <2
msgpack
4 changes: 4 additions & 0 deletions src/compas_eve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
EchoSubscriber,
Transport,
Topic,
JsonMessageCodec,
BinaryMessageCodec,
get_default_transport,
set_default_transport,
)
Expand All @@ -61,6 +63,8 @@
"Subscriber",
"EchoSubscriber",
"Topic",
"JsonMessageCodec",
"BinaryMessageCodec",
"Transport",
"get_default_transport",
"set_default_transport",
Expand Down
149 changes: 148 additions & 1 deletion src/compas_eve/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from compas.data import json_dumps
from compas.data import json_loads
import msgpack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since msgpack is not available for ironpython, we should either try..except this import or check if compas.IPY (and subsequently disable binary serialization if that's the case).


DEFAULT_TRANSPORT = None

Expand Down Expand Up @@ -32,9 +33,10 @@ def set_default_transport(transport):
class Transport(object):
"""Defines the base interface for different transport implementations."""

def __init__(self, *args, **kwargs):
def __init__(self, codec=None, *args, **kwargs):
super(Transport, self).__init__(*args, **kwargs)
self._id_counter = 0
self.codec = codec or JsonMessageCodec()

@property
def id_counter(self):
Expand All @@ -43,6 +45,7 @@ def id_counter(self):
return self._id_counter

def publish(self, topic, message):
self.codec.encode(message)
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pass has no effect now, should be removed. But perhaps more importantly, this is probably not where we should be calling the encoding. This method is empty because the implementation is on the sub-classes of transport. So, I would leave it empty.


def subscribe(self, topic, callback):
Expand All @@ -58,6 +61,150 @@ def unadvertise(self, topic):
pass


class MessageCodec:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to maintain IronPython compat, the class def should explicitly inherit from object:

Suggested change
class MessageCodec:
class MessageCodec(object):

Once we move fully to cpython, we can simplify this again.

"""
Base class for message codecs.

This class defines the interface for message encoding and decoding.
Subclasses should implement the `encode` and `decode` methods to handle
specific serialization formats.
"""

def encode(self, message):
"""
Encode a message into a serialized format.

Parameters
----------
message : Message or dict
The message to encode.

Returns
-------
bytes or str
The encoded message.

Raises
------
NotImplementedError
If the method is not implemented by a subclass.
"""
raise NotImplementedError

def decode(self, message):
"""
Decode a serialized message back into a `Message` object.

Parameters
----------
message : bytes or str
The serialized message to decode.

Returns
-------
Message
The decoded message.

Raises
------
NotImplementedError
If the method is not implemented by a subclass.
"""
raise NotImplementedError


class JsonMessageCodec(MessageCodec):
"""
Message codec for JSON serialization.

This codec handles encoding and decoding messages using JSON format.
It supports messages that are instances of `Message` or dictionaries.
"""

def encode(self, message):
"""
Encode a message into a JSON string.

Parameters
----------
message : Message or dict
The message to encode.

Returns
-------
str
The JSON-encoded message.
"""
if isinstance(message, Message):
data = message.data
else:
data = message
Comment on lines +138 to +141
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic of detecting the message type is a tad more complex (there's one more case to handle) and it should be handled centrally, so I would suggest we change Topic._message_to_json to Topic._message_to_data with the following implementation:

    def _message_to_data(self, message):
        """Convert a message to a data representation ready to be encoded.

        Normally, this method expects sub-classes of ``Message`` as input.
        However, it can deal with regular dictionaries as well as classes
        implementing the COMPAS data framework.
        """
        try:
            data = message.data
        except (KeyError, AttributeError):
            try:
                data = message.__data__
            except (KeyError, AttributeError):
                data = dict(message)
        return data

and this method can be much simpler:

    def encode(self, data):
        return json_dumps(data)

... (cont'd)

return json_dumps(data)

def decode(self, message):
"""
Decode a JSON string back into a `Message` object.

Parameters
----------
message : bytes or str
The JSON-encoded message to decode.

Returns
-------
Message
The decoded message.
"""
data = json_loads(message.decode("utf-8"))
return Message(**data)


class BinaryMessageCodec(MessageCodec):
"""
Message codec for binary serialization using MessagePack.

This codec handles encoding and decoding messages using MessagePack.
"""

def encode(self, message):
"""
Encode a message into a binary format using MessagePack.

Parameters
----------
message : Message or dict
The message to encode.

Returns
-------
bytes
The MessagePack-encoded message.
"""

if isinstance(message, Message):
data = message.data
else:
data = message
return msgpack.packb(data)
Comment on lines +184 to +188
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and as mentioned in the other encode, this would also change to just do return msgpack.packb(data)


def decode(self, message):
"""
Decode a MessagePack binary message back into a `Message` object.

Parameters
----------
message : bytes
The MessagePack-encoded message to decode.

Returns
-------
Message
The decoded message.
"""
data = msgpack.unpackb(message)
return Message(**data)


class Message(object):
"""Message objects used for publishing and subscribing to/from topics.

Expand Down
11 changes: 7 additions & 4 deletions src/compas_eve/mqtt/mqtt_paho.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def publish(self, topic, message):
"""

def _callback(**kwargs):
json_message = topic._message_to_json(message)
self.client.publish(topic.name, json_message)
encoded_message = self.codec.encode(message)
self.client.publish(topic.name, encoded_message)
Comment on lines +63 to +64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the comments above, this would change to call the conversion to data from the topic, and then use the codec for encoding:

Suggested change
encoded_message = self.codec.encode(message)
self.client.publish(topic.name, encoded_message)
data = topic._message_to_data(message)
encoded_message = self.codec.encode(data)
self.client.publish(topic.name, encoded_message)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same would need to be done in mqtt_cli.py for this to work on IronPython


self.on_ready(_callback)

Expand All @@ -87,8 +87,11 @@ def subscribe(self, topic, callback):
subscribe_id = "{}:{}".format(event_key, id(callback))

def _local_callback(msg):
msg = topic._message_from_json(msg.payload.decode())
callback(msg)
decoded_message = self.codec.decode(msg.payload)
callback(decoded_message)

# msg = topic._message_from_json(msg.payload.decode())
# callback(msg)
Comment on lines +90 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling decoding needs to follow the same approach as the one commented on the encoding, because there's this custom type parsing supported in there, so, we need to change Topic._message_from_json to this:

    def _message_from_data(self, data):
        """Converts a decoded data object back into a message instance."""
        return self.message_type.parse(data)

and this needs to change a bit as well:

Suggested change
decoded_message = self.codec.decode(msg.payload)
callback(decoded_message)
# msg = topic._message_from_json(msg.payload.decode())
# callback(msg)
decoded_data = self.codec.decode(msg.payload.decode())
decoded_message = topic._message_from_data(decoded_data)
callback(decoded_message)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the mqtt_cli.py would change as well to this sequence of calls


def _subscribe_callback(**kwargs):
self.client.subscribe(topic.name)
Expand Down
Loading