From 73e01827cb0a147c4a19115bcc9259d54e1eba33 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Tue, 21 Jun 2022 10:30:26 +0300 Subject: [PATCH] Added partial support for running Zone-mta plugins for message queueing --- config/default.toml | 2 +- config/plugins.toml | 6 ++ config/plugins/rspamd.toml | 9 ++ lib/api/messages.js | 6 +- lib/api/submit.js | 7 +- lib/forward.js | 6 +- lib/maildropper.js | 188 ++++++++++++++++++++++++------------- lib/plugins.js | 149 +++-------------------------- package.json | 13 +-- plugins/core/rspamd.js | 3 + plugins/example.js | 5 +- worker.js | 12 +-- 12 files changed, 185 insertions(+), 221 deletions(-) create mode 100644 config/plugins.toml create mode 100644 config/plugins/rspamd.toml create mode 100644 plugins/core/rspamd.js diff --git a/config/default.toml b/config/default.toml index 5b4169ce..d3df5f80 100644 --- a/config/default.toml +++ b/config/default.toml @@ -86,7 +86,7 @@ processes=1 #cipher="aes192" # only for decrypting legacy values (if there are any) [plugins] -# @include "plugins/*.toml" +# @include "plugins.toml" [tasks] # if enabled then process jobs like deleting expired messages etc diff --git a/config/plugins.toml b/config/plugins.toml new file mode 100644 index 00000000..6ae81b1a --- /dev/null +++ b/config/plugins.toml @@ -0,0 +1,6 @@ + + +pluginsPath = "./plugins" + +[conf] +# @include "plugins/*.toml" \ No newline at end of file diff --git a/config/plugins/rspamd.toml b/config/plugins/rspamd.toml new file mode 100644 index 00000000..bc49732b --- /dev/null +++ b/config/plugins/rspamd.toml @@ -0,0 +1,9 @@ +["core/rspamd"] + enabled = false # ["receiver"] + url = "http://maildev.zone.wtf:11333/check" + interfaces = ["maildrop"] + ignoreOrigins = [] + maxSize = 5242880 + dropSpam = false + rewriteSubject = false + ip = true \ No newline at end of file diff --git a/lib/api/messages.js b/lib/api/messages.js index a65916d8..4a4a6ce4 100644 --- a/lib/api/messages.js +++ b/lib/api/messages.js @@ -2990,10 +2990,14 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti reason: 'submit', from: envelope.from, to: envelope.to, - sendTime + sendTime, + runPlugins: true }, (err, ...args) => { if (err || !args[0]) { + if (err && !err.code && err.name === 'SMTPReject') { + err.code = 'MessageRejected'; + } if (err) { err.code = err.code || 'ERRCOMPOSE'; } else { diff --git a/lib/api/submit.js b/lib/api/submit.js index e90212ad..8a17aa96 100644 --- a/lib/api/submit.js +++ b/lib/api/submit.js @@ -469,11 +469,16 @@ module.exports = (db, server, messageHandler, userHandler, settingsHandler) => { reason: 'submit', from: compiledEnvelope.from, to: compiledEnvelope.to, - sendTime + sendTime, + runPlugins: true }, (err, ...args) => { if (err || !args[0]) { if (err) { + if (!err.code && err.name === 'SMTPReject') { + err.code = 'MessageRejected'; + } + err.code = err.code || 'ERRCOMPOSE'; } err.responseCode = 500; diff --git a/lib/forward.js b/lib/forward.js index 96698001..8531764a 100644 --- a/lib/forward.js +++ b/lib/forward.js @@ -10,11 +10,15 @@ module.exports = (options, callback) => { targets: options.targets, - interface: 'forwarder' + interface: 'forwarder', + runPlugins: true }; let message = options.maildrop.push(mail, (err, ...args) => { if (err || !args[0]) { + if (err && !err.code && err.name === 'SMTPReject') { + err.code = 'MessageRejected'; + } if (err) { err.code = err.code || 'ERRCOMPOSE'; } diff --git a/lib/maildropper.js b/lib/maildropper.js index c458ad1e..cd4dd108 100644 --- a/lib/maildropper.js +++ b/lib/maildropper.js @@ -12,6 +12,9 @@ const addressparser = require('nodemailer/lib/addressparser'); const punycode = require('punycode/'); const crypto = require('crypto'); const tools = require('./tools'); +const plugins = require('./plugins'); +const PassThrough = require('stream').PassThrough; +const util = require('util'); class Maildropper { constructor(options) { @@ -105,6 +108,33 @@ class Maildropper { envelope.reason = options.reason; } + let messageInfo = { + 'message-id': '<>', + from: envelope.from || '<>', + to: [].concat(envelope.to || []).join(',') || '<>', + src: envelope.origin, + format() { + let values = []; + Object.keys(this).forEach(key => { + if (typeof this[key] === 'function' || typeof this[key] === 'undefined') { + return; + } + values.push(util.format('%s=%s', key, !/^"/.test(this[key]) && /\s/.test(this[key]) ? JSON.stringify(this[key]) : this[key])); + }); + return values.join(' '); + }, + keys() { + let data = {}; + Object.keys(this).forEach(key => { + if (typeof this[key] === 'function' || typeof this[key] === 'undefined') { + return; + } + data[key] = this[key]; + }); + return data; + } + }; + let deliveries = []; if (options.targets) { @@ -186,94 +216,126 @@ class Maildropper { messageSplitter.once('error', err => dkimStream.emit('error', err)); - this.store(id, dkimStream, err => { + plugins.handler.runHooks('message:store', [envelope, dkimStream], err => { if (err) { - return callback(err); - } - - if (this.checkLoop(envelope, deliveries)) { - // looped message - let err = new Error('Message loop detected'); - err.responseCode = 500; - err.code = 'ELOOP'; - return this.removeMessage(id, () => callback(err)); + if (dkimStream.readable) { + dkimStream.resume(); // let the original stream to end normally before displaying the error message + } + return setImmediate(() => callback(err)); } - envelope.headers = envelope.headers.getList(); - this.setMeta(id, envelope, err => { + this.store(id, dkimStream, err => { if (err) { + return callback(err); + } + + if (this.checkLoop(envelope, deliveries)) { + // looped message + let err = new Error('Message loop detected'); + err.responseCode = 500; + err.code = 'ELOOP'; return this.removeMessage(id, () => callback(err)); } - let date = new Date(); + plugins.handler.runHooks('message:queue', [envelope, messageInfo], err => { + if (err) { + return setImmediate(() => this.removeMessage(id, () => callback(err))); + } - for (let i = 0, len = deliveries.length; i < len; i++) { - let recipient = deliveries[i]; + envelope.headers = envelope.headers.getList(); + this.setMeta(id, envelope, err => { + if (err) { + return this.removeMessage(id, () => callback(err)); + } - let deliveryZone = options.zone || this.zone || 'default'; - let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, ''); + let date = new Date(); - seq++; - let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); - let delivery = { - id, - seq: deliverySeq, + for (let i = 0, len = deliveries.length; i < len; i++) { + let recipient = deliveries[i]; - // Actual delivery data - domain: recipientDomain, - sendingZone: deliveryZone, + let deliveryZone = options.zone || this.zone || 'default'; + let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, ''); - assigned: 'no', + seq++; + let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); + let delivery = { + id, + seq: deliverySeq, - // actual recipient address - recipient: recipient.to, + // Actual delivery data + domain: recipientDomain, + sendingZone: deliveryZone, - locked: false, - lockTime: 0, + assigned: 'no', - // earliest time to attempt delivery, defaults to now - queued: options.sendTime || date, + // actual recipient address + recipient: recipient.to, - // queued date might change but created should not - created: date - }; + locked: false, + lockTime: 0, - if (recipient.http) { - delivery.http = recipient.http; - delivery.targetUrl = recipient.targetUrl; - } + // earliest time to attempt delivery, defaults to now + queued: options.sendTime || date, - ['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => { - if (recipient[key]) { - delivery[key] = recipient[key]; - } - }); + // queued date might change but created should not + created: date + }; - if (recipient.skipSRS) { - delivery.skipSRS = true; - } + if (recipient.http) { + delivery.http = recipient.http; + delivery.targetUrl = recipient.targetUrl; + } - documents.push(delivery); - } + ['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => { + if (recipient[key]) { + delivery[key] = recipient[key]; + } + }); - this.db.senderDb.collection(this.collection).insertMany( - documents, - { - writeConcern: 1, - ordered: false - }, - err => { - if (err) { - return callback(err); + if (recipient.skipSRS) { + delivery.skipSRS = true; + } + + documents.push(delivery); } - callback(null, envelope); - } - ); + this.db.senderDb.collection(this.collection).insertMany( + documents, + { + writeConcern: 1, + ordered: false + }, + err => { + if (err) { + return callback(err); + } + + callback(null, envelope); + } + ); + }); + }); }); }); - messageSplitter.pipe(dkimStream); - return messageSplitter; + + if (options.runPlugins) { + // message submissions + let source = new PassThrough(); + let raw = new PassThrough(); + + plugins.handler.runAnalyzerHooks(envelope, source, raw); + raw.pipe(messageSplitter); + messageSplitter.pipe(dkimStream); + + source.on('error', err => raw.emit('error', err)); + raw.on('error', err => messageSplitter.emit('error', err)); + + return source; + } else { + // default, no plugins (autoreplies etc.) + messageSplitter.pipe(dkimStream); + return messageSplitter; + } } convertAddresses(addresses, withNames, addressList) { diff --git a/lib/plugins.js b/lib/plugins.js index 48d01ad0..4127dbe2 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -1,144 +1,19 @@ 'use strict'; const config = require('wild-config'); -const pathlib = require('path'); const log = require('npmlog'); +const PluginHandler = require('zone-mta/lib/plugin-handler'); const db = require('./db'); -const WD_PATH = pathlib.join(__dirname, '..'); -const CONFIG_PATH = config.configDirectory || WD_PATH; - -const hooks = new Map(); - -class PluginInstance { - constructor(key, config) { - this.db = db; - - this.key = key; - this.config = config || {}; - - this.logger = {}; - ['silly', 'verbose', 'info', 'http', 'warn', 'error', 'debug', 'err'].forEach(level => { - this.logger[level] = (...args) => { - switch (level) { - case 'debug': - level = 'verbose'; - break; - case 'err': - level = 'error'; - break; - } - log[level]('[' + key + ']', ...args); - }; - }); - } - - addHook(hook, handler) { - hook = (hook || '') - .toString() - .replace(/\s+/g, '') - .toLowerCase(); - if (!hook) { - return; - } - if (!hooks.has(hook)) { - hooks.set(hook, []); - } - hooks.get(hook).push({ plugin: this, handler }); - } - - init(done) { - if (!this.config.path) { - this.logger.debug('Plugin path not provided, skipping'); - return setImmediate(done); - } - try { - let pluginPath = this.config.path.replace(/\$WD/g, WD_PATH).replace(/\$CONFIG/g, CONFIG_PATH); - this.module = require(pluginPath); //eslint-disable-line global-require - } catch (E) { - this.logger.error('Failed to load plugin. %s', E.message); - return setImmediate(done); - } - - if (typeof this.module.init !== 'function') { - this.logger.debug('Init method not found'); - return setImmediate(done); - } - - try { - return this.module.init(this, err => { - if (err) { - this.logger.error('Initialization resulted with an error. %s', err.message); - } else { - this.logger.debug('Plugin "%s" initialized', this.module.title || this.key); - } - return setImmediate(done); - }); - } catch (E) { - this.logger.error('Failed executing init method. %s', E.message); - return setImmediate(done); - } - } -} - -module.exports.init = next => { - let keys = Object.keys(config.plugins || {}); - - let pos = 0; - let loadNextPlugin = () => { - if (pos >= keys.length) { - return setImmediate(next); - } - let key = keys[pos++]; - if (!config.plugins[key].enabled) { - return setImmediate(loadNextPlugin); - } - let plugin = new PluginInstance(key, config.plugins[key]); - plugin.init(loadNextPlugin); - }; - setImmediate(loadNextPlugin); -}; - -module.exports.runHooks = (hook, ...args) => { - let next = args.pop(); - - hook = (hook || '') - .toString() - .replace(/\s+/g, '') - .toLowerCase(); - - if (!hook || !hooks.has(hook)) { - return setImmediate(next); - } - - let handlers = hooks.get(hook); - let pos = 0; - let processHandler = () => { - if (pos >= handlers.length) { - return setImmediate(next); - } - let entry = handlers[pos++]; - let returned = false; - try { - entry.handler(...args, err => { - if (returned) { - return; - } - returned = true; - - if (err) { - entry.plugin.logger.error('Failed processing hook %s. %s', hook, err.message); - } - setImmediate(processHandler); - }); - } catch (E) { - if (returned) { - return; - } - returned = true; - entry.plugin.logger.error('Failed processing hook %s. %s', hook, E.message); - setImmediate(processHandler); - } - }; - setImmediate(processHandler); +module.exports.handler = false; + +module.exports.init = context => { + module.exports.handler = new PluginHandler({ + logger: log, + pluginsPath: config.plugins.pluginsPath, + plugins: config.plugins.conf, + context, + log: config.log, + db + }); }; diff --git a/package.json b/package.json index de807dec..fa5f2cdb 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "ajv": "8.11.0", "chai": "4.3.6", "docsify-cli": "4.4.4", - "eslint": "8.17.0", + "eslint": "8.18.0", "eslint-config-nodemailer": "1.2.0", "eslint-config-prettier": "8.5.0", "grunt": "1.5.3", @@ -31,7 +31,7 @@ "grunt-mocha-test": "0.13.3", "grunt-shell-spawn": "0.4.0", "grunt-wait": "0.3.0", - "imapflow": "1.0.99", + "imapflow": "1.0.100", "mailparser": "3.5.0", "mocha": "10.0.0", "request": "2.88.2", @@ -48,7 +48,7 @@ "base32.js": "0.1.0", "bcryptjs": "2.4.3", "bull": "3.29.3", - "fido2-lib": "3.2.0", + "fido2-lib": "3.2.4", "gelf": "2.0.1", "generate-password": "1.7.0", "he": "1.2.0", @@ -74,7 +74,7 @@ "node-html-parser": "5.3.3", "nodemailer": "6.7.5", "npmlog": "6.0.2", - "openpgp": "5.2.1", + "openpgp": "5.3.0", "pem-jwk": "2.0.0", "punycode": "2.1.1", "pwnedpasswords": "1.0.6", @@ -89,8 +89,9 @@ "unix-crypt-td-js": "1.1.4", "unixcrypt": "1.1.0", "uuid": "8.3.2", - "wild-config": "1.6.0", - "yargs": "17.5.1" + "wild-config": "1.6.1", + "yargs": "17.5.1", + "zone-mta": "3.4.0" }, "repository": { "type": "git", diff --git a/plugins/core/rspamd.js b/plugins/core/rspamd.js new file mode 100644 index 00000000..d7f2efeb --- /dev/null +++ b/plugins/core/rspamd.js @@ -0,0 +1,3 @@ +'use strict'; + +module.exports = require('zone-mta/plugins/core/rspamd'); diff --git a/plugins/example.js b/plugins/example.js index 734d2fcf..b6dfa64b 100644 --- a/plugins/example.js +++ b/plugins/example.js @@ -6,9 +6,8 @@ module.exports.init = (app, done) => { // do your initialization stuff here // init hook is called immediatelly after server is started - app.addHook('init', next => { - app.logger.info('Example plugin initialized. Value1=%s', app.config.value1); - next(); + app.addHook('init', async () => { + app.logger.info('Example plugin initialized. Value1=%s', JSON.stringify(app.config)); }); setImmediate(done); diff --git a/worker.js b/worker.js index 90e8fcd8..43703b25 100644 --- a/worker.js +++ b/worker.js @@ -98,14 +98,10 @@ db.connect(err => { } } - plugins.init(err => { - if (err) { - log.error('App', 'Failed to start plugins'); - errors.notify(err); - return setTimeout(() => process.exit(1), 3000); - } - - plugins.runHooks('init', () => { + plugins.init('receiver'); + plugins.handler.load(() => { + log.verbose('Plugins', 'Plugins loaded'); + plugins.handler.runHooks('init', [], () => { log.info('App', 'All servers started, ready to process some mail'); }); });