From eb300c9855a8c076d47dd7f45cd7306155a15c61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ted=20Sj=C3=B6blom?= <36690474+TedSjoblom@users.noreply.github.com> Date: Fri, 28 Apr 2023 07:41:58 +0200 Subject: [PATCH 1/4] Added rate limit --- README.md | 26 ++++++++++++++++++++++++++ main.py | 2 ++ 2 files changed, 28 insertions(+) diff --git a/README.md b/README.md index 1ef270f..7edb072 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,32 @@ Output minimized example: } ``` +## Docker compose example + +```yml + crowsnest-processor-radar-0: + image: ghcr.io/mo-rise/crowsnest-processor-opendlv-radar:0.1.17 + container_name: cw-radar-processor-0 + restart: unless-stopped + network_mode: "host" + deploy: + resources: + limits: + memory: 1024M + environment: + - CLUON_CID=65 + - CLUON_ENVELOPE_ID=1201 + - MQTT_BROKER_HOST=localhost + - MQTT_BROKER_PORT=1883 + - MQTT_BASE_TOPIC=CROWSNEST/SEAHORSE/RADAR/0/SWEEP + - RADAR_MIN_READING_WEIGHT=0 + - RADAR_SWEEP_ANGULAR_SUBSETTING=4 + - RADAR_SWEEP_RADIAL_SUBSETTING=4 + - RADAR_MAX_UPDATE_FREQUENCY=1 +``` + + + ## Development setup To setup the development environment: diff --git a/main.py b/main.py index 4cfa629..d1edb1c 100644 --- a/main.py +++ b/main.py @@ -33,6 +33,7 @@ RADAR_MIN_READING_WEIGHT = env.int("RADAR_MIN_READING_WEIGHT", 0) RADAR_SWEEP_ANGULAR_SUBSETTING = env.int("RADAR_SWEEP_ANGULAR_SUBSETTING", 10) RADAR_SWEEP_RADIAL_SUBSETTING = env.int("RADAR_SWEEP_RADIAL_SUBSETTING", 2) +RADAR_MAX_UPDATE_FREQUENCY = env.int("RADAR_MAX_UPDATE_FREQUENCY", 1) LOG_LEVEL = env.log_level("LOG_LEVEL", logging.DEBUG) @@ -224,6 +225,7 @@ def to_mqtt(envelope: Envelope): source.map(unpack_spoke) .filter(not_empty) .latest() # Drop anything we dont manage to process... + .rate_limit(1 / RADAR_MAX_UPDATE_FREQUENCY) .starmap(polar_to_cartesian) .starmap(buffer_to_full_360_view) .filter(not_empty) From 4890c2a2067a25a4bf3fedb42068d7dbafba8dac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ted=20Sj=C3=B6blom?= <36690474+TedSjoblom@users.noreply.github.com> Date: Fri, 28 Apr 2023 07:44:03 +0200 Subject: [PATCH 2/4] Update main.py --- main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.py b/main.py index d1edb1c..bc3beb5 100644 --- a/main.py +++ b/main.py @@ -225,9 +225,10 @@ def to_mqtt(envelope: Envelope): source.map(unpack_spoke) .filter(not_empty) .latest() # Drop anything we dont manage to process... - .rate_limit(1 / RADAR_MAX_UPDATE_FREQUENCY) .starmap(polar_to_cartesian) .starmap(buffer_to_full_360_view) + .latest() # Drop anything we dont manage to process... + .rate_limit(1 / RADAR_MAX_UPDATE_FREQUENCY) .filter(not_empty) .starmap(to_brefv) .sink(to_mqtt) From 21394c4350731afc128b67a4d0a08c7aa56742a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ted=20Sj=C3=B6blom?= <36690474+TedSjoblom@users.noreply.github.com> Date: Thu, 4 May 2023 14:44:12 +0200 Subject: [PATCH 3/4] latest removed --- brefv/envelope.py | 25 +++++++++++++++++++++ brefv/messages/__init__.py | 3 +++ brefv/messages/angle.py | 11 ++++++++++ brefv/messages/angle_rate.py | 13 +++++++++++ brefv/messages/angular_velocity.py | 19 ++++++++++++++++ brefv/messages/attitude.py | 19 ++++++++++++++++ brefv/messages/error_ellipse.py | 25 +++++++++++++++++++++ brefv/messages/force.py | 19 ++++++++++++++++ brefv/messages/linear_velocity.py | 19 ++++++++++++++++ brefv/messages/location.py | 19 ++++++++++++++++ brefv/messages/moment.py | 19 ++++++++++++++++ brefv/messages/playback.py | 22 +++++++++++++++++++ brefv/messages/point_cloud.py | 18 +++++++++++++++ brefv/messages/pose.py | 17 +++++++++++++++ brefv/messages/position.py | 19 ++++++++++++++++ brefv/messages/rotation.py | 19 ++++++++++++++++ brefv/messages/rpm.py | 13 +++++++++++ brefv/messages/speed.py | 11 ++++++++++ brefv/messages/twist.py | 17 +++++++++++++++ brefv/messages/wrench.py | 17 +++++++++++++++ docker-compose.radar.yml | 33 ++++++++++++++++++++++++++++ main.py | 35 +++++++++++++----------------- 22 files changed, 392 insertions(+), 20 deletions(-) create mode 100644 brefv/envelope.py create mode 100644 brefv/messages/__init__.py create mode 100644 brefv/messages/angle.py create mode 100644 brefv/messages/angle_rate.py create mode 100644 brefv/messages/angular_velocity.py create mode 100644 brefv/messages/attitude.py create mode 100644 brefv/messages/error_ellipse.py create mode 100644 brefv/messages/force.py create mode 100644 brefv/messages/linear_velocity.py create mode 100644 brefv/messages/location.py create mode 100644 brefv/messages/moment.py create mode 100644 brefv/messages/playback.py create mode 100644 brefv/messages/point_cloud.py create mode 100644 brefv/messages/pose.py create mode 100644 brefv/messages/position.py create mode 100644 brefv/messages/rotation.py create mode 100644 brefv/messages/rpm.py create mode 100644 brefv/messages/speed.py create mode 100644 brefv/messages/twist.py create mode 100644 brefv/messages/wrench.py create mode 100644 docker-compose.radar.yml diff --git a/brefv/envelope.py b/brefv/envelope.py new file mode 100644 index 0000000..2cb20b3 --- /dev/null +++ b/brefv/envelope.py @@ -0,0 +1,25 @@ +# generated by datamodel-codegen: +# filename: envelope.json +# timestamp: 2023-04-28T09:01:04+00:00 + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Union + +from pydantic import BaseModel, Extra, Field + + +class Envelope(BaseModel): + class Config: + extra = Extra.forbid + + sent_at: datetime = Field( + ..., + description='Date and time when the message was sent from the application processing the sensor data, expressed according to the ISO 8601 standard.', + examples=['2021-10-07T20:20:39.345+00:00'], + title='Sent at', + ) + message: Union[float, str, bool, Dict[str, Any], List[Any]] = Field( + ..., description='The message contained by this envelope.', title='Message' + ) diff --git a/brefv/messages/__init__.py b/brefv/messages/__init__.py new file mode 100644 index 0000000..33c8e0a --- /dev/null +++ b/brefv/messages/__init__.py @@ -0,0 +1,3 @@ +# generated by datamodel-codegen: +# filename: messages +# timestamp: 2023-04-28T09:01:10+00:00 diff --git a/brefv/messages/angle.py b/brefv/messages/angle.py new file mode 100644 index 0000000..cf74128 --- /dev/null +++ b/brefv/messages/angle.py @@ -0,0 +1,11 @@ +# generated by datamodel-codegen: +# filename: angle.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class Angle(BaseModel): + __root__: float = Field(..., description='Angular reading [deg]', title='Angle') diff --git a/brefv/messages/angle_rate.py b/brefv/messages/angle_rate.py new file mode 100644 index 0000000..bad879c --- /dev/null +++ b/brefv/messages/angle_rate.py @@ -0,0 +1,13 @@ +# generated by datamodel-codegen: +# filename: angle_rate.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class AngleRate(BaseModel): + __root__: float = Field( + ..., description='Angular rate reading [deg/min]', title='Angle rate' + ) diff --git a/brefv/messages/angular_velocity.py b/brefv/messages/angular_velocity.py new file mode 100644 index 0000000..b097c50 --- /dev/null +++ b/brefv/messages/angular_velocity.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: angular_velocity.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class AngularVelocity(BaseModel): + __root__: List[float] = Field( + ..., + description="Angular velocity [Yaw-rate, Pitch-rate, Roll-rate] (rad/s) of a body with respect to the body's BF frame of reference.", + max_items=3, + min_items=3, + title='Angular Velocity', + ) diff --git a/brefv/messages/attitude.py b/brefv/messages/attitude.py new file mode 100644 index 0000000..078991f --- /dev/null +++ b/brefv/messages/attitude.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: attitude.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class Attitude(BaseModel): + __root__: List[float] = Field( + ..., + description="Attitude [Yaw, Pitch, Roll] (radians) of a body with respect to the body's NED frame of reference.", + max_items=3, + min_items=3, + title='Attitude', + ) diff --git a/brefv/messages/error_ellipse.py b/brefv/messages/error_ellipse.py new file mode 100644 index 0000000..9e66533 --- /dev/null +++ b/brefv/messages/error_ellipse.py @@ -0,0 +1,25 @@ +# generated by datamodel-codegen: +# filename: error_ellipse.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class ErrorEllipse(BaseModel): + semi_major_axis: float = Field( + ..., + description="Semi-major axis of the GNSS sensor's error ellipse for 1 standard deviation", + title='Semi-major axis', + ) + semi_minor_axis: float = Field( + ..., + description="Semi-minor axis of the GNSS sensor's error ellipse for 1 standard deviation", + title='Semi-minor axis', + ) + azimuth: float = Field( + ..., + description="Azimuth of the semi-major axis of the GNSS sensor's error ellipse with respect to True North in degrees", + title='Azimuth', + ) diff --git a/brefv/messages/force.py b/brefv/messages/force.py new file mode 100644 index 0000000..eba8e37 --- /dev/null +++ b/brefv/messages/force.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: force.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class Force(BaseModel): + __root__: List[float] = Field( + ..., + description="Force [Fx, Fy, Fz] (N) acting on a body with respect to the body's BF frame of reference.", + max_items=3, + min_items=3, + title='Force', + ) diff --git a/brefv/messages/linear_velocity.py b/brefv/messages/linear_velocity.py new file mode 100644 index 0000000..4e0332b --- /dev/null +++ b/brefv/messages/linear_velocity.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: linear_velocity.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class LinearVelocity(BaseModel): + __root__: List[float] = Field( + ..., + description="Linear velocity [Vx, Vy, Vz] (m/s) of a body with respect to the body's BF frame of reference.", + max_items=3, + min_items=3, + title='Linear Velocity', + ) diff --git a/brefv/messages/location.py b/brefv/messages/location.py new file mode 100644 index 0000000..6318da7 --- /dev/null +++ b/brefv/messages/location.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: location.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class Location(BaseModel): + __root__: List[float] = Field( + ..., + description='A location [x, y, z] [m] relative to a body´s geometric center, expressed in the BF frame of reference', + max_items=3, + min_items=3, + title='Location', + ) diff --git a/brefv/messages/moment.py b/brefv/messages/moment.py new file mode 100644 index 0000000..9ddf0ed --- /dev/null +++ b/brefv/messages/moment.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: moment.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class Moment(BaseModel): + __root__: List[float] = Field( + ..., + description="Moment [Mx, My, Mz] (Nm) acting on a body and with respect to the body's BF frame of reference.", + max_items=3, + min_items=3, + title='Moment', + ) diff --git a/brefv/messages/playback.py b/brefv/messages/playback.py new file mode 100644 index 0000000..8ec2ce9 --- /dev/null +++ b/brefv/messages/playback.py @@ -0,0 +1,22 @@ +# generated by datamodel-codegen: +# filename: playback.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel, Field + + +class PlaybackCommand(BaseModel): + playback_speed: float = Field( + ..., + description='Reverse is number under zero. Zero is paused. Above zero is moving forward and 1 is seen as normal speed', + title='Playback speed', + ) + start_from: Optional[float] = Field( + None, + description='Start frame where position from where the playback will start or continue from', + title='Start from', + ) diff --git a/brefv/messages/point_cloud.py b/brefv/messages/point_cloud.py new file mode 100644 index 0000000..3d2ff5d --- /dev/null +++ b/brefv/messages/point_cloud.py @@ -0,0 +1,18 @@ +# generated by datamodel-codegen: +# filename: point_cloud.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class PointCloud(BaseModel): + __root__: List = Field( + ..., + description='A cloud of 3D points as a list of (x,y,z) tuples [m] in a right-handed, cartesian frame of reference.', + examples=[[0.0, 0.0, 0.0], [0.0, 1.0, 0.0]], + title='Point cloud', + ) diff --git a/brefv/messages/pose.py b/brefv/messages/pose.py new file mode 100644 index 0000000..0a6b90c --- /dev/null +++ b/brefv/messages/pose.py @@ -0,0 +1,17 @@ +# generated by datamodel-codegen: +# filename: pose.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Extra + +from . import attitude, position + + +class Pose(BaseModel): + class Config: + extra = Extra.forbid + + position: position.Position + attitude: attitude.Attitude diff --git a/brefv/messages/position.py b/brefv/messages/position.py new file mode 100644 index 0000000..58c0a0f --- /dev/null +++ b/brefv/messages/position.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: position.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class Position(BaseModel): + __root__: List[float] = Field( + ..., + description='Position [longitude, latitude, altitude] [degrees, degrees, m] of a body with respect to the WGS84 frame of reference. Altitude is the height of the body above the WGS84 ellipsoid.', + max_items=3, + min_items=3, + title='Position', + ) diff --git a/brefv/messages/rotation.py b/brefv/messages/rotation.py new file mode 100644 index 0000000..2994b7e --- /dev/null +++ b/brefv/messages/rotation.py @@ -0,0 +1,19 @@ +# generated by datamodel-codegen: +# filename: rotation.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from typing import List + +from pydantic import BaseModel, Field + + +class Rotation(BaseModel): + __root__: List[float] = Field( + ..., + description='A rotation [roll, pitch, yaw] [radians] expressed in the BF frame of reference given as Euler angles according to the YPR convention', + max_items=3, + min_items=3, + title='Rotation', + ) diff --git a/brefv/messages/rpm.py b/brefv/messages/rpm.py new file mode 100644 index 0000000..38d3bbb --- /dev/null +++ b/brefv/messages/rpm.py @@ -0,0 +1,13 @@ +# generated by datamodel-codegen: +# filename: rpm.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class RPM(BaseModel): + __root__: float = Field( + ..., description='RPM reading [Revolution Per Minute]', title='RPM' + ) diff --git a/brefv/messages/speed.py b/brefv/messages/speed.py new file mode 100644 index 0000000..4a1da38 --- /dev/null +++ b/brefv/messages/speed.py @@ -0,0 +1,11 @@ +# generated by datamodel-codegen: +# filename: speed.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class Speed(BaseModel): + __root__: float = Field(..., description='Speed reading [knots]', title='Speed') diff --git a/brefv/messages/twist.py b/brefv/messages/twist.py new file mode 100644 index 0000000..ba4bd49 --- /dev/null +++ b/brefv/messages/twist.py @@ -0,0 +1,17 @@ +# generated by datamodel-codegen: +# filename: twist.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Extra + +from . import angular_velocity, linear_velocity + + +class Twist(BaseModel): + class Config: + extra = Extra.forbid + + linear_velocity: linear_velocity.LinearVelocity + angular_velocity: angular_velocity.AngularVelocity diff --git a/brefv/messages/wrench.py b/brefv/messages/wrench.py new file mode 100644 index 0000000..0e19f29 --- /dev/null +++ b/brefv/messages/wrench.py @@ -0,0 +1,17 @@ +# generated by datamodel-codegen: +# filename: wrench.json +# timestamp: 2023-04-28T09:01:10+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Extra + +from . import force, moment + + +class Wrench(BaseModel): + class Config: + extra = Extra.forbid + + force: force.Force + moment: moment.Moment diff --git a/docker-compose.radar.yml b/docker-compose.radar.yml new file mode 100644 index 0000000..7e74676 --- /dev/null +++ b/docker-compose.radar.yml @@ -0,0 +1,33 @@ +version: '3' +services: + + crowsnest-processor-radar-0: + build: . + # image: ghcr.io/mo-rise/crowsnest-processor-opendlv-radar:0.1.17 + container_name: cw-radar-processor-0 + restart: unless-stopped + network_mode: "host" + deploy: + resources: + limits: + memory: 1024M + environment: + # Session ID + - CLUON_CID=65 + # Radar outputs two images diffrensiated by ID (Deafult: 1201 & 1202) + - CLUON_ENVELOPE_ID=1201 + + # Filtering clouon input messages (1=every spoke, 2=50% of spokes...) + - RADAR_INPUT_SLICE=1 + # Radar return filter by signal return strength + - RADAR_MIN_READING_WEIGHT=0 + # Angular subsetting (1=every spoke, 2=50% of spokes...) + - RADAR_SWEEP_ANGULAR_SUBSETTING=1 + # Radial subsetting (1=every spoke element, 2=50% of spoke elements...) + - RADAR_SWEEP_RADIAL_SUBSETTING=1 + + # Pushing radar sweep to MQTT + - MQTT_BROKER_HOST=localhost + - MQTT_BROKER_PORT=1883 + - MQTT_BASE_TOPIC=CROWSNEST/SEAHORSE/RADAR/0/SWEEP + diff --git a/main.py b/main.py index bc3beb5..f71a1bf 100644 --- a/main.py +++ b/main.py @@ -17,25 +17,25 @@ # Reading config from environment variables env = Env() -MQTT_BROKER_HOST = env("MQTT_BROKER_HOST") +MQTT_BROKER_HOST = env("MQTT_BROKER_HOST", "localhost") MQTT_BROKER_PORT = env.int("MQTT_BROKER_PORT", 1883) MQTT_CLIENT_ID = env("MQTT_CLIENT_ID", None) MQTT_TRANSPORT = env("MQTT_TRANSPORT", "tcp") MQTT_TLS = env.bool("MQTT_TLS", False) MQTT_USER = env("MQTT_USER", None) MQTT_PASSWORD = env("MQTT_PASSWORD", None) -MQTT_BASE_TOPIC = env("MQTT_BASE_TOPIC") +MQTT_BASE_TOPIC = env("MQTT_BASE_TOPIC", "CROWSNEST/SEAHORSE/RADAR/0/SWEEP") -CLUON_CID = env.int("CLUON_CID", 111) +CLUON_CID = env.int("CLUON_CID", 65) CLUON_ENVELOPE_ID = env.int("CLUON_ENVELOPE_ID", 1201) RADAR_ATTITUDE: list = env.list("RADAR_ATTITUDE", [0, 0, 0], subcast=float, validate=lambda x: len(x) == 3) RADAR_MIN_READING_WEIGHT = env.int("RADAR_MIN_READING_WEIGHT", 0) -RADAR_SWEEP_ANGULAR_SUBSETTING = env.int("RADAR_SWEEP_ANGULAR_SUBSETTING", 10) +RADAR_SWEEP_ANGULAR_SUBSETTING = env.int("RADAR_SWEEP_ANGULAR_SUBSETTING", 1) RADAR_SWEEP_RADIAL_SUBSETTING = env.int("RADAR_SWEEP_RADIAL_SUBSETTING", 2) -RADAR_MAX_UPDATE_FREQUENCY = env.int("RADAR_MAX_UPDATE_FREQUENCY", 1) +RADAR_INPUT_SLICE = env.int("RADAR_INPUT_SLICE", 2) -LOG_LEVEL = env.log_level("LOG_LEVEL", logging.DEBUG) +LOG_LEVEL = env.log_level("LOG_LEVEL", logging.WARNING) ## Import and generate code for message specifications radar_message_spec = import_odvd("radar.odvd") @@ -83,15 +83,14 @@ def unpack_spoke(envelope: cEnvelope) -> Tuple[float, np.ndarray, np.ndarray]: radar_message.ParseFromString(envelope.serialized_data) str_id = envelope.sender_stamp - # str_time = str(envelope.sender_timestamp) - + if CLUON_ENVELOPE_ID == str_id: LOGGER.info("Sender ID: %s", str_id) - # Unpack message azimuth = decode_azimuth(int(radar_message.azimuth)) radar_range = radar_message.range + spoke_data = np.frombuffer(radar_message.data, dtype=np.uint8) LOGGER.debug( @@ -118,11 +117,12 @@ def unpack_spoke(envelope: cEnvelope) -> Tuple[float, np.ndarray, np.ndarray]: spoke_data, ) else: - LOGGER.info("Not handled sender ID: %s", str_id) + LOGGER.info("Not handled sender ID: %s", str_id) except Exception: # pylint: disable=broad-except LOGGER.exception("Exception when unpacking a radar message") - return None + + return None def polar_to_cartesian(azimuth: float, distances: np.ndarray, weights: np.ndarray) -> Tuple[float, np.ndarray]: @@ -132,10 +132,6 @@ def polar_to_cartesian(azimuth: float, distances: np.ndarray, weights: np.ndarra x = distances * np.cos(np.deg2rad(azimuth)) # pylint: disable=invalid-name y = distances * np.sin(np.deg2rad(azimuth)) # pylint: disable=invalid-name - # Distance correction (Do not know why...) - # x = x * 1.852 - # y = y * 1.852 - points = np.column_stack((y, x)) return azimuth, points, weights @@ -217,18 +213,16 @@ def to_mqtt(envelope: Envelope): if __name__ == "__main__": - # Setup pipeline source = Stream() pipe = ( - source.map(unpack_spoke) + source + .slice(step=RADAR_INPUT_SLICE) # Drop if processing power is not enugh... + .map(unpack_spoke) .filter(not_empty) - .latest() # Drop anything we dont manage to process... .starmap(polar_to_cartesian) .starmap(buffer_to_full_360_view) - .latest() # Drop anything we dont manage to process... - .rate_limit(1 / RADAR_MAX_UPDATE_FREQUENCY) .filter(not_empty) .starmap(to_brefv) .sink(to_mqtt) @@ -244,4 +238,5 @@ def to_mqtt(envelope: Envelope): session = OD4Session(CLUON_CID) session.add_data_trigger(1201, source.emit) + mq.loop_forever() From 3d53d6035f3a0f8a05a97f478e72005e4f016048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ted=20Sj=C3=B6blom?= <36690474+TedSjoblom@users.noreply.github.com> Date: Thu, 4 May 2023 21:18:26 +0200 Subject: [PATCH 4/4] Second-main.py --- docker-compose.radar.yml | 2 +- main.py | 2 +- main_1.py | 242 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 244 insertions(+), 2 deletions(-) create mode 100644 main_1.py diff --git a/docker-compose.radar.yml b/docker-compose.radar.yml index 7e74676..3610079 100644 --- a/docker-compose.radar.yml +++ b/docker-compose.radar.yml @@ -16,7 +16,7 @@ services: - CLUON_CID=65 # Radar outputs two images diffrensiated by ID (Deafult: 1201 & 1202) - CLUON_ENVELOPE_ID=1201 - + # Filtering clouon input messages (1=every spoke, 2=50% of spokes...) - RADAR_INPUT_SLICE=1 # Radar return filter by signal return strength diff --git a/main.py b/main.py index f71a1bf..fbceadb 100644 --- a/main.py +++ b/main.py @@ -31,7 +31,7 @@ RADAR_ATTITUDE: list = env.list("RADAR_ATTITUDE", [0, 0, 0], subcast=float, validate=lambda x: len(x) == 3) RADAR_MIN_READING_WEIGHT = env.int("RADAR_MIN_READING_WEIGHT", 0) -RADAR_SWEEP_ANGULAR_SUBSETTING = env.int("RADAR_SWEEP_ANGULAR_SUBSETTING", 1) +RADAR_SWEEP_ANGULAR_SUBSETTING = env.int("RADAR_SWEEP_ANGULAR_SUBSETTING", 2) RADAR_SWEEP_RADIAL_SUBSETTING = env.int("RADAR_SWEEP_RADIAL_SUBSETTING", 2) RADAR_INPUT_SLICE = env.int("RADAR_INPUT_SLICE", 2) diff --git a/main_1.py b/main_1.py new file mode 100644 index 0000000..d46533c --- /dev/null +++ b/main_1.py @@ -0,0 +1,242 @@ +"""Main entrypoint for this application""" +from datetime import datetime, timezone +from functools import lru_cache +from typing import Tuple +import logging +import warnings + +import numpy as np +from environs import Env +from streamz import Stream +from paho.mqtt.client import Client as MQTT + +from pycluon import OD4Session, Envelope as cEnvelope +from pycluon.importer import import_odvd +from brefv.envelope import Envelope + +# Reading config from environment variables +env = Env() + +MQTT_BROKER_HOST = env("MQTT_BROKER_HOST", "localhost") +MQTT_BROKER_PORT = env.int("MQTT_BROKER_PORT", 1883) +MQTT_CLIENT_ID = env("MQTT_CLIENT_ID", None) +MQTT_TRANSPORT = env("MQTT_TRANSPORT", "tcp") +MQTT_TLS = env.bool("MQTT_TLS", False) +MQTT_USER = env("MQTT_USER", None) +MQTT_PASSWORD = env("MQTT_PASSWORD", None) +MQTT_BASE_TOPIC = env("MQTT_BASE_TOPIC", "CROWSNEST/SEAHORSE/RADAR/1/SWEEP") + +CLUON_CID = env.int("CLUON_CID", 65) +CLUON_ENVELOPE_ID = env.int("CLUON_ENVELOPE_ID", 1202) + +RADAR_ATTITUDE: list = env.list("RADAR_ATTITUDE", [0, 0, 0], subcast=float, validate=lambda x: len(x) == 3) +RADAR_MIN_READING_WEIGHT = env.int("RADAR_MIN_READING_WEIGHT", 0) +RADAR_SWEEP_ANGULAR_SUBSETTING = env.int("RADAR_SWEEP_ANGULAR_SUBSETTING", 2) +RADAR_SWEEP_RADIAL_SUBSETTING = env.int("RADAR_SWEEP_RADIAL_SUBSETTING", 2) +RADAR_INPUT_SLICE = env.int("RADAR_INPUT_SLICE", 2) + +LOG_LEVEL = env.log_level("LOG_LEVEL", logging.WARNING) + +## Import and generate code for message specifications +radar_message_spec = import_odvd("radar.odvd") + +# Setup logger +logging.basicConfig(level=LOG_LEVEL) +logging.captureWarnings(True) +warnings.filterwarnings("once") +LOGGER = logging.getLogger("crowsnest-processor-opendlv-radar") + +# Create mqtt client and confiure it according to configuration +mq = MQTT(client_id=MQTT_CLIENT_ID, transport=MQTT_TRANSPORT) +mq.username_pw_set(MQTT_USER, MQTT_PASSWORD) +if MQTT_TLS: + mq.tls_set() + + +# Not empty filter +not_empty = lambda x: x is not None + + +### Helper functions + + +@lru_cache +def decode_azimuth(spoke_direction: int) -> float: + """Decode azimuth from integer spoke_direction""" + return spoke_direction / 4096 * 360 + + +@lru_cache +def decode_distances(spoke_length: int, _range: float) -> np.ndarray: + """Decode distances from spoke length and range metadata""" + return np.array(range(spoke_length)) * _range / spoke_length + + +### Processing functions + + +def unpack_spoke(envelope: cEnvelope) -> Tuple[float, np.ndarray, np.ndarray]: + """Extract a radar message from the cluon envelope""" + LOGGER.info("Got envelope from pycluon") + try: + radar_message = radar_message_spec.opendlv_proxy_RadarDetectionReading() + radar_message.ParseFromString(envelope.serialized_data) + + str_id = envelope.sender_stamp + + if CLUON_ENVELOPE_ID == str_id: + LOGGER.info("Sender ID: %s", str_id) + + # Unpack message + azimuth = decode_azimuth(int(radar_message.azimuth)) + radar_range = radar_message.range + + spoke_data = np.frombuffer(radar_message.data, dtype=np.uint8) + + LOGGER.debug( + "Radar message unpacked with azimuth: %.4f, range: %.4f and spoke length: %d", + azimuth, + radar_range, + len(spoke_data), + ) + + distances = decode_distances(len(spoke_data), radar_range) + + # Radial filtering + distances = distances[::RADAR_SWEEP_RADIAL_SUBSETTING] + spoke_data = spoke_data[::RADAR_SWEEP_RADIAL_SUBSETTING] + + # Minimum weight filtering + mask = spoke_data > RADAR_MIN_READING_WEIGHT + distances = distances[mask] + spoke_data = spoke_data[mask] + + return ( + azimuth, + distances, + spoke_data, + ) + else: + LOGGER.info("Not handled sender ID: %s", str_id) + + except Exception: # pylint: disable=broad-except + LOGGER.exception("Exception when unpacking a radar message") + + return None + + +def polar_to_cartesian(azimuth: float, distances: np.ndarray, weights: np.ndarray) -> Tuple[float, np.ndarray]: + """Map from polar to cartesian coordinates""" + LOGGER.debug("Converting to cartesian for azimuth: %.4f", azimuth) + + x = distances * np.cos(np.deg2rad(azimuth)) # pylint: disable=invalid-name + y = distances * np.sin(np.deg2rad(azimuth)) # pylint: disable=invalid-name + + points = np.column_stack((y, x)) + + return azimuth, points, weights + + +# Simple "buffering" to output full rotations instead of each individual spoke +# pylint: disable=invalid-name +sweep_points = [] +sweep_weights = [] +last_azimuth = -1 + + +def buffer_to_full_360_view(azimuth: float, points: np.ndarray, weights: np.ndarray) -> np.ndarray: + """Buffer until we have a full sweep, then emit""" + + global sweep_points, sweep_weights, last_azimuth # pylint: disable=global-statement + + out = None + + if azimuth < last_azimuth: + # We just started a new rotation, emit the previous one + + # Angular filtering + sweep_points = sweep_points[::RADAR_SWEEP_ANGULAR_SUBSETTING] + sweep_weights = sweep_weights[::RADAR_SWEEP_ANGULAR_SUBSETTING] + + # Update out + out = ( + np.concatenate(sweep_points, axis=0), + np.concatenate(sweep_weights, axis=0), + ) + + # Clear the buffers + sweep_points.clear() + sweep_weights.clear() + LOGGER.info("Emitting new sweep!") + + # Add the current spoke + last_azimuth = azimuth + sweep_points.append(points) + sweep_weights.append(weights) + + LOGGER.debug("Buffering azimuth %.4f", azimuth) + + return out + + +def to_brefv(points: np.ndarray, weights: np.ndarray) -> Envelope: + """To brefv envelope""" + + LOGGER.info("Assembling new brefv envelope") + + envelope = Envelope( + sent_at=datetime.now(timezone.utc).isoformat(), + message={ + "points": points.tolist(), + "weights": weights.tolist(), + }, + ) + + return envelope + + +def to_mqtt(envelope: Envelope): + """Publish an envelope to a mqtt topic""" + + topic = f"{MQTT_BASE_TOPIC}" + + payload = envelope.json() + + LOGGER.debug("Publishing on %s with payload size: %s", topic, len(payload.encode())) + try: + mq.publish( + topic, + payload, + ) + except Exception: # pylint: disable=broad-except + LOGGER.exception("Failed publishing to broker!") + + +if __name__ == "__main__": + # Setup pipeline + source = Stream() + + pipe = ( + source + .slice(step=RADAR_INPUT_SLICE) # Drop if processing power is not enugh... + .map(unpack_spoke) + .filter(not_empty) + .starmap(polar_to_cartesian) + .starmap(buffer_to_full_360_view) + .filter(not_empty) + .starmap(to_brefv) + .sink(to_mqtt) + ) + + # Connect to broker + LOGGER.info("Connecting to MQTT broker %s %d", MQTT_BROKER_HOST, MQTT_BROKER_PORT) + mq.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT) + + LOGGER.info("All setup done, lets start processing messages!") + + # Register triggers + session = OD4Session(CLUON_CID) + session.add_data_trigger(1201, source.emit) + + + mq.loop_forever()