Skip to content

Commit

Permalink
Added partial support for running Zone-mta plugins for message queueing
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Jun 21, 2022
1 parent 5da63a2 commit 73e0182
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 221 deletions.
2 changes: 1 addition & 1 deletion config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ processes=1
#cipher="aes192" # only for decrypting legacy values (if there are any)

[plugins]
# @include "plugins/*.toml"
# @include "plugins.toml"

[tasks]
# if enabled then process jobs like deleting expired messages etc
Expand Down
6 changes: 6 additions & 0 deletions config/plugins.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@


pluginsPath = "./plugins"

[conf]
# @include "plugins/*.toml"
9 changes: 9 additions & 0 deletions config/plugins/rspamd.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
["core/rspamd"]
enabled = false # ["receiver"]
url = "http://maildev.zone.wtf:11333/check"
interfaces = ["maildrop"]
ignoreOrigins = []
maxSize = 5242880
dropSpam = false
rewriteSubject = false
ip = true
6 changes: 5 additions & 1 deletion lib/api/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -2990,10 +2990,14 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti
reason: 'submit',
from: envelope.from,
to: envelope.to,
sendTime
sendTime,
runPlugins: true
},
(err, ...args) => {
if (err || !args[0]) {
if (err && !err.code && err.name === 'SMTPReject') {
err.code = 'MessageRejected';
}
if (err) {
err.code = err.code || 'ERRCOMPOSE';
} else {
Expand Down
7 changes: 6 additions & 1 deletion lib/api/submit.js
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,16 @@ module.exports = (db, server, messageHandler, userHandler, settingsHandler) => {
reason: 'submit',
from: compiledEnvelope.from,
to: compiledEnvelope.to,
sendTime
sendTime,
runPlugins: true
},
(err, ...args) => {
if (err || !args[0]) {
if (err) {
if (!err.code && err.name === 'SMTPReject') {
err.code = 'MessageRejected';
}

err.code = err.code || 'ERRCOMPOSE';
}
err.responseCode = 500;
Expand Down
6 changes: 5 additions & 1 deletion lib/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ module.exports = (options, callback) => {

targets: options.targets,

interface: 'forwarder'
interface: 'forwarder',
runPlugins: true
};

let message = options.maildrop.push(mail, (err, ...args) => {
if (err || !args[0]) {
if (err && !err.code && err.name === 'SMTPReject') {
err.code = 'MessageRejected';
}
if (err) {
err.code = err.code || 'ERRCOMPOSE';
}
Expand Down
188 changes: 125 additions & 63 deletions lib/maildropper.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ const addressparser = require('nodemailer/lib/addressparser');
const punycode = require('punycode/');
const crypto = require('crypto');
const tools = require('./tools');
const plugins = require('./plugins');
const PassThrough = require('stream').PassThrough;
const util = require('util');

class Maildropper {
constructor(options) {
Expand Down Expand Up @@ -105,6 +108,33 @@ class Maildropper {
envelope.reason = options.reason;
}

let messageInfo = {
'message-id': '<>',
from: envelope.from || '<>',
to: [].concat(envelope.to || []).join(',') || '<>',
src: envelope.origin,
format() {
let values = [];
Object.keys(this).forEach(key => {
if (typeof this[key] === 'function' || typeof this[key] === 'undefined') {
return;
}
values.push(util.format('%s=%s', key, !/^"/.test(this[key]) && /\s/.test(this[key]) ? JSON.stringify(this[key]) : this[key]));
});
return values.join(' ');
},
keys() {
let data = {};
Object.keys(this).forEach(key => {
if (typeof this[key] === 'function' || typeof this[key] === 'undefined') {
return;
}
data[key] = this[key];
});
return data;
}
};

let deliveries = [];

if (options.targets) {
Expand Down Expand Up @@ -186,94 +216,126 @@ class Maildropper {

messageSplitter.once('error', err => dkimStream.emit('error', err));

this.store(id, dkimStream, err => {
plugins.handler.runHooks('message:store', [envelope, dkimStream], err => {
if (err) {
return callback(err);
}

if (this.checkLoop(envelope, deliveries)) {
// looped message
let err = new Error('Message loop detected');
err.responseCode = 500;
err.code = 'ELOOP';
return this.removeMessage(id, () => callback(err));
if (dkimStream.readable) {
dkimStream.resume(); // let the original stream to end normally before displaying the error message
}
return setImmediate(() => callback(err));
}

envelope.headers = envelope.headers.getList();
this.setMeta(id, envelope, err => {
this.store(id, dkimStream, err => {
if (err) {
return callback(err);
}

if (this.checkLoop(envelope, deliveries)) {
// looped message
let err = new Error('Message loop detected');
err.responseCode = 500;
err.code = 'ELOOP';
return this.removeMessage(id, () => callback(err));
}

let date = new Date();
plugins.handler.runHooks('message:queue', [envelope, messageInfo], err => {
if (err) {
return setImmediate(() => this.removeMessage(id, () => callback(err)));
}

for (let i = 0, len = deliveries.length; i < len; i++) {
let recipient = deliveries[i];
envelope.headers = envelope.headers.getList();
this.setMeta(id, envelope, err => {
if (err) {
return this.removeMessage(id, () => callback(err));
}

let deliveryZone = options.zone || this.zone || 'default';
let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, '');
let date = new Date();

seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
for (let i = 0, len = deliveries.length; i < len; i++) {
let recipient = deliveries[i];

// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
let deliveryZone = options.zone || this.zone || 'default';
let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, '');

assigned: 'no',
seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,

// actual recipient address
recipient: recipient.to,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,

locked: false,
lockTime: 0,
assigned: 'no',

// earliest time to attempt delivery, defaults to now
queued: options.sendTime || date,
// actual recipient address
recipient: recipient.to,

// queued date might change but created should not
created: date
};
locked: false,
lockTime: 0,

if (recipient.http) {
delivery.http = recipient.http;
delivery.targetUrl = recipient.targetUrl;
}
// earliest time to attempt delivery, defaults to now
queued: options.sendTime || date,

['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => {
if (recipient[key]) {
delivery[key] = recipient[key];
}
});
// queued date might change but created should not
created: date
};

if (recipient.skipSRS) {
delivery.skipSRS = true;
}
if (recipient.http) {
delivery.http = recipient.http;
delivery.targetUrl = recipient.targetUrl;
}

documents.push(delivery);
}
['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => {
if (recipient[key]) {
delivery[key] = recipient[key];
}
});

this.db.senderDb.collection(this.collection).insertMany(
documents,
{
writeConcern: 1,
ordered: false
},
err => {
if (err) {
return callback(err);
if (recipient.skipSRS) {
delivery.skipSRS = true;
}

documents.push(delivery);
}

callback(null, envelope);
}
);
this.db.senderDb.collection(this.collection).insertMany(
documents,
{
writeConcern: 1,
ordered: false
},
err => {
if (err) {
return callback(err);
}

callback(null, envelope);
}
);
});
});
});
});
messageSplitter.pipe(dkimStream);
return messageSplitter;

if (options.runPlugins) {
// message submissions
let source = new PassThrough();
let raw = new PassThrough();

plugins.handler.runAnalyzerHooks(envelope, source, raw);
raw.pipe(messageSplitter);
messageSplitter.pipe(dkimStream);

source.on('error', err => raw.emit('error', err));
raw.on('error', err => messageSplitter.emit('error', err));

return source;
} else {
// default, no plugins (autoreplies etc.)
messageSplitter.pipe(dkimStream);
return messageSplitter;
}
}

convertAddresses(addresses, withNames, addressList) {
Expand Down
Loading

0 comments on commit 73e0182

Please sign in to comment.