-
Notifications
You must be signed in to change notification settings - Fork 0
/
onpush.c
121 lines (93 loc) · 2.25 KB
/
onpush.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#include "app.h"
#if defined(WIN32)
#include <Windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int toStop = 0;
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{
int rc = 0;
if ((rc = MQTTClient_connect(*client, opts)) != 0)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
struct opts_struct
{
char* clientid;
int nodelimiter;
char* delimiter;
int qos;
char* token;
char* appid;
char* host;
char* port;
int showtopics;
} opts =
{
"stdout-subscriber", 0, "\n", 0, TOKEN, APPID, HOST, PORT, 0
};
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char topic[128];
int rc = 0;
char url[100];
create_push_topic(topic, "message");
printf("topic is %s\n", topic);
if (strchr(topic, '#') || strchr(topic, '+'))
opts.showtopics = 1;
if (opts.showtopics)
printf("topic is %s\n", topic);
sprintf(url, "%s:%s", opts.host, opts.port);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0;
conn_opts.cleansession = 0;
conn_opts.username = opts.token;
conn_opts.password = opts.appid;
myconnect(&client, &conn_opts);
rc = MQTTClient_subscribe(client, topic, opts.qos);
while (!toStop)
{
char* topicName = NULL;
int topicLen;
MQTTClient_message* message = NULL;
rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
if (message)
{
if (opts.showtopics)
printf("%s\t", topicName);
if (opts.nodelimiter)
printf("%.*s", message->payloadlen, (char*)message->payload);
else
printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
fflush(stdout);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
}
if (rc != 0)
myconnect(&client, &conn_opts);
}
printf("Stopping\n");
MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return 0;
}