Skip to content

Commit

Permalink
feat(whtsapp): implementado rotina de envio de mensagem com rabbitmq.
Browse files Browse the repository at this point in the history
  • Loading branch information
ldurans committed Jul 24, 2022
1 parent 32a6407 commit 3f6ed69
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 163 deletions.
1 change: 1 addition & 0 deletions backend/src/@types/global.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
declare namespace NodeJS {
interface Global {
_loopDb: any;
rabbitWhatsapp: any;
}
}
6 changes: 6 additions & 0 deletions backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ if (process.env.AMQP_URL) {
logger.info("Rabbit started", process.env.AMQP_URL);
app.rabbit = rabbit;
})();
(async () => {
const rabbitWhatsapp = new RabbitmqServer(process.env.AMQP_URL || "");
await rabbitWhatsapp.start();
logger.info("Rabbit started", process.env.AMQP_URL);
global.rabbitWhatsapp = rabbitWhatsapp;
})();
Consumer360();
MessengerConsumer();
}
Expand Down
36 changes: 35 additions & 1 deletion backend/src/libs/rabbitmq-server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-disable no-useless-constructor */
import { Connection, Channel, connect, Message } from "amqplib";
import { logger } from "../utils/logger";
import { sleepRandomTime } from "../utils/sleepRandomTime";

