From 8a2cc4693270752f24ab994d3f121af4b43575d3 Mon Sep 17 00:00:00 2001 From: zobaidul kazi Date: Tue, 21 May 2024 09:51:03 +0600 Subject: [PATCH] email and email receive bode successfully --- packages/services/email/src/EmailBody.ts | 5 ++- packages/services/email/src/utils/receiver.ts | 43 +++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/packages/services/email/src/EmailBody.ts b/packages/services/email/src/EmailBody.ts index e8d55ca..ef3fc1b 100644 --- a/packages/services/email/src/EmailBody.ts +++ b/packages/services/email/src/EmailBody.ts @@ -4,7 +4,10 @@ const emailBody = async (req: Request, res: Response, next: NextFunction) => { try { const { email } = req.body; - console.log(email); + res.status(200).json({ + message: "Email body fetched successfully", + data: email, + }); } catch (error) { next(error); } diff --git a/packages/services/email/src/utils/receiver.ts b/packages/services/email/src/utils/receiver.ts index 5553cc7..3dfeb62 100644 --- a/packages/services/email/src/utils/receiver.ts +++ b/packages/services/email/src/utils/receiver.ts @@ -1,7 +1,42 @@ import amqp from "amqplib"; -import {defaultSender, transporter} from '@/config' +import { defaultSender, transporter } from "@/config"; +const receiveFromQueue = async ( + queue: string, + callback: (message: string) => void +) => { + try { + const connection = await amqp.connect("amqp://localhost"); -const receiveFromQueue = async (queue: string, callback: (message: string) => void) => { - -} \ No newline at end of file + const channel = await connection.createChannel(); + + await channel.assertQueue(queue); + + channel.consume(queue, (message) => { + if (message) { + callback(message.content.toString()); + } + + channel.ack(message); + + connection.close(); + }); + } catch (error) { + console.log(error); + } +}; + +export const sendToQueue = async (queue: string, message: string) => { + try { + const connection = await amqp.connect("amqp://localhost"); + const channel = await connection.createChannel(); + await channel.assertQueue(queue); + await channel.sendToQueue(queue, Buffer.from(message)); + await channel.close(); + await connection.close(); + } catch (error) { + console.log(error); + } +}; + +export default receiveFromQueue;