Skip to content

Commit

Permalink
Merge pull request #21 from vulogov/0.9.0
Browse files Browse the repository at this point in the history
0.9.0
  • Loading branch information
vulogov authored Jun 22, 2024
2 parents 26865f1 + 78f7ac2 commit 79298dd
Show file tree
Hide file tree
Showing 15 changed files with 630 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zbusdg"
version = "0.8.2"
version = "0.9.0"
edition = "2021"
description = "Universal Data Gateway for ZBUS project"
license-file = "LICENSE"
Expand Down
25 changes: 25 additions & 0 deletions Interfaces/zabbix_alerts.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"clock": 1719081343,
"ns": 515516610,
"value": 1,
"eventid": 51,
"name": "SLA triggered",
"severity": 4,
"hosts": [
{
"host": "test_host",
"name": "test_host"
}
],
"groups": [
"Linux servers"
],
"tags": [
{
"tag": "SLA",
"value": "TEST"
}
]
}

{"clock":1719081346,"ns":754691765,"value":0,"eventid":52,"p_eventid":51}
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,66 @@ If you are pass CLI option --logs-analysis , you will launch log analysis thread
"id": "Ez2JWwSDsOZN-74gPF6dr"
}
```

## Real-time Zabbix alerts receive

ZBUSUDG can receive and publish Zabbix alerts to ZBUS universal telemetry bus as a telemetry item, thus implementing the idea "Alerts is a Telemetry data too". You can launch alerts receiver

```bash
zbusudg alerts
```

and then you can receive alerts as a telemetry from the Universal Telemetry Bus

```bash
zbusdg gateway --stdout --pretty --zbus-catcher --zbus-subscribe-key events
```

This is the sample of alert

```json
{
"body": {
"details": {
"destination": "/events",
"details": {
"contentType": 3,
"data": 1,
"detailType": ""
},
"origin": "home.lan",
"properties": {
"groups": [
"Linux servers"
],
"name": "SLA triggered",
"tags": [
{
"tag": "SLA",
"value": "TEST"
}
],
"zabbix_clock": 1719087396,
"zabbix_eventid": 81,
"zabbix_host_name": [
{
"host": "test_host",
"name": "test_host"
}
],
"zabbix_ns": 170124227
}
}
},
"headers": {
"compressionAlgorithm": null,
"cultureCode": null,
"encryptionAlgorithm": null,
"messageType": "event",
"route": "local",
"streamName": "local",
"version": "v2"
},
"id": "iZ6qykdOkpDoRK10rM9Wp"
}
```
52 changes: 52 additions & 0 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub mod zbus_gateway_catcher_syslogd;
pub mod zbus_gateway_catcher_prometheus_scraper;
pub mod zbus_version;
pub mod zbus_login;
pub mod zbus_alerts;
pub mod zbus_alerts_zabbix;
pub mod zbus_alerts_processor;
pub mod zbus_alerts_processor_filter;
pub mod zbus_alerts_processor_transformation;
pub mod zbus_json;
pub mod zbus_rhai;
pub mod zbus_sampler;
Expand All @@ -54,6 +59,7 @@ pub mod zbus_thread_filter;
pub mod zbus_thread_transformation;
pub mod zbus_thread_analysis;
pub mod zbus_thread_logs_analysis;
pub mod zbus_thread_zbus_sender;
pub mod zbus_loader_logs_categorization;


Expand Down Expand Up @@ -81,6 +87,10 @@ pub fn init() {
log::debug!("Execute ZBUS pipelines");
zbus_pipeline::run(&cli, &pipeline);
}
Commands::Alerts(alerts) => {
log::debug!("Execute ZBUS alerts");
zbus_alerts::run(&cli, &alerts);
}
Commands::ConvertKey(convertkey) => {
log::debug!("Generate ZabbixAPI token");
zbus_convertkey::run(&cli, &convertkey);
Expand Down Expand Up @@ -130,6 +140,10 @@ pub struct Cli {
#[clap(long, default_value_t = 3600, help="Timeout for Zabbix ITEMS cache")]
pub item_cache_timeout: u16,

#[clap(long, default_value_t = 1, help="Number of HELLO received for Zenoh scout")]
pub hello_received: usize,


#[arg(short, long, value_name = "LOGS_CATEGORIES", help="HJSON dictionary for logs telemetry categorization")]
logs_categorization: Option<PathBuf>,

Expand Down Expand Up @@ -376,6 +390,43 @@ pub struct Gateway {
group: GatewayArgGroup,
}

#[derive(Args, Clone, Debug)]
#[clap(about="Catching and translating Zabbix Alerts")]
pub struct Alerts {

#[clap(help="Events source", long, default_value_t = String::from(hostname::get_hostname()))]
pub source: String,

#[clap(long, default_value_t = 1, help="Number of catcher threads")]
pub threads: u16,

#[clap(help="Zabbix AUTH token", long, default_value_t = String::from(""))]
pub zabbix_token: String,

#[clap(help="Listen address for the alerts catcher", long, default_value_t = String::from("0.0.0.0:10056"))]
pub listen: String,

#[clap(help="ZBUS address", long, default_value_t = String::from(env::var("ZBUS_ADDRESS").unwrap_or("tcp/127.0.0.1:7447".to_string())))]
pub zbus_connect: String,

#[clap(help="ZBUS listen address", long, default_value_t = String::from_utf8(vec![]).unwrap())]
pub zbus_listen: String,

#[clap(help="ZBUS key", long, default_value_t = String::from("events"))]
pub zbus_key: String,

#[clap(long, action = clap::ArgAction::SetTrue, help="Disable multicast discovery of ZENOH bus")]
pub zbus_disable_multicast_scout: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Configure CONNECT mode for ZENOH bus")]
pub zbus_set_connect_mode: bool,

#[arg(short, long, value_name = "SCRIPT", help="Run scripting filtering and transformation")]
script: Option<PathBuf>,

}


#[derive(Debug, Clone, clap::Args)]
#[group(required = true, multiple = false)]
pub struct GatewayArgGroup {
Expand Down Expand Up @@ -444,5 +495,6 @@ enum Commands {
Monitor(Monitor),
Api(Api),
Pipeline(Pipeline),
Alerts(Alerts),
Version(Version),
}
76 changes: 76 additions & 0 deletions src/cmd/zbus_alerts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use std::str::FromStr;
use std::path::Path;
use zenoh::config::{Config, ConnectConfig, ListenConfig, EndPoint, WhatAmI};

pub fn run(c: &cmd::Cli, alerts: &cmd::Alerts) {
log::debug!("zbus_alerts::run() reached");

let mut config = Config::default();
if alerts.zbus_disable_multicast_scout.clone() {
match config.scouting.multicast.set_enabled(Some(false)) {
Ok(_) => { log::debug!("Multicast discovery disabled")}
Err(err) => {
log::error!("Failure in disabling multicast discovery: {:?}", err);
return;
}
}
} else {
log::debug!("Multicast discovery enabled");
}
match EndPoint::from_str(&alerts.zbus_connect) {
Ok(zconn) => {
log::debug!("ZENOH bus set to: {:?}", &zconn);
let _ = config.set_connect(ConnectConfig::new(vec![zconn]).unwrap());
}
Err(err) => {
log::error!("Failure in parsing connect address: {:?}", err);
return;
}
}
match EndPoint::from_str(&alerts.zbus_listen) {
Ok(zlisten) => {
log::debug!("ZENOH listen set to: {:?}", &zlisten);
let _ = config.set_listen(ListenConfig::new(vec![zlisten]).unwrap());
}
Err(_) => {
log::debug!("ZENOH listen set to default");
}
}
if alerts.zbus_set_connect_mode {
log::debug!("ZENOH configured in CONNECT mode");
let _ = config.set_mode(Some(WhatAmI::Client));
} else {
log::debug!("ZENOH configured in PEER mode");
let _ = config.set_mode(Some(WhatAmI::Peer));
}
if config.validate() {
log::debug!("ZENOH config is OK");
} else {
log::error!("ZENOH config not OK");
return;
}

match &alerts.script {
Some(fname) => {
if Path::new(&fname).exists() {
log::debug!("Filtering and transformation enabled");
cmd::zbus_alerts_processor_filter::processor(c, alerts);
cmd::zbus_alerts_processor_transformation::processor(c, alerts);
} else {
log::error!("Script not found processing disabled");
return;
}
}
None => log::debug!("Filtering disabled"),
}

cmd::zbus_alerts_zabbix::catcher(c, alerts);
cmd::zbus_alerts_processor::processor(c, alerts);
cmd::zbus_thread_zbus_sender::sender(c, config, true, false, alerts.zbus_key.clone());


stdlib::threads::wait_all();
}
119 changes: 119 additions & 0 deletions src/cmd/zbus_alerts_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use serde_json::{json, Deserializer, Value};

pub fn processor(c: &cmd::Cli, alerts: &cmd::Alerts) {
log::debug!("zbus_alerts_processor::run() reached");
let c = c.clone();
let alerts = alerts.clone();
match stdlib::threads::THREADS.lock() {
Ok(t) => {
t.execute(move ||
{
log::debug!("ALERTS PROCESSOR thread has been started");
loop {
if ! stdlib::channel::pipe_is_empty_raw("in".to_string()) {
match stdlib::channel::pipe_pull("in".to_string()) {
Ok(res) => {
log::debug!("Received {} bytes by processor", &res.len());
let stream = Deserializer::from_str(&res).into_iter::<Value>();
for value in stream {
match value {
Ok(zjson) => {
if ! zjson.is_object() {
log::error!("Received JSON is not an object: {}", &zjson);
continue;
}
let value = match cmd::zbus_gateway_processor::zabbix_json_get_raw(&zjson, "value".to_string()) {
Some(value) => value,
None => continue,
};
if ! value.is_i64() {
log::error!("Alert value is not an integer");
continue;
}
let ivalue = match value.as_i64() {
Some(ivalue) => ivalue,
None => continue,
};
let id = match ivalue {
0 => {
match stdlib::alerts::resolve_alert(zjson.clone()) {
Some(id) => id,
None => continue,
}
}
1 => {
log::debug!("Adding alert");
match stdlib::alerts::add_alert(zjson.clone()) {
Some(id) => id,
None => continue,
}
}
_ => continue,
};
log::trace!("Alertid: {:?} {:?}", &id, &zjson);
let data = json!({
"headers": {
"messageType": "event",
"route": c.route.clone(),
"streamName": c.platform_name.clone(),
"cultureCode": null,
"version": c.protocol_version.clone(),
"encryptionAlgorithm": null,
"compressionAlgorithm": null,
},
"body": {
"details": {
"origin": alerts.source.clone(),
"destination": format!("/{}", alerts.zbus_key),
"properties": {
"zabbix_clock": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "clock".to_string()),
"zabbix_ns": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "ns".to_string()),
"zabbix_host_name": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "hosts".to_string()),
"zabbix_eventid": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "eventid".to_string()),
"name": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "name".to_string()),
"tags": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "tags".to_string()),
"groups": cmd::zbus_gateway_processor::zabbix_json_get(&zjson, "groups".to_string()),
},
"details": {
"detailType": "",
"contentType": 3,
"data": value,
}
}
},
"id": id,
});
match &alerts.script {
Some(_) => {
stdlib::channel::pipe_push("filter".to_string(), data.to_string());
}
None => {

stdlib::channel::pipe_push("out".to_string(), data.to_string());
}
}
}
Err(err) => {
log::error!("Error converting JSON: {:?}", err);
}
}
}
}
Err(err) => log::error!("Error getting data from channel: {:?}", err),
}
} else {
stdlib::sleep::sleep(1);
}
}
});
drop(t);
}
Err(err) => {
log::error!("Error accessing Thread Manager: {:?}", err);
return;
}
}
}
Loading

0 comments on commit 79298dd

Please sign in to comment.