Skip to content

Commit

Permalink
Merge pull request #3 from vulogov/0.3.0
Browse files Browse the repository at this point in the history
0.3.0
  • Loading branch information
vulogov authored May 30, 2024
2 parents bc2e7ac + c33172b commit 7d499ae
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zbusdg"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
description = "Universal Data Gateway for ZBUS project"
license-file = "LICENSE"
Expand Down Expand Up @@ -30,3 +30,4 @@ timedmap = "1.0.2"
etime = "0.1.8"
zenoh = "0.10.1-rc"
nats = "0.25.0"
mqtt-protocol = "0.12.0"
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --zbus --zabbix-token z

### Output processor NATS

Collected telemetry is shipped to the NATS.io server, stored for storage, and could be accessed by any NATS.io client. Delivery could be performed in aggregated or per Zabbix key mode. If aggregated delivery is specified, all telemetry will be delivered to a single key on the bus; otherwise, the gateway will extract a destination key from the telemetry message.
Collected telemetry is shipped to the NATS.io server, and could be accessed by any NATS.io client. Delivery could be performed in aggregated or per Zabbix key mode. If aggregated delivery is specified, all telemetry will be delivered to a single key on the bus; otherwise, the gateway will extract a destination key from the telemetry message.

Delivery with telemetry aggregation

