This repository has been archived by the owner on Oct 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 51
/
iothub-explorer-monitor-events.js
156 lines (137 loc) · 6.33 KB
/
iothub-explorer-monitor-events.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env node
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var Promise = require('bluebird');
var Registry = require('azure-iothub').Registry;
// external dependencies
var program = require('commander');
var chalk = require('chalk');
// local dependencies
var inputError = require('./common.js').inputError;
var serviceError = require('./common.js').serviceError;
var printErrorWithHintAndExit = require('./common.js').printErrorWithHintAndExit;
var errorHints = require('./error-hints.js');
// Azure Event Hubs dependencies
var EventHubsClient = require('azure-event-hubs').Client;
var showDeprecationText = require('./common.js').showDeprecationText;
// Runs at load time, before any argument parsing below.
// NOTE(review): presumably prints a notice steering users to the replacement
// command named here - confirm against showDeprecationText in common.js.
showDeprecationText('az iot hub monitor-events');
/**
 * Coercion callback for the --duration option: converts the number of seconds
 * typed on the command line into milliseconds.
 *
 * Problems are reported through inputError (assumed to terminate the process -
 * confirm in common.js); if it returns, the converted value is still returned.
 *
 * @param {string} value - Raw option text, expected to be a whole number of seconds.
 * @returns {number} The duration in milliseconds.
 */
function coerceAndValidateDuration(value) {
  // The result is later used as a timer delay (Promise.delay). Node.js timers
  // only honour delays up to 2^31 - 1 ms and fall back to 1 ms beyond that,
  // which would make the tool exit almost immediately instead of much later.
  var MAX_TIMER_DELAY_MS = 2147483647;
  // Explicit radix: the input is always meant to be decimal seconds.
  var d = parseInt(value, 10);
  if (isNaN(d)) {
    inputError('The value specified for --duration must be a number.');
  }
  else if (d < 0) {
    inputError('The value specified for --duration must be a positive number.');
  }
  else if (d * 1000 > MAX_TIMER_DELAY_MS) {
    inputError('The value specified for --duration is too big. It must be no greater than ' + Math.floor(MAX_TIMER_DELAY_MS / 1000) + ' seconds.');
  }
  return d * 1000;
}
// Command-line definition. Each option is registered on its own statement;
// --duration is converted to milliseconds by coerceAndValidateDuration.
program.description('Monitor messages sent by devices to the IoT hub');
program.option('-l, --login <connectionString>', 'Use the provided connection string to authenticate with IoT Hub');
program.option('-r, --raw', 'Return raw output instead of pretty-printed output (useful for automation)');
program.option('-v, --verbose', 'Show more information from the received event, including annotations and properties');
program.option('-c, --consumer-group <consumer-group>', 'Use the provided consumer group when connecting to Event Hubs');
program.option('-s, --start-time <start-time>', 'Read messages that arrived on or after the given time (milliseconds since epoch or ISO-8601 string)');
program.option('-d, --duration <duration>', 'Exit after the given number of seconds (runs indefinitely if not specified)', coerceAndValidateDuration);
program.parse(process.argv);

// Nothing can be done without credentials, so a missing --login is reported
// as an input error.
if (!program.login) {
  inputError('You must provide a connection string using the --login argument.');
}

var connectionString = program.login;
// Optional first positional argument; when absent the rest of the script
// monitors every device.
var deviceId = program.args[0];
/**
 * Prints one optional section of an event (annotations, properties, ...).
 * Pretty mode prints a titled header followed by indented JSON; raw mode prints
 * compact JSON only.
 *
 * @param {string} title - Section name shown in the pretty-mode header.
 * @param {*} value - Value to serialize.
 * @param {boolean} raw - Truthy for compact output.
 */
function printEventSection(title, value, raw) {
  if (raw) {
    console.log(JSON.stringify(value));
  } else {
    console.log('---- ' + title + ' ----');
    console.log(JSON.stringify(value, null, 2));
  }
}

/**
 * Prints an event body: Buffers as text, strings as a JSON string literal, and
 * anything else as JSON (indented unless raw output was requested).
 *
 * @param {*} body - The event payload.
 * @param {boolean} raw - Truthy for compact output.
 */
