Skip to content

Commit

Permalink
Merge branch 'main' of github.com:Protofy-xyz/Protofy
Browse files Browse the repository at this point in the history
  • Loading branch information
ap0k4 committed Oct 29, 2024
2 parents d959317 + 3329c00 commit 019ef1c
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 134 deletions.
34 changes: 34 additions & 0 deletions packages/agents/js/ProtoAgentsInterface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import fs from 'fs'
import path from 'path'

export class ProtoAgentInterface {
constructor() {
if (this.constructor === ProtoAgentInterface) {
throw new Error('Cannot instantiate abstract class ProtoAgentInterface');
}
}

configure(subsystems) {
throw new Error('Method "configure" must be implemented by subclasses');
}

configureFromFile(jsonPath) {
const absPath = path.join(process.cwd(), jsonPath);

const fileContents = fs.readFileSync(absPath, 'utf8');
const subsystems = JSON.parse(fileContents);
return this.configure(subsystems);
}

connect(mqttHost, mqttPort) {
throw new Error('Method "connect" must be implemented by subclasses');
}

pubMonitor(subsystemName, monitorName, value) {
throw new Error('Method "pubMonitor" must be implemented by subclasses');
}

handle(subsystemName, actionName, handler) {
throw new Error('Method "handle" must be implemented by subclasses');
}
}
130 changes: 130 additions & 0 deletions packages/agents/js/ProtoMqttAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import * as mqtt from 'mqtt';
import { genActionEndpoint, pubMonitor, register } from './bifrost';
import { ProtoAgentInterface } from './ProtoAgentsInterface';

export class ProtoMqttAgent extends ProtoAgentInterface {
name: string
client: mqtt.MqttClient
listeners: any
subsystems: Array<any>;
subsystemsHandlers: { [key: string]: { [key: string]: (payload: any) => void } };
type: string;

constructor(agentName: string) {
super()
this.name = agentName;
this.client = null as any;
this.listeners = {} as any
this.subsystems = [];
this.type = "mqtt";
this.subsystemsHandlers = {};
}

on(key: "connect" | "error" | "monitor_pub", cb: Function) {
this.listeners[key] = cb
}

configure(subsystems) {
this.subsystems = subsystems;
return this;
}

connect(mqttHost, mqttPort) {
const url = `mqtt://${mqttHost}:${mqttPort}`;
this.client = mqtt.connect(url)
try {
this.client.on('error', (error) => {
this.__consumerCallbacksChecker("error")
});

this.client.on('connect', () => {
register(this.client.publish.bind(this.client), this.name, this.subsystems)
this.__consumerCallbacksChecker("connect")
// Register the message handler
this.__onMessage();
});

return this
} catch (err) {
console.error("Cannot connect agent", err)
}

return null
}

pubMonitor(subsystemName, monitorName, value) {
const subsystem = this.subsystems.find(s => s.name === subsystemName);
if (!subsystem) {
throw new Error(`Subsystem '${subsystemName}' not found.`);
}

const action = subsystem.monitors.find(a => a.name === monitorName);
if (!action) {
throw new Error(`Action '${monitorName}' not found in subsystem '${subsystemName}'.`);
}

pubMonitor(this.client.publish.bind(this.client), this.name, subsystemName, monitorName, value)

// Consumer defined callback
this.__consumerCallbacksChecker('monitor_pub', value);
}

handle(subsystemName, actionName, handler) {
const subsystem = this.subsystems.find(s => s.name === subsystemName);
if (!subsystem) {
throw new Error(`Subsystem '${subsystemName}' not found.`);
}

const action = subsystem.actions.find(a => a.name === actionName);
if (!action) {
throw new Error(`Action '${actionName}' not found in subsystem '${subsystemName}'.`);
}

if (!this.subsystemsHandlers[subsystemName]) {
this.subsystemsHandlers[subsystemName] = {};
}

this.subsystemsHandlers[subsystemName][actionName] = handler;

// Register action subscriber
const topic = genActionEndpoint(this.name, subsystemName, actionName);
this.client.subscribe(topic, (err) => {
if (err) {
console.error(`Failed to subscribe to topic: ${topic}`, err);
} else {
console.log(`Subscribed to topic: ${topic}`);
}
});

console.log(`Handler assigned for action '${actionName}' in subsystem '${subsystemName}'`);
}

__onMessage() {
this.client.on('message', (topic, message) => {
const payload = message.toString();
const handlers = this.subsystemsHandlers;

for (const [subsystemName, actions] of Object.entries(handlers)) {
for (const [actionName, handler] of Object.entries(actions)) {
const expectedTopic = genActionEndpoint(this.name, subsystemName, actionName);
if (topic === expectedTopic) {
handler(payload);
return;
}
}
}

console.log(`No handler found for topic: ${topic}`);
});
}

__consumerCallbacksChecker(name, value = null) {
if (typeof this.listeners[name] === 'function') {
if (value !== null) {
this.listeners[name](value);
} else {
this.listeners[name]();
}
}
}
}
134 changes: 9 additions & 125 deletions packages/agents/js/index.ts
Original file line number Diff line number Diff line change
@@ -1,128 +1,12 @@
import * as mqtt from 'mqtt';
import { genActionEndpoint, pubMonitor, register } from './bifrost';
import { ProtoMqttAgent } from "./ProtoMqttAgent";