Expand All @@ -77,6 +77,29 @@ Delivery without aggregation,to an individual item keys
zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --nats --zabbix-token zabbixapitoken
```

### Output processor MQTT

Collected telemetry is shipped to the MQTT server, and could be accessed by any MQTT client. Delivery could be performed in aggregated only mode.

Delivery with telemetry aggregation

```
zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --mqtt --zabbix-token zabbixapitoken --mqtt-aggregate-key mykey
```


### Send UDG telemetry to ZBUS

ZBUS UDG can send some internal telemetry alongside with telemetry received from Zabbix server.

#### Monitor elapsed time spent in processing JSON telemetry batches

You can monitor elapsed time for JSON batch processing by passing --telemetry-monitor-elapsed to the gateway command line target. Trelemetry will be submitted to the key /zbus/udg/elapsed

```
zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --nats --zabbix-token zabbixapitoken --telemetry-monitor-elapsed
```

## Monitor ZBUS submission

In order to verify and debug your gateway, you can run zbusudg in the "monitor mode", where you subscribing to the key on ZBUS and dump on STDOUT all data packets received on that key.
Expand Down
14 changes: 14 additions & 0 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ pub mod zbus_gateway_processor;
pub mod zbus_gateway_stdout_sender;
pub mod zbus_gateway_zbus_sender;
pub mod zbus_gateway_nats_sender;
pub mod zbus_gateway_mqtt_sender;
pub mod zbus_gateway_tcpsocket_sender;
pub mod zbus_version;
pub mod zbus_login;
pub mod zbus_json;

pub fn init() {
log::debug!("Parsing CLI parameters");
Expand Down Expand Up @@ -144,6 +146,9 @@ pub struct Gateway {
#[clap(long, default_value_t = 1, help="Number of catcher threads")]
pub threads: u16,

#[clap(long, action = clap::ArgAction::SetTrue, help="Monitor elapsed time for JSON batch processing")]
pub telemetry_monitor_elapsed: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Display a pretty JSON")]
pub pretty: bool,

Expand All @@ -156,6 +161,9 @@ pub struct Gateway {
#[clap(help="NATS address", long, default_value_t = String::from(env::var("NATS_ADDRESS").unwrap_or("127.0.0.1:4222".to_string())))]
pub nats_connect: String,

#[clap(help="MQTT address", long, default_value_t = String::from(env::var("MQTT_ADDRESS").unwrap_or("127.0.0.1:1883".to_string())))]
pub mqtt_connect: String,

#[clap(help="ZBUS listen address", long, default_value_t = String::from_utf8(vec![]).unwrap())]
pub zbus_listen: String,

Expand All @@ -165,6 +173,9 @@ pub struct Gateway {
#[clap(help="NATS aggregate key", long, default_value_t = String::from("aggregation"))]
pub nats_aggregate_key: String,

#[clap(help="MQTT aggregate key", long, default_value_t = String::from("aggregation"))]
pub mqtt_aggregate_key: String,

#[clap(long, action = clap::ArgAction::SetTrue, help="Disable multicast discovery of ZENOH bus")]
pub zbus_disable_multicast_scout: bool,

Expand Down Expand Up @@ -199,6 +210,9 @@ pub struct GatewayArgGroup {
#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to NATS")]
pub nats: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to MQTT")]
pub mqtt: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to NONE")]
pub none: bool,
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/zbus_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub fn run(c: &cmd::Cli, gateway: &cmd::Gateway) {
cmd::zbus_gateway_zbus_sender::sender(c, gateway);
} else if gateway.group.nats {
cmd::zbus_gateway_nats_sender::sender(c, gateway);
} else if gateway.group.mqtt {
cmd::zbus_gateway_mqtt_sender::sender(c, gateway);
} else if gateway.group.none {
log::info!("Sender is set to NONE");
} else {
Expand Down
99 changes: 99 additions & 0 deletions src/cmd/zbus_gateway_mqtt_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use nanoid;
use std::io::prelude::*;
use std::net::TcpStream;
use mqtt::{Encodable, Decodable};
use mqtt::packet::{QoSWithPacketIdentifier, SubscribePacket, ConnackPacket, ConnectPacket, PublishPacketRef};
use mqtt::{TopicName, TopicFilter, QualityOfService};
use mqtt::control::variable_header::ConnectReturnCode;
use serde_json::{Deserializer, Value};


pub fn sender(c: &cmd::Cli, gateway: &cmd::Gateway) {
log::trace!("zbus_gateway_mqtt_sender::run() reached");
let gateway = gateway.clone();
let c = c.clone();

match stdlib::threads::THREADS.lock() {
Ok(t) => {
t.execute(move ||
{
log::debug!("MQTT sender thread has been started");
let aggregate_key = format!("{}/{}", &c.platform_name, &gateway.mqtt_aggregate_key);
log::debug!("Published telemetry will be aggregated to: {}", &aggregate_key);
let mut channel_filter: Vec<(TopicFilter, QualityOfService)> = Vec::new();
let mut channels: Vec<TopicName> = Vec::new();
channel_filter.push((TopicFilter::new(aggregate_key.clone()).unwrap(), QualityOfService::Level0));
channels.push(TopicName::new(aggregate_key.clone()).unwrap());
'outside: loop {
match TcpStream::connect(gateway.mqtt_connect.clone()) {
Ok(mut stream) => {
let mut conn = ConnectPacket::new(nanoid::nanoid!());
conn.set_clean_session(true);
let mut buf = Vec::new();
conn.encode(&mut buf).unwrap();
stream.write(&buf[..]).unwrap();
let connack = ConnackPacket::decode(&mut stream).unwrap();
if connack.connect_return_code() != ConnectReturnCode::ConnectionAccepted {
log::error!("MQTT returned: {:?}", connack.connect_return_code());
break 'outside;
}
let sub = SubscribePacket::new(10, channel_filter);
let mut buf = Vec::new();
sub.encode(&mut buf).unwrap();
stream.write(&buf[..]).unwrap();
loop {
if ! stdlib::channel::pipe_is_empty_raw("out".to_string()) {
match stdlib::channel::pipe_pull("out".to_string()) {
Ok(res) => {
log::debug!("Received {} bytes by MQTT processor", &res.len());
let vstream = Deserializer::from_str(&res).into_iter::<Value>();
for value in vstream {
match value {
Ok(zjson) => {
match serde_json::to_string(&zjson) {
Ok(payload) => {
for chan in &channels {
let publish_packet = PublishPacketRef::new(chan, QoSWithPacketIdentifier::Level0, payload.as_bytes());
let mut buf = Vec::new();
publish_packet.encode(&mut buf).unwrap();
stream.write(&buf[..]).unwrap();
}
}
Err(err) => {
log::error!("Error convert JSON to string: {}", err);
}
}
}
Err(err) => {
log::error!("Error converting JSON: {:?}", err);
}
}
log::debug!("End of JSON");
}
log::debug!("End of JSON series");
}
Err(err) => log::error!("Error getting data from channel: {:?}", err),
}
} else {
stdlib::sleep::sleep(1);
}
}
}
Err(err) => {
log::error!("Error connecting to MQTT: {}", err);
break 'outside;
}
}
}
});
drop(t);
}
Err(err) => {
log::error!("Error accessing Thread Manager: {:?}", err);
return;
}
}
}
10 changes: 9 additions & 1 deletion src/cmd/zbus_gateway_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,22 @@ pub fn processor(c: &cmd::Cli, gateway: &cmd::Gateway) {
if ! gateway.group.none {
stdlib::channel::pipe_push("out".to_string(), data.to_string());
}
// stdlib::channel::pipe_push("out".to_string(), data.to_string());
// stdlib::channel::pipe_push("out".to_string(), zjson.to_string());
}
Err(err) => {
log::error!("Error converting JSON: {:?}", err);
}
}
}
log::debug!("Elapsed time for processing: {} seconds", e.toc().as_secs_f32());
let elapsed = e.toc().as_secs_f32();
log::debug!("Elapsed time for processing: {} seconds", elapsed);
if gateway.telemetry_monitor_elapsed {
let data = cmd::zbus_json::generate_json_telemetry(&c, "/zbus/udg/elapsed".to_string(), "Elapsed time for JSON batch processing".to_string(), 3, json!(elapsed));
if ! gateway.group.none {
stdlib::channel::pipe_push("out".to_string(), data.to_string());
}
}
}
Err(err) => log::error!("Error getting data from channel: {:?}", err),
}
Expand Down
41 changes: 41 additions & 0 deletions src/cmd/zbus_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use nanoid::nanoid;
use serde_json::{json, Value};

pub fn generate_json_telemetry(c: &cmd::Cli, dst: String, name: String, ctype: usize, data: Value) -> Value {
let ts = stdlib::time::timestamp_ns();
json!({
"headers": {
"messageType": "telemetry",
"route": c.route.clone(),
"streamName": c.platform_name.clone(),
"cultureCode": null,
"version": c.protocol_version.clone(),
"encryptionAlgorithm": null,
"compressionAlgorithm": null,
},
"body": {
"details": {
"origin": c.platform_name.clone(),
"destination": dst.clone(),
"properties": {
"zabbix_clock": stdlib::time::whole_seconds(ts),
"zabbix_ns": stdlib::time::nanoseconds(ts),
"zabbix_host_name": c.source.clone(),
"zabbix_itemid": null,
"name": name.clone(),
"tags": null,

},
"details": {
"detailType": "",
"contentType": ctype,
"data": data,
}
}
},
"id": nanoid!(),
})
}
1 change: 1 addition & 0 deletions src/stdlib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod hostname;
pub mod channel;
pub mod sleep;
pub mod threads;
pub mod time;

use crate::cmd::{Cli};

Expand Down
11 changes: 11 additions & 0 deletions src/stdlib/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::time::{SystemTime, UNIX_EPOCH, Duration};

pub fn timestamp_ns() -> f64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as f64
}
pub fn whole_seconds(t: f64) -> f64 {
Duration::from_nanos(t as u64).as_secs_f64()
}
pub fn nanoseconds(t: f64) -> f64 {
Duration::from_nanos(t as u64).subsec_nanos() as f64
}

0 comments on commit 7d499ae

Please sign in to comment.