function printEventBody(body, raw) {
  if (body instanceof Buffer) {
    console.log(body.toString());
  } else if (typeof body === 'string') {
    console.log(JSON.stringify(body));
  } else if (raw) {
    console.log(JSON.stringify(body));
  } else {
    console.log(JSON.stringify(body, null, 2));
  }
}

/**
 * 'message' handler: prints a received event unless a device filter was given
 * on the command line and the event comes from a different device.
 *
 * @param {Object} eventData - Event as delivered by the receiver.
 */
function printEvent(eventData) {
  var raw = program.raw;
  // Annotations are treated as optional here, as they are further down, so an
  // event without them is filtered or printed instead of throwing.
  var annotations = eventData.annotations;
  var from = annotations ? annotations['iothub-connection-device-id'] : undefined;
  if (deviceId && from !== deviceId) {
    return;
  }

  if (!raw) console.log('==== From: \'' + from + '\' at \'' + (new Date()).toISOString() + '\' ====');
  printEventBody(eventData.body, raw);
  // Annotations and properties are only shown with --verbose; application
  // properties are always shown.
  if (program.verbose) {
    if (eventData.annotations) {
      printEventSection('annotations', eventData.annotations, raw);
    }
    if (eventData.properties) {
      printEventSection('properties', eventData.properties, raw);
    }
  }
  if (eventData.applicationProperties) {
    printEventSection('application properties', eventData.applicationProperties, raw);
  }
  if (!raw) console.log('====================');
}

/**
 * Wires the error and message handlers onto a freshly created receiver.
 *
 * @param {Object} receiver - Receiver resolved by ehClient.createReceiver.
 */
function attachReceiverHandlers(receiver) {
  receiver.on('errorReceived', function (error) {
    var condition = error.transport && error.transport.condition;
    if (condition && condition.indexOf('amqp:link:stolen') >= 0) {
      printErrorWithHintAndExit(error.message, errorHints.amqpLinkStolen);
    }
    serviceError(error.message);
  });
  receiver.on('message', printEvent);
}

/**
 * Opens the Event Hubs client built from the connection string, creates one
 * receiver per partition and prints every event received (optionally filtered
 * by the device id given on the command line). When --duration was supplied the
 * client is closed after that many milliseconds. Any failure while opening the
 * client, listing partitions, creating receivers or closing is reported through
 * serviceError.
 */
var monitorEvents = function () {
  if (!program.raw) {
    if (deviceId) {
      console.log(chalk.grey('Monitoring events from device ' + chalk.green(deviceId) + chalk.gray('...')));
    } else {
      console.log(chalk.grey('Monitoring events from all devices...'));
    }
  }

  var consumerGroup = program.consumerGroup || '$Default';
  // --start-time is either milliseconds since epoch (numeric text) or a date
  // string; without it only events arriving from now on are read.
  var startTime;
  if (!program.startTime) {
    startTime = Date.now();
  } else if (isNaN(program.startTime)) {
    startTime = new Date(program.startTime);
  } else {
    startTime = new Date(parseInt(program.startTime, 10));
  }

  var reportFailure = function (error) {
    serviceError(error.message);
  };

  var ehClient = EventHubsClient.fromConnectionString(connectionString);
  ehClient.open()
    .then(function () {
      if (program.duration) {
        // Started as soon as the client is open and kept out of the returned
        // chain on purpose: the chain settles on receiver set-up, not on the timer.
        Promise.delay(program.duration)
          .then(ehClient.close.bind(ehClient))
          .catch(reportFailure);
      }
      return ehClient.getPartitionIds();
    })
    .then(function (partitionIds) {
      // Return the combined promise so that a failure creating any receiver
      // reaches the catch below instead of being left unhandled.
      return Promise.all(partitionIds.map(function (partitionId) {
        return ehClient.createReceiver(consumerGroup, partitionId, { 'startAfterTime': startTime })
          .then(attachReceiverHandlers);
      }));
    })
    .catch(reportFailure);
};
// Entry point. Without a device filter monitoring starts straight away;
// with one, the device is first looked up in the registry and monitoring only
// starts if that lookup reports no error.
if (!deviceId) {
  monitorEvents();
} else {
  var registry = Registry.fromConnectionString(connectionString);
  registry.get(deviceId, function (err) {
    if (err) {
      serviceError(err);
      return;
    }
    monitorEvents();
  });
}