export class ProtoMqttAgent {
name: string
client: mqtt.MqttClient
listeners: any
subsystems: Array<any>;
subsystemsHandlers: { [key: string]: { [key: string]: (payload: any) => void } };
type: string;

constructor(agentName) {
this.name = agentName;
this.client = null as any;
this.listeners = {} as any
this.subsystems = [];
this.type = "mqtt";
this.subsystemsHandlers = {};
}

on(key: "connect" | "error" | "monitor_pub", cb: Function) {
this.listeners[key] = cb
}

configure(subsystems) {
this.subsystems = subsystems;
return this;
}

connect(mqttHost, mqttPort) {
const url = `mqtt://${mqttHost}:${mqttPort}`;
this.client = mqtt.connect(url)
try {
this.client.on('error', (error) => {
this.__consumerCallbacksChecker("error")
});

this.client.on('connect', () => {
register(this.client.publish.bind(this.client), this.name, this.subsystems)
this.__consumerCallbacksChecker("connect")
// Register the message handler
this.__onMessage();
});

return this
} catch (err) {
console.error("Cannot connect agent", err)
}

return null
export class ProtoAgent {
static mqtt(name: string): ProtoMqttAgent {
return new ProtoMqttAgent(name);
}

pubMonitor(subsystemName, monitorName, value) {
const subsystem = this.subsystems.find(s => s.name === subsystemName);
if (!subsystem) {
throw new Error(`Subsystem '${subsystemName}' not found.`);
}

const action = subsystem.monitors.find(a => a.name === monitorName);
if (!action) {
throw new Error(`Action '${monitorName}' not found in subsystem '${subsystemName}'.`);
}

pubMonitor(this.client.publish.bind(this.client), this.name, subsystemName, monitorName, value)

// Consumer defined callback
this.__consumerCallbacksChecker('monitor_pub', value);
}

handle(subsystemName, actionName, handler) {
const subsystem = this.subsystems.find(s => s.name === subsystemName);
if (!subsystem) {
throw new Error(`Subsystem '${subsystemName}' not found.`);
}

const action = subsystem.actions.find(a => a.name === actionName);
if (!action) {
throw new Error(`Action '${actionName}' not found in subsystem '${subsystemName}'.`);
}

if (!this.subsystemsHandlers[subsystemName]) {
this.subsystemsHandlers[subsystemName] = {};
}

this.subsystemsHandlers[subsystemName][actionName] = handler;

// Register action subscriber
const topic = genActionEndpoint(this.name, subsystemName, actionName);
this.client.subscribe(topic, (err) => {
if (err) {
console.error(`Failed to subscribe to topic: ${topic}`, err);
} else {
console.log(`Subscribed to topic: ${topic}`);
}
});

console.log(`Handler assigned for action '${actionName}' in subsystem '${subsystemName}'`);
}

__onMessage() {
this.client.on('message', (topic, message) => {
const payload = message.toString();
const handlers = this.subsystemsHandlers;

for (const [subsystemName, actions] of Object.entries(handlers)) {
for (const [actionName, handler] of Object.entries(actions)) {
const expectedTopic = genActionEndpoint(this.name, subsystemName, actionName);
if (topic === expectedTopic) {
handler(payload);
return;
}
}
}

console.log(`No handler found for topic: ${topic}`);
});
}

__consumerCallbacksChecker(name, value = null) {
if (typeof this.listeners[name] === 'function') {
if (value !== null) {
this.listeners[name](value);
} else {
this.listeners[name]();
}
}
}
}
// Future HTTP agent implementation
// static http(name) {
// return new ProtoHttpAgent(name);
// }
}
6 changes: 4 additions & 2 deletions packages/app/bundles/masks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import logsMasks from 'protolib/bundles/logs/masks'
import playwrightMasks from 'protolib/bundles/playwright/masks'
import networkMasks from 'protolib/bundles/network/masks'
import stateMachineMasks from 'protolib/bundles/stateMachines/masks'
import agentsMasks from 'protolib/bundles/agents/agents/masks'
import { paths } from './flows';

export const getFlowsCustomComponents = (path: string, queryParams: {}) => {
Expand Down Expand Up @@ -58,8 +59,9 @@ export const getFlowsCustomComponents = (path: string, queryParams: {}) => {
...discordMasks,
...logsMasks,
...playwrightMasks,
...networkMasks,
...stateMachineMasks
...networkMasks,
...stateMachineMasks,
...agentsMasks
]
return []
}
Expand Down

This file was deleted.

4 changes: 2 additions & 2 deletions packages/protolib/src/bundles/agents/agents/context/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import CreateAgent from "./CreateAgent"
import spawnAgent from "./SpawnAgent"

export default {
CreateAgent
spawnAgent
}
28 changes: 28 additions & 0 deletions packages/protolib/src/bundles/agents/agents/context/spawnAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ProtoAgent } from 'agents'

export default function (options: {
agentName: string,
connectionType: string,
subsystems: object[],
host: string,
hostPort: number,
onConnect: () => void
}) {
const {
agentName,
connectionType,
subsystems,
host,
hostPort,
onConnect,
} = options

switch (connectionType) {
case "mqtt":
const agent = ProtoAgent.mqtt(agentName).configure(subsystems)
agent.on("connect", onConnect)
agent.connect(host, hostPort)
default:
return ProtoAgent.mqtt(agentName)
}
}
Loading

0 comments on commit 019ef1c

Please sign in to comment.