forked from CESARBR/knot-cloud-source
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.coffee
92 lines (72 loc) · 2.71 KB
/
worker.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
_ = require 'lodash'
# Pick a random integer port between 10000 and 50000 and publish it through the
# environment as a string.
# NOTE(review): this assignment sits ahead of the remaining requires, presumably
# so that modules loaded below can read the port at load time — confirm before
# reordering these lines.
process.env.MESSAGE_BUS_PORT = "" + _.random 10000, 50000
async = require 'async'
debug = require('debug')('meshblu:worker')
Benchmark = require 'simple-benchmark'
redis = require './lib/redis'
authDevice = require './lib/authDevice'
sendMessageCreator = require './lib/sendMessage'
createMessageIOEmitter = require './lib/createMessageIOEmitter'
MessageIO = require './lib/MessageIO'
RedisNS = require '@octoblu/redis-ns'
# Namespace handed to the job-queue Redis client; falls back to 'meshblu' when
# JOB_QUEUE_NAMESPACE is not set.
NAMESPACE = process.env.JOB_QUEUE_NAMESPACE ? 'meshblu'
# Worker pops queued message jobs from the 'meshblu-messages' Redis list,
# authenticates the sending device, dispatches the message, and pushes a timing
# record onto the 'job-log' list.
class Worker
  # Starts the MessageIO instance, builds the send-message function on top of
  # its emitter, and creates the two namespaced Redis clients (job queue + log).
  constructor: ->
    messageIO = new MessageIO()
    messageIO.start()
    redisStore = redis.createIoStore()
    messageIO.setAdapter redisStore

    @_sendMessage = sendMessageCreator createMessageIOEmitter messageIO.io
    @redis = new RedisNS NAMESPACE, redis.createClient()
    @logClient = new RedisNS 'meshblu', redis.createClient()

  # Loops over popMessage until it reports an error, then logs it and exits
  # the process with status 1.
  run: =>
    async.whilst @true, @popMessage, (error) =>
      # Guard the dereference: fall back to the raw value when there is no stack.
      console.error 'whilst error:', error?.stack ? error
      process.exit 1

  # Loop condition for async.whilst: always continue.
  true: => true

  # Blocks for up to 60 seconds waiting for the next queued job.
  # callback(err) is called with an error only when the Redis pop itself
  # fails; a timeout (no result) simply ends this iteration.
  popMessage: (callback) =>
    @redis.brpop 'meshblu-messages', 60, (err, result) =>
      return callback err if err?
      return callback() unless result?

      benchmark = new Benchmark label: 'message'
      debug 'start', benchmark.toString()
      # BRPOP replies with [listKey, value]; only the value is needed.
      jobStr = result[1]
      @processJobStr jobStr, benchmark, callback

  # Parses the raw job string and hands the job to processJob.
  # A job that fails to parse is dropped (parseJob has already logged it) and
  # callback() is called without an error so the worker loop keeps running.
  processJobStr: (jobStr, benchmark, callback) =>
    debug 'parseJobStr', benchmark.toString()
    @parseJob jobStr, benchmark, (error, job) =>
      return callback() if error?
      @processJob job, benchmark, callback

  # Parses jobStr as JSON.
  # callback(error) on invalid JSON (the stack is also written to stderr),
  # callback(null, job) on success.
  parseJob: (jobStr, benchmark, callback) =>
    debug 'parseJob', benchmark.toString()
    # Only JSON.parse lives inside the try: invoking the callback in there
    # would turn a downstream exception into a second callback invocation.
    try
      job = JSON.parse jobStr
    catch error
      console.error error.stack
      return callback error
    callback null, job

  # Authenticates the job's device and, on success, sends its message.
  # Jobs without auth or message, and jobs whose device fails authentication,
  # are skipped with callback() so one bad job cannot stop the worker.
  processJob: (job, benchmark, callback) =>
    debug 'processJob', benchmark.toString()
    {auth, message} = job ? {}
    unless auth? and message?
      console.error 'invalid job: missing auth or message'
      return callback()

    {uuid, token} = auth
    @authDevice uuid, token, benchmark, (error, device) =>
      debug 'authedDevice', benchmark.toString()
      return callback() if error?
      @sendMessage device, message, benchmark, callback

  # Thin wrapper around the required authDevice module that adds a debug trace.
  authDevice: (uuid, token, benchmark, callback) =>
    debug 'authDevice', benchmark.toString()
    authDevice uuid, token, callback

  # Sends the message on behalf of device, then pushes a JSON timing record
  # onto the 'job-log' list; callback is passed straight to that lpush.
  sendMessage: (device, message, benchmark, callback) =>
    debug 'sendMessage', benchmark.toString()
    @_sendMessage device, message, 'message', =>
      debug 'sentMessage', benchmark.toString()
      # NOTE(review): message.devices is wrapped in an array as-is, so an array
      # value is logged nested — confirm the log consumer expects that.
      jobStr = JSON.stringify
        index: 'meshblu_worker'
        type: 'message'
        body:
          elapsedTime: benchmark.elapsed()
          devices: [message.devices]
          fromUuid: device.uuid
          date: benchmark.startTime
      @logClient.lpush 'job-log', jobStr, callback
# Entry point: construct the worker and start its message loop.
new Worker().run()