-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbitmq_producer.cpp
executable file
·145 lines (109 loc) · 5.1 KB
/
rabbitmq_producer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#pragma once
#include <eosio/rabbitmq_plugin/rabbitmq_producer.hpp>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fc/io/json.hpp>
#include <fc/utf8.hpp>
#include <fc/variant.hpp>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <eosio/chain/controller.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain_plugin/chain_plugin.hpp>
namespace eosio {
void exception_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
FC_THROW_EXCEPTION(rabbitmq_plugin_no_reply_exception, "no reply" );
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
FC_THROW_EXCEPTION(rabbitmq_plugin_library_exception, "rabbitmq error: ${c}: ${e}\n",
("c", context)("e", amqp_error_string2(x.library_error))
);
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m =
(amqp_connection_close_t *)x.reply.decoded;
fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
// throw closed exception
FC_THROW_EXCEPTION(rabbitmq_plugin_closed_connection_exception, "closed connection" );
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded;
fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
FC_THROW_EXCEPTION(rabbitmq_plugin_closed_channel_exception, "closed channel" );
break;
}
default:
fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
FC_THROW_EXCEPTION(rabbitmq_plugin_unknown_exception, "%{c}: unknown server error",
("c", context) );
break;
}
break;
}
}
int rabbitmq_producer::trx_rabbitmq_init(std::string hostname, uint32_t port, std::string username,std::string password){
conn = amqp_new_connection();
m_hostname = hostname;
m_port = port;
m_username = username;
m_password = password;
socket = amqp_tcp_socket_new(conn);
EOS_ASSERT( socket, rabbitmq_plugin_open_connection_exception, "failed creating rabbitmq socket");
auto status = amqp_socket_open(socket, hostname.c_str(), port);
EOS_ASSERT( !status, rabbitmq_plugin_open_connection_exception, "failed opening connection socket");
exception_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
username.c_str(), password.c_str()),"login");
amqp_channel_open(conn, 1);
exception_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
return 0;
}
int rabbitmq_producer::trx_rabbitmq_assert_exchange(std::string exchane_name, std::string exchange_type){
auto status = amqp_exchange_declare(conn, //Connection
1, //channel
amqp_cstring_bytes(exchane_name.c_str()), //exchange_name
amqp_cstring_bytes(exchange_type.c_str()), //exchange_type
0, //passive
1, //durable
0,
0,
amqp_empty_table );
// EOS_ASSERT( !status, rabbitmq_plugin_assert_exchange, "failed asserting exchange");
return 0;
}
void rabbitmq_producer::trx_rabbitmq_sendmsg(std::string routingKey, std::string exchange, std::string msgstr){
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
int res = amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()),
amqp_cstring_bytes(routingKey.c_str()), 0, 0,
&props, amqp_cstring_bytes(msgstr.c_str()));
// dlog("sending message ${e}", ("e", exchange));
EOS_ASSERT( res == 0, rabbitmq_plugin_assert_exchange, "failed publishing message");
// dlog("message sent ${m}", ("m", msgstr));
}
void rabbitmq_producer::trx_rabbitmq_destroy(){
exception_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
exception_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
amqp_destroy_connection(conn);
}
}