-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzerosubscriber.cpp
99 lines (95 loc) · 3.45 KB
/
zerosubscriber.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include "zerosubscriber.h"
#include <QCoreApplication>
#include <QDebug>
#include <QString>
#include <gen/error.h>
ZeroSubscriber::ZeroSubscriber(zmq::context_t &ctx, int sock_type, QObject *parent)
: QObject(parent), _ctx(ctx), _worker(_ctx, sock_type)
{
}
void ZeroSubscriber::work()
{
try
{
_worker.connect("inproc://backendSub");
while (is_active)
{
zmq::message_t identity;
zmq::message_t msg;
auto id = _worker.recv(identity);
auto ms = _worker.recv(msg);
std::string data(msg.to_string());
std::string iden(identity.to_string());
iden = "[ " + iden + "in ] ";
alise::PackedMessage packedMessage;
packedMessage.ParseFromString(data);
const auto &messageContent = packedMessage.content();
if (messageContent.Is<alise::Health>())
{
alise::Health *protoHealth = new alise::Health;
if (!messageContent.UnpackTo(protoHealth))
{
qWarning() << Error::WriteError;
continue;
}
qDebug() << iden.c_str() << "Health :" << protoHealth->code();
emit healthReceived(protoHealth->code());
delete protoHealth;
}
else if (messageContent.Is<google::protobuf::Timestamp>())
{
google::protobuf::Timestamp protoTime;
if (!messageContent.UnpackTo(&protoTime))
{
qWarning() << Error::WriteError;
continue;
}
timespec unixTime;
unixTime.tv_sec = protoTime.seconds();
unixTime.tv_nsec = protoTime.nanos();
qDebug() << iden.c_str() << "Time :" << unixTime.tv_sec << ":" << unixTime.tv_nsec;
emit timeReceived(unixTime);
}
else if (messageContent.Is<alise::HelloRequest>())
{
alise::HelloRequest helloAlise;
if (!messageContent.UnpackTo(&helloAlise))
{
qWarning() << Error::WriteError;
continue;
}
qDebug() << iden.c_str() << "HelloReq :" << helloAlise.message();
emit helloReceived();
}
else if (messageContent.Is<alise::TimeRequest>())
{
alise::TimeRequest payload;
if (!messageContent.UnpackTo(&payload))
{
qWarning() << Error::WriteError;
continue;
}
qDebug() << iden.c_str() << "TimeReq";
emit timeRequest();
}
else if (messageContent.Is<alise::PingRequest>())
{
qDebug() << iden.c_str() << "PingReq";
emit pingRequest();
}
else
{
qDebug() << "Received id bytes: " << id.value();
qDebug() << "Received id: " << identity.to_string().c_str();
qDebug() << "Received msg bytes: " << ms.value();
qDebug() << "Received msg: " << data.c_str();
qCritical() << Error::WrongType;
}
QCoreApplication::processEvents();
}
_worker.close();
} catch (std::exception &e)
{
qCritical() << "Exception: " << e.what();
}
}