diff --git a/packages/protofy/package.json b/packages/protofy/package.json index d3996a64a..fd88e074f 100644 --- a/packages/protofy/package.json +++ b/packages/protofy/package.json @@ -17,9 +17,10 @@ }, "devDependencies": { "@types/jest": "~29.5.5", - "express": "^4.18.2", "aedes": "^0.50.0", + "express": "^4.18.2", "jest": "~29.7.0", + "mqtt": "^5.2.1", "ts-jest": "~29.1.1", "typescript": "~5.3.3", "zod": "^3.22.2", diff --git a/packages/protofy/src/AgentProtocol.ts b/packages/protofy/src/AgentProtocol.ts index cc11a27e9..534378652 100644 --- a/packages/protofy/src/AgentProtocol.ts +++ b/packages/protofy/src/AgentProtocol.ts @@ -3,9 +3,11 @@ import { Agent } from "./Agent"; export class AgentProtocol { agent: Agent; listeners: Function[]; - constructor(agent: Agent) { + options: any + constructor(agent: Agent, options?: any) { this.agent = agent; this.listeners = []; + this.options = options; } onMessage(cb: Function) { @@ -20,7 +22,7 @@ export class AgentProtocol { throw new Error('Not implemented'); } - static create(agent: Agent) { - return new AgentProtocol(agent); + static create(agent: Agent, options?: any) { + return new AgentProtocol(agent, options); } } \ No newline at end of file diff --git a/packages/protofy/src/protocols/mqtt.ts b/packages/protofy/src/protocols/mqtt.ts index 403e54379..c16542166 100644 --- a/packages/protofy/src/protocols/mqtt.ts +++ b/packages/protofy/src/protocols/mqtt.ts @@ -1,22 +1,27 @@ import { Agent } from "../Agent"; - -//options is for auth params and similar protocol-specific options -export default async (agent: Agent, params: any, options = {}) => { - const protocol = agent.getProtocol(); - if (protocol.type !== 'mqtt') { - throw new Error('Error: Invalid protocol type, expected http'); +import { AgentProtocol } from "../AgentProtocol"; +export class MQTTProtocol extends AgentProtocol { + mqttClient: any; + constructor(agent: Agent, mqttClient: any) { + super(agent); + this.mqttClient = mqttClient; } - if (!protocol.config || !protocol.config.url || !protocol.config.topic) { - throw new Error('Error: Missing URL or topic in protocol config'); - } + async send(params, options?) { + const agent = this.agent + const protocol = agent.getProtocol(); + if (protocol.type !== 'mqtt') { + throw new Error('Error: Invalid protocol type, expected http'); + } + + const { + topic, + } = protocol.config; - const { - url, - topic, - encoder = 'body', - serializer = 'json', - } = protocol.config; + this.mqttClient.publish(topic, JSON.stringify(params)); + } - -}; \ No newline at end of file + static create(agent: Agent, mqttClient: any) { + return new MQTTProtocol(agent, mqttClient); + } +} \ No newline at end of file diff --git a/packages/protofy/tests/mqtt.test.ts b/packages/protofy/tests/mqtt.test.ts index 0f658dfff..f73929b28 100644 --- a/packages/protofy/tests/mqtt.test.ts +++ b/packages/protofy/tests/mqtt.test.ts @@ -3,7 +3,8 @@ import aedes from 'aedes'; import { Agent } from '../src/Agent'; import { z } from 'zod'; import { zodToJsonSchema } from 'zod-to-json-schema'; -import mqttRunner from '../src/protocols/mqtt'; +import {MQTTProtocol} from '../src/protocols/mqtt'; +import * as mqtt from 'mqtt'; const aedesInstance = new aedes(); aedesInstance.authenticate = function (client, username, password, callback) { callback(null, true); }; @@ -12,15 +13,19 @@ let paramsSchema; let returnSchema; let agent; let server; +let mqttClient; describe('MQTT Agents', () => { - beforeEach(() => { + beforeAll(async () => { server = net.createServer((socket) => { aedesInstance.handle(socket); }); server.listen(12346); - + await new Promise((resolve) => setTimeout(resolve, 1000)); + mqttClient = mqtt.connect('mqtt://localhost:12346'); + await new Promise((resolve) => setTimeout(resolve, 1000)); + paramsSchema = z.object({ id: z.string().uuid(), name: z.string().min(1), @@ -52,12 +57,12 @@ describe('MQTT Agents', () => { }); }); - afterEach(() => { + afterAll(() => { server.close(); aedesInstance.close(); }); - it.skip('Should be able to run the agent through mqtt', async () => { + it('Should be able to run the agent through mqtt', async () => { // Variables para almacenar el estado y el mensaje publicado let messagePublished = false; let publishedPayload = null; @@ -76,8 +81,8 @@ describe('MQTT Agents', () => { age: 30, email: 'a@a.com', }; - - await mqttRunner(agent, payload); + const protocol = MQTTProtocol.create(agent, mqttClient); + await protocol.send(payload); await new Promise((resolve) => setTimeout(resolve, 100)); expect(messagePublished).toBe(true); expect(() => paramsSchema.parse(JSON.parse(publishedPayload))).not.toThrow(); diff --git a/yarn.lock b/yarn.lock index 5dc27023e..6ed580164 100644 --- a/yarn.lock +++ b/yarn.lock @@ -27873,10 +27873,12 @@ __metadata: resolution: "protofy@workspace:packages/protofy" dependencies: "@types/jest": "npm:~29.5.5" + aedes: "npm:^0.50.0" ajv: "npm:8.17.1" axios: "npm:^1.7.7" express: "npm:^4.18.2" jest: "npm:~29.7.0" + mqtt: "npm:^5.2.1" ts-jest: "npm:~29.1.1" typescript: "npm:~5.3.3" zod: "npm:^3.22.2"