export default class RabbitmqServer {
private conn: Connection;
Expand All @@ -15,9 +16,18 @@ export default class RabbitmqServer {
this.channel = await this.conn.createChannel();
}

// async createExchange(name: string): Promise<void> {
// // const ex = this.channel.assertExchange(name, type, { durable: true });
// // console.log("Ex", ex);
// // await this.channel.bindQueue(name, name, name);
// }

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async publishInQueue(queue: string, message: string) {
return this.channel.sendToQueue(queue, Buffer.from(message));
await this.channel.assertQueue(queue, { durable: true });
return this.channel.sendToQueue(queue, Buffer.from(message), {
persistent: true
});
}

async publishInExchange(
Expand All @@ -30,6 +40,30 @@ export default class RabbitmqServer {
});
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async consumeWhatsapp(
queue: string,
callback: (message: Message) => Promise<void>
) {
this.channel.prefetch(1, false);
this.channel.consume(queue, async (message: any) => {
try {
await callback(message);
// delay para processamento da mensagem
await sleepRandomTime({
minMilliseconds: Number(process.env.MIN_SLEEP_INTERVAL || 500),
maxMilliseconds: Number(process.env.MAX_SLEEP_INTERVAL || 2000)
});
this.channel.ack(message);
return;
} catch (error) {
this.channel.nack(message);
logger.error("consumeWhatsapp", error);
// this.channel.close();
}
});
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async consume(queue: string, callback: (message: Message) => void) {
return this.channel.consume(queue, (message: any) => {
Expand Down
60 changes: 31 additions & 29 deletions backend/src/libs/wbot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import { Client, LocalAuth, DefaultOptions } from "whatsapp-web.js";
// import slugify from "slugify";
import { getIO } from "./socket";
import Whatsapp from "../models/Whatsapp";
import { getValue, setValue } from "./redisClient";
import { setValue } from "./redisClient";
import { logger } from "../utils/logger";
import SyncUnreadMessagesWbot from "../services/WbotServices/SyncUnreadMessagesWbot";
import Queue from "./Queue";
// import Queue from "./Queue";
import WhatsappConsumer from "../services/WbotServices/WhatsappConsumer";

interface Session extends Client {
id: number;
Expand All @@ -17,28 +18,28 @@ const sessions: Session[] = [];

// const checking: any = {};

const checkMessages = async (wbot: Session, tenantId: number | string) => {
// if (checking[tenantId]) return;
// checking[tenantId] = true;
try {
const isConnectStatus = await getValue(`wbotStatus-${tenantId}`);
logger.info(
"wbot:checkMessages:status",
wbot.id,
tenantId,
isConnectStatus
);
// const checkMessages = async (wbot: Session, tenantId: number | string) => {
// // if (checking[tenantId]) return;
// // checking[tenantId] = true;
// try {
// const isConnectStatus = await getValue(`wbotStatus-${tenantId}`);
// logger.info(
// "wbot:checkMessages:status",
// wbot.id,
// tenantId,
// isConnectStatus
// );

if (isConnectStatus === "CONNECTED" || !isConnectStatus) {
logger.info("wbot:connected:checkMessages", wbot, tenantId);
// logger.info(`checking new message tenant ${tenantId}`);
// await SendMessagesSystemWbot(wbot, tenantId);
Queue.add("SendMessages", { sessionId: wbot.id, tenantId });
}
} catch (error) {
logger.error(`ERROR: checkMessages Tenant: ${tenantId}::`, error);
}
};
// if (isConnectStatus === "CONNECTED" || !isConnectStatus) {
// logger.info("wbot:connected:checkMessages", wbot, tenantId);
// // logger.info(`checking new message tenant ${tenantId}`);
// // await SendMessagesSystemWbot(wbot, tenantId);
// Queue.add("SendMessages", { sessionId: wbot.id, tenantId });
// }
// } catch (error) {
// logger.error(`ERROR: checkMessages Tenant: ${tenantId}::`, error);
// }
// };

// const apagarPastaSessao = async (whatsapp: Whatsapp): Promise<void> => {
// const pathRoot = path.resolve(__dirname, "..", "..", "WWebJS");
Expand Down Expand Up @@ -240,12 +241,13 @@ export const initWbot = async (whatsapp: Whatsapp): Promise<Session> => {
});
});
setValue(`sendingMessages_tenant_${whatsapp.tenantId}`, false);
setInterval(
checkMessages,
+(process.env.CHECK_INTERVAL || 5000),
wbot,
tenantId
);
// setInterval(
// checkMessages,
// +(process.env.CHECK_INTERVAL || 5000),
// wbot,
// tenantId
// );
WhatsappConsumer(tenantId);
} catch (err) {
logger.error(`initWbot error | Error: ${err}`);
// 'Error: Protocol error (Runtime.callFunctionOn): Session closed.'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ const CreateMessageSystemService = async ({
type: "chat:create",
payload: messageCreated
});

global.rabbitWhatsapp.publishInQueue(
`whatsapp::${tenantId}`,
JSON.stringify({
...messageCreated.toJSON(),
contact: ticket.contact.toJSON()
})
);
}
} catch (error) {
logger.error("CreateMessageSystemService", error);
Expand Down
4 changes: 2 additions & 2 deletions backend/src/services/WbotServices/SendMessagesSystemWbot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ const SendMessagesSystemWbot = async (

// delay para processamento da mensagem
await sleepRandomTime({
minMilliseconds: Number(process.env.MIN_SLEEP_INTERVAL || 2000),
maxMilliseconds: Number(process.env.MAX_SLEEP_INTERVAL || 5000)
minMilliseconds: Number(process.env.MIN_SLEEP_INTERVAL || 500),
maxMilliseconds: Number(process.env.MAX_SLEEP_INTERVAL || 2000)
});

logger.info("sendMessage", sendedMessage.id.id);
Expand Down
73 changes: 73 additions & 0 deletions backend/src/services/WbotServices/WhatsappConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* eslint-disable no-restricted-syntax */
/* eslint-disable no-await-in-loop */
import { join } from "path";
import { MessageMedia } from "whatsapp-web.js";
import Message from "../../models/Message";
import { logger } from "../../utils/logger";
// import { sleepRandomTime } from "../../utils/sleepRandomTime";
import { getWbot } from "../../libs/wbot";
// import SetTicketMessagesAsRead from "../../helpers/SetTicketMessagesAsRead";

const SendMessage = async (message: Message): Promise<void> => {
const wbot = getWbot(message.ticket.whatsappId);
let sendedMessage;

// logger.info(
// `SystemWbot SendMessages | Count: ${messages.length} | Tenant: ${tenantId} `
// );

let quotedMsgSerializedId: string | undefined;
const { ticket } = message;
const contactNumber = message.contact.number;
const typeGroup = ticket?.isGroup ? "g" : "c";
const chatId = `${contactNumber}@${typeGroup}.us`;

if (message.quotedMsg) {
quotedMsgSerializedId = `${message.quotedMsg.fromMe}_${contactNumber}@${typeGroup}.us_${message.quotedMsg.messageId}`;
}

if (message.mediaType !== "chat" && message.mediaName) {
const customPath = join(__dirname, "..", "..", "..", "public");
const mediaPath = join(customPath, message.mediaName);
const newMedia = MessageMedia.fromFilePath(mediaPath);
sendedMessage = await wbot.sendMessage(chatId, newMedia, {
quotedMessageId: quotedMsgSerializedId,
linkPreview: false, // fix: send a message takes 2 seconds when there's a link on message body
sendAudioAsVoice: true
});
logger.info("rabbit::sendMessage media");
} else {
sendedMessage = await wbot.sendMessage(chatId, message.body, {
quotedMessageId: quotedMsgSerializedId,
linkPreview: false // fix: send a message takes 2 seconds when there's a link on message body
});
logger.info("rabbit::sendMessage text");
}

// enviar old_id para substituir no front a mensagem corretamente
const messageToUpdate = {
...message,
...sendedMessage,
id: message.id,
messageId: sendedMessage.id.id,
status: "sended"
};

await Message.update({ ...messageToUpdate }, { where: { id: message.id } });

logger.info("rabbit::Message Update");
// await SetTicketMessagesAsRead(ticket);

logger.info("rabbit::sendMessage", sendedMessage.id.id);
// throw new Error("SIMULANDO ERRO");
};

const WhatsappConsumer = tenantId => {
const queue = `whatsapp::${tenantId}`;
global.rabbitWhatsapp.consumeWhatsapp(queue, async message => {
const content = JSON.parse(message.content.toString());
await SendMessage(content);
});
};

export default WhatsappConsumer;
1 change: 1 addition & 0 deletions backend/src/services/WbotServices/wbotMessageListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ interface Session extends Client {
}

const wbotMessageListener = (wbot: Session): void => {
// const queue = `whatsapp::${wbot.id}`;
wbot.on("message_create", async msg => {
// desconsiderar atualização de status
if (msg.isStatus) {
Expand Down
Loading

0 comments on commit 3f6ed69

Please sign in to comment.