-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.js
102 lines (69 loc) · 2.14 KB
/
producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
'use strict';

// ------------------------------ User configuration ------------------------------
const DEBUG = true; // turn on console logging
const FREQ = 1; // publish frequency in Hz
const MQTT_TOPIC = "/flight/data"; // topic on to which to publish
// -------------------------- DO NOT EDIT BELOW THIS LINE --------------------------
const mosca = require('mosca');
const fs = require('fs');
const stream = require('stream');
const lineByLine = require('n-readlines');
const countLinesInFile = require('./lib/countLinesInFile');

// File whose lines are replayed to subscribers.
const dataFilename = "data.csv";

// Number of lines in the data file; remains 0 until setup() has counted them.
let numberOfLines = 0;

// Delay between two published messages, derived from the configured frequency.
const INTERVAL = 1000 / FREQ; // ms

// Options handed to the mosca broker: listening port and pub/sub backend.
const settings = {
  port: 1883,
  backend: {
    pubsubCollection: 'ascoltatori',
    redis: {},
  },
};

const server = new mosca.Server(settings);

// clientConnected and setup are function declarations further down in this
// file; they are hoisted, so they can be registered here.
server.on('clientConnected', clientConnected);
server.on('ready', setup);

// Intentionally a no-op: published packets are not logged.
server.on('published', function (packet, client) {});
/**
 * Fired when the MQTT server is ready.
 *
 * Announces the broker and its publish rate on the console when DEBUG is set,
 * then counts the lines of the data file and stores the result in
 * `numberOfLines`. If counting fails, the error is reported and
 * `numberOfLines` keeps its previous value.
 */
function setup() {
  if (DEBUG) {
    console.log('Mosca server is up and running');
    console.log(`Publish rate: ${FREQ} Hz`);
  }

  countLinesInFile(dataFilename, (err, numLines) => {
    if (err) {
      // Report the failure instead of silently leaving the counter untouched.
      console.error(`Could not count lines in ${dataFilename}:`, err);
      return;
    }
    numberOfLines = numLines;
  });
}
// Report client departures on the console when debug logging is enabled.
server.on('clientDisconnected', (client) => {
  if (!DEBUG) {
    return;
  }
  console.log('Client Disconnected:', client.id);
});
/**
 * Start replaying the data file over MQTT for a newly connected client.
 *
 * Every INTERVAL milliseconds one line is read from the data file, parsed as
 * JSON, and its `data` member is published as a JSON string on MQTT_TOPIC.
 * When the end of the file is reached the replay restarts from the first
 * line. Lines that cannot be parsed, or that have no `data` member, are
 * skipped. The replay stops when this client disconnects.
 *
 * @param {Object} client - Client object delivered with the server's
 *   'clientConnected' event; its `id` is logged when DEBUG is set.
 */
function clientConnected(client) {
  if (DEBUG) {
    console.log('client connected', client.id);
  }

  let liner = new lineByLine(dataFilename);

  // Return the next line of the file. End of file is detected from the
  // reader itself (next() yields false), in which case the file is reopened
  // once and read from the top. Yields false only if the file has no lines.
  const readNextLine = () => {
    let line = liner.next();
    if (line === false) {
      liner = new lineByLine(dataFilename);
      line = liner.next();
    }
    return line;
  };

  // Timer callback: publish one line per tick.
  const publishNextLine = () => {
    const line = readNextLine();
    if (line === false) {
      return; // empty data file: nothing to publish on this tick
    }

    let payload;
    try {
      payload = JSON.stringify(JSON.parse(line).data);
    } catch (err) {
      // An exception escaping a timer callback would terminate the process,
      // so a malformed line is reported and skipped instead.
      console.error(`Skipping unparsable line in ${dataFilename}:`, err.message);
      return;
    }
    if (payload === undefined) {
      return; // the line has no `data` member
    }

    server.publish({
      topic: MQTT_TOPIC,
      payload,
      qos: 0, // 0, 1, or 2
      retain: false, // or true
    }, () => {});
  };

  const interval = setInterval(publishNextLine, INTERVAL);

  // Stop this client's replay when it goes away; otherwise the timer would
  // keep publishing for the lifetime of the process. One listener is
  // registered per connected client and removed again on disconnect.
  const stopOnDisconnect = (goneClient) => {
    // NOTE(review): relies on the server passing the same client object to
    // 'clientConnected' and 'clientDisconnected' - confirm against mosca.
    if (goneClient !== client) {
      return;
    }
    clearInterval(interval);
    server.removeListener('clientDisconnected', stopOnDisconnect);
    // NOTE(review): assumes n-readlines clears `fd` once it has closed the
    // file itself at end of file - confirm against the installed version.
    if (liner.fd) {
      liner.close();
    }
  };
  server.on('clientDisconnected', stopOnDisconnect);
}