-
Notifications
You must be signed in to change notification settings - Fork 0
/
scapi_nngpp_session.cpp
98 lines (84 loc) · 2.72 KB
/
scapi_nngpp_session.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include "scapi_nngpp_session.hpp"
#include "scapi_messages_asn1c.hpp"
#include "utils.hpp"
#include <nngpp/nngpp.h>
#include <nngpp/protocol/req0.h>
#include <nngpp/protocol/pull0.h>
using namespace ::std;
using namespace ::std::chrono;
using namespace ::scapi::nngpp;
using namespace ::nng;
using ::scapi::Response;
using ::scapi::Request;
using ::scapi::Notification;
/// Scoped override of a socket's receive timeout.
///
/// The constructor reads the socket's current receive timeout and, when the
/// requested `timeout` is non-negative, replaces it. The destructor writes the
/// previously read value back. A negative `timeout` leaves the socket option
/// unchanged in both the constructor and the destructor.
class RecvTimeoutGuard {
    ::socket& socket;            // socket being adjusted; referenced, not owned
    nng_duration stored_timeout; // value read from the socket before any override
    bool ignore = true;          // true: nothing was overridden, nothing to restore
public:
    /// @param s        socket to adjust; held by reference, so it must outlive the guard
    /// @param timeout  temporary receive timeout; a negative value means "keep the current one"
    RecvTimeoutGuard(::socket& s, const nng_duration timeout)
        : socket(s)
        , stored_timeout(get_opt_recv_timeout(s)) {
        if (timeout >= 0) {
            set_opt_recv_timeout(socket, timeout);
            ignore = false;
        }
    }

    // Non-copyable: a copy would write the saved timeout back a second time,
    // at a point unrelated to the scope that asked for the override.
    RecvTimeoutGuard(const RecvTimeoutGuard&) = delete;
    RecvTimeoutGuard& operator=(const RecvTimeoutGuard&) = delete;

    /// Restores the saved timeout if the constructor overrode it.
    ~RecvTimeoutGuard(void) noexcept {
        if (!ignore) {
            try {
                set_opt_recv_timeout(socket, stored_timeout);
            } catch (...) {
                // Best effort only: letting an exception leave this noexcept
                // destructor would call std::terminate, so a failed restore
                // is dropped instead.
            }
        }
    }
};
/// Private implementation behind Session: holds the two sockets and the
/// running request counter.
struct Session::Impl {
    // Declaration order matters: the constructor initialises the sockets in this order.
    ::socket interaction_socket;   // request/reply socket (opened via req::open())
    ::socket notification_socket;  // notification socket (opened via pull::open())
    int req_counter{0};            // pre-incremented for each request that gets encoded

    Impl();
    ~Impl() = default;

    /// Sends `request_bytes` on the interaction socket and returns the bytes received in reply.
    vector<unsigned char> exch(const vector<unsigned char>& request_bytes);

    /// Encodes `r`, exchanges it and returns the decoded response;
    /// `rcv_timeout` is handed to a RecvTimeoutGuard for the duration of the call.
    Response interaction(const Request& r, milliseconds rcv_timeout);

    /// Receives one message on the notification socket and returns it decoded.
    Notification notification();
};
/// Opens both sockets, sets their timeouts, then dials the request endpoint
/// and starts listening on the notification endpoint.
Session::Impl::Impl()
    : interaction_socket(req::open())
    , notification_socket(pull::open()) {
    // Timeout values handed to nng; presumably milliseconds
    // (i.e. 1 h, 3 s and 1 s) -- confirm against the nng_duration unit.
    constexpr int notification_recv_timeout = 1 * 60 * 60 * 1000;
    constexpr int interaction_recv_timeout = 3 * 1000;
    constexpr int interaction_send_timeout = 1 * 1000;

    set_opt_recv_timeout(notification_socket, notification_recv_timeout);
    set_opt_recv_timeout(interaction_socket, interaction_recv_timeout);
    set_opt_send_timeout(interaction_socket, interaction_send_timeout);

    interaction_socket.dial("ipc:///tmp/fatrq");
    notification_socket.listen("ipc:///tmp/fatnt");
}
/// Performs one send/receive round trip on the interaction socket.
///
/// @param b  bytes to send
/// @return   a copy of the bytes of the message received afterwards
vector<unsigned char>
Session::Impl::exch(const vector<unsigned char>& b) {
    interaction_socket.send(view(b.data(), b.size()));

    buffer reply = interaction_socket.recv();
    // Copy out of the nng buffer so the result does not depend on its lifetime.
    unsigned char* const first = reply.data<unsigned char>();
    return vector<unsigned char>(first, first + reply.size());
}
/// Sends one request and returns the decoded response.
///
/// @param r            request to encode together with the next counter value
/// @param rcv_timeout  its count is converted with integer_cast and passed to
///                     RecvTimeoutGuard for the duration of this call
/// @return             the `rsp` member of the decoded reply
Response
Session::Impl::interaction(const Request& r, const milliseconds rcv_timeout) {
    // The guard is created first and lives until the function returns, so it
    // spans both the exchange and the decoding below.
    const auto timeout_value = integer_cast<nng_duration>(rcv_timeout.count());
    const RecvTimeoutGuard scoped_timeout(interaction_socket, timeout_value);

    const auto reply_bytes = exch(encode_nng({r, ++req_counter}));
    return decode_nng(reply_bytes).rsp;
}
/// Receives one message on the notification socket and decodes it.
///
/// @return  the `ntf` member of the decoded message
Notification
Session::Impl::notification() {
    buffer raw = notification_socket.recv();

    // Copy the payload into a vector before handing it to the decoder.
    unsigned char* const first = raw.data<unsigned char>();
    vector<unsigned char> bytes(first, first + raw.size());
    return decode_nng_ntf(bytes).ntf;
}
/// Creates the session by constructing its private implementation object.
Session::Session()
    : pimpl(make_unique<Impl>()) {}
// Defaulted in this translation unit, after Session::Impl has been defined
// (presumably required by the smart pointer held in `pimpl` -- confirm in the header).
Session::~Session() noexcept = default;
/// Forwards the request and the receive timeout to the implementation object
/// and returns its response.
Response
Session::interaction(const Request& r, const milliseconds rcv_timeout) {
    Impl& impl = *pimpl;
    return impl.interaction(r, rcv_timeout);
}
/// Forwards to the implementation object and returns the notification it produces.
Notification
Session::notification() {
    Impl& impl = *pimpl;
    return impl.notification();
}