-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
109 lines (91 loc) · 2.92 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//
// Author: Avocarrot Ltd
// Site: https://github.com/Avocarrot/winston-kafka
// Maintainer: Panayiotis Papageorgiou <[email protected]>
// License: MIT
//
var util = require('util'),
winston = require('winston'),
kafka = require('kafka-node'),
Transport = winston.Transport;
//
// function Kafka (options)
// @options {Object} Options for this instance.
// Constructor function for the Kafka transport object responsible
// for persisting log messages and metadata to Apache Kafka.
//
var Kafka = exports.Kafka = function(options) {
Transport.call(this, options);
options = options || {};
this.connectionString = options.connectionString || 'localhost:2181';
this.clientId = options.clientId || 'winston-kafka-transport';
this.zkOptions = options.zkOptions;
this.producerOptions = options.producerOptions;
// Producer Props
this.topic = options.topic;
this.compress = !!options.compress ? 1 : 0; // if 1 then compression done using Gzip
this.producerReady = false;
// Construct Kafka client
this.client = new kafka.Client(this.connectionString, this.clientId, this.zkOptions);
// Construct Producer
this.producer = new kafka.HighLevelProducer(this.client, this.producerOptions);
var that = this;
this.producer.on('ready', function() {
that.producerReady = true;
});
this.producer.on('error', function(error) {
that.producerReady = false;
});
};
util.inherits(Kafka, winston.Transport);
//
// Expose the name of this Transport on the prototype
//
Kafka.prototype.name = 'kafka';
//
// function _send (message, callback)
// @callback {function} Continuation to respond to when complete.
// Uses the kafka producer to send the log message to the kafka cluster
//
Kafka.prototype._send = function(message, callback) {
var cb = (typeof callback === 'function') ? callback : function() {};
if (!message) cb(new Error('No message to log'));
var payload = [{
topic: this.topic,
messages: JSON.stringify(message),
attributes: this.compress
}];
if (this.producerReady) {
this.producer.send(payload, cb);
} else {
cb(new Error('Kafka producer not ready'));
}
};
//
// function log (level, msg, [meta], callback)
// @level {string} Level at which to log the message.
// @msg {string} Message to log
// @meta {Object} **Optional** Additional metadata to attach
// @callback {function} Continuation to respond to when complete.
// Core logging method exposed to Winston. Metadata is optional.
//
Kafka.prototype.log = function(level, msg, meta, callback) {
var that = this;
if (typeof meta === 'function' && meta()) {
callback = meta;
meta = {};
} else {
callback = (typeof callback === 'function') ? callback : function() {};
}
var message = {
msg: msg,
level: level,
meta: meta,
timestamp: new Date().toISOString()
};
this._send(message, function(error) {
if (error) return callback(error);
that.emit('logged');
callback(null, true);
});
};