From ea77234150b96c3fc51945a3b7210283b9799e78 Mon Sep 17 00:00:00 2001 From: Tomas Alabes Date: Tue, 14 Jan 2020 15:34:09 -0300 Subject: [PATCH] Test, minor fix, 1.0 release --- README.md | 34 ++++++- package.json | 4 +- src/kafka-pubsub.ts | 171 +++++++++++++++++++--------------- src/test/InMemoryKafka.ts | 93 ++++++++++++++++++ src/test/kafka-pubsub.spec.ts | 51 ++++------ tsconfig.json | 3 +- 6 files changed, 240 insertions(+), 116 deletions(-) create mode 100644 src/test/InMemoryKafka.ts diff --git a/README.md b/README.md index b720c0e..ab66796 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Apollo graphql subscriptions over Kafka, using [kafkajs](https://github.com/tulios/kafkajs). Inspired on [graphql-kafka-subscriptions](https://github.com/ancashoria/graphql-kafka-subscriptions). -There's one producer and one consumer for each node instance. Communication happens over a single kafka topic. +Communication is done through 1 kafka topic specified in the `KafkaPubSub` `create` function. Then +channels are used to identify the right subscription. ## Installation @@ -22,10 +23,10 @@ There's one producer and one consumer for each node instance. Communication happ import { Kafka } from 'kafkajs'; import { KafkaPubSub } from 'graphql-kafkajs-subscriptions' -export const pubsub = new KafkaPubSub({ - topic: 'anything', +export const pubsub = KafkaPubSub.create({ + topic: 'my-topic', kafka: new Kafka({/* ... */}) - globalConfig: {} // options passed directly to the consumer and producer + groupIdPrefix: "my-group-id-prefix" // used for kafka pub/sub }) ``` @@ -38,7 +39,24 @@ export const pubsub = new KafkaPubSub({ return payload; }, subscribe: (_, args) => { - return externalPubSub.asyncIterator(yourChannel); + return pubsub.asyncIterator("my channel"); + } + } + }; +``` + +You can also use the subscription payload for the channel. + +```javascript +{ + collaboration: { + resolve: (payload: YourType) => { + // what you publish will end up passing through here and to the client + return payload; + }, + subscribe: (_, args) => { + // this is called from the client + return pubsub.asyncIterator(`channel-${args.myId}`); } } }; @@ -46,8 +64,14 @@ export const pubsub = new KafkaPubSub({ ### Publication +Somewhere in your code, you call this: + ```javascript pubsub.publish("my channel", { /* your event data */ }); ``` + +This ends up publishing the event to kafka (to the topic you used to create the `kafkaPubSub`) +and received by all consumers. The consumer which is listening to `my channel` will send it +to the client. diff --git a/package.json b/package.json index 48cca6a..9be9f3a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "graphql-kafkajs-subscriptions", - "version": "0.1.1", + "version": "1.0.0", "description": "Apollo graphql subscription over Kafka protocol", "main": "dist/index.js", "typings": "dist/index.d.ts", @@ -36,7 +36,7 @@ "js" ], "transform": { - "\\.(ts|tsx)$": "/node_modules/ts-jest/preprocessor.js" + "\\.(ts|tsx)$": "ts-jest" }, "testRegex": "(/__tests__/.*|\\.(test|spec))\\.(tsx?|jsx?)$" }, diff --git a/src/kafka-pubsub.ts b/src/kafka-pubsub.ts index d03d8ee..b5cf474 100644 --- a/src/kafka-pubsub.ts +++ b/src/kafka-pubsub.ts @@ -1,86 +1,107 @@ -import { PubSubEngine } from 'graphql-subscriptions'; -import { Consumer, Kafka, Producer } from 'kafkajs'; -import { PubSubAsyncIterator } from './pubsub-async-iterator'; +import { PubSubEngine } from "graphql-subscriptions"; +import { Consumer, Kafka, Producer } from "kafkajs"; +import { PubSubAsyncIterator } from "./pubsub-async-iterator"; + +interface KafkaPubSubInput { + kafka: Kafka; + topic: string; + groupIdPrefix: string; +} export class KafkaPubSub implements PubSubEngine { - private client: Kafka; - private subscriptionMap: { [subId: number]: [string, Function] }; - private channelSubscriptions: { [channel: string]: number[] }; - private producer: Producer; - private consumer: Consumer; - private topic: string; + private client: Kafka; + private subscriptionMap: { [subId: number]: [string, Function] }; + private channelSubscriptions: { [channel: string]: number[] }; + private producer: Producer; + private consumer: Consumer; + private topic: string; + + public static async create({ + kafka, + topic, + groupIdPrefix + }: KafkaPubSubInput): Promise { + const pubsub = new KafkaPubSub({ kafka, topic, groupIdPrefix }); + await pubsub.connectProducer(); + await pubsub.runConsumer(pubsub.topic); + return pubsub; + } - constructor({ kafka, topic }: { kafka: Kafka; topic: string }) { - this.client = kafka; - this.subscriptionMap = {}; - this.channelSubscriptions = {}; - this.topic = topic; - this.producer = this.client.producer(); - this.consumer = this.client.consumer({ - groupId: `pr-dt-collab-${Math.ceil(Math.random() * 9999)}` - }); + private constructor({ kafka, topic, groupIdPrefix }: KafkaPubSubInput) { + this.client = kafka; + this.subscriptionMap = {}; + this.channelSubscriptions = {}; + this.topic = topic; + this.producer = this.client.producer(); + this.consumer = this.client.consumer({ + // we need all consumers listening to all messages + groupId: `${groupIdPrefix}-${Math.ceil(Math.random() * 9999)}` + }); + } - this.createProducer(); - this.createConsumer(topic); - } + public async publish(channel: string, payload: any): Promise { + await this.producer.send({ + messages: [ + { value: Buffer.from(JSON.stringify({ channel, ...payload })) } + ], + topic: this.topic + }); + } - public async publish(channel: string, payload: any): Promise { - await this.producer.send({ - messages: [{value:Buffer.from(JSON.stringify({channel, ...payload}))}], - topic: this.topic - }); - } + public async subscribe( + channel: string, + onMessage: Function, + options?: any + ): Promise { + const index = Object.keys(this.subscriptionMap).length; + this.subscriptionMap[index] = [channel, onMessage]; + this.channelSubscriptions[channel] = ( + this.channelSubscriptions[channel] || [] + ).concat(index); + return index; + } - public async subscribe(channel: string, onMessage: Function, options?: any): Promise { - const index = Object.keys(this.subscriptionMap).length; - this.subscriptionMap[index] = [channel, onMessage]; - this.channelSubscriptions[channel] = (this.channelSubscriptions[channel] || []).concat( - index - ); - return index; - } - - public unsubscribe(index: number) { - const [channel] = this.subscriptionMap[index]; - this.channelSubscriptions[channel] = this.channelSubscriptions[channel].filter( - subId => subId !== index - ); - } + public unsubscribe(index: number) { + const [channel] = this.subscriptionMap[index]; + this.channelSubscriptions[channel] = this.channelSubscriptions[ + channel + ].filter(subId => subId !== index); + } - public asyncIterator(triggers: string | string[]): AsyncIterator { - return new PubSubAsyncIterator(this, triggers); - } + public asyncIterator(triggers: string | string[]): AsyncIterator { + return new PubSubAsyncIterator(this, triggers); + } - private onMessage(channel: string, message: any) { - const subscriptions = this.channelSubscriptions[channel]; - if (!subscriptions) { - return; - } // no subscribers, don't publish msg - for (const subId of subscriptions) { - const [cnl, listener] = this.subscriptionMap[subId]; - listener(message); - } - } + private onMessage(channel: string, message: any) { + const subscriptions = this.channelSubscriptions[channel]; + if (!subscriptions) { + return; + } // no subscribers, don't publish msg + for (const subId of subscriptions) { + const [cnl, listener] = this.subscriptionMap[subId]; + listener(message); + } + } - private async createProducer() { - await this.producer.connect(); - } + private async connectProducer() { + await this.producer.connect(); + } - private async createConsumer(topic: string) { - await this.consumer.connect(); - await this.consumer.subscribe({ topic }); - await this.consumer.run({ - eachMessage: async ({ message }) => { - const parsedMessage = JSON.parse(message.value.toString()); - // Using channel abstraction - if (parsedMessage.channel) { - const { channel, ...payload } = parsedMessage; - this.onMessage(channel, payload); - } else { - // No channel abstraction, publish over the whole topic - this.onMessage(topic, parsedMessage); - } - } - }); - } + private async runConsumer(topic: string) { + await this.consumer.connect(); + await this.consumer.subscribe({ topic }); + await this.consumer.run({ + eachMessage: async ({ message }) => { + const parsedMessage = JSON.parse(message.value.toString()); + // Using channel abstraction + if (parsedMessage.channel) { + const { channel, ...payload } = parsedMessage; + this.onMessage(channel, payload); + } else { + // No channel abstraction, publish over the whole topic + this.onMessage(topic, parsedMessage); + } + } + }); + } } diff --git a/src/test/InMemoryKafka.ts b/src/test/InMemoryKafka.ts new file mode 100644 index 0000000..1bc349e --- /dev/null +++ b/src/test/InMemoryKafka.ts @@ -0,0 +1,93 @@ +// tslint:disable +// copied from: https://github.com/Wei-Zou/jest-mock-kafkajs/blob/master/__mocks__/kafkajs.js + +export class Producer { + private sendCb: any; + constructor({ sendCb }: any) { + this.sendCb = sendCb; + } + + public async connect() { + return Promise.resolve(); + } + + public async send({ topic, messages }: any) { + this.sendCb({ topic, messages }); + } + + public async disconnect() { + return Promise.resolve(); + } +} + +class Consumer { + private groupId: string; + private subscribeCb: any; + eachMessage: any; + + constructor({ groupId, subscribeCb }: any) { + this.groupId = groupId; + this.subscribeCb = subscribeCb; + } + + public getGroupId() { + return this.groupId; + } + + public async connect() { + return Promise.resolve(); + } + + public async subscribe({ topic }: any) { + this.subscribeCb(topic, this); + } + + public async run({ eachMessage }: { eachMessage: (message: any) => void }) { + this.eachMessage = eachMessage; + } + + public async disconnect() { + return Promise.resolve(); + } +} + +export class Kafka { + private topics: { [key: string]: { [key: string]: Consumer[] } }; + + constructor() { + this.topics = {}; + } + + public producer() { + return new Producer({ + sendCb: this._sendCb.bind(this) + }); + } + + public consumer({ groupId }: any) { + return new Consumer({ + groupId, + subscribeCb: this._subscribeCb.bind(this) + }); + } + + private _subscribeCb(topic: string, consumer: Consumer) { + this.topics[topic] = this.topics[topic] || {}; + const topicObj = this.topics[topic]; + topicObj[consumer.getGroupId()] = topicObj[consumer.getGroupId()] || []; + topicObj[consumer.getGroupId()].push(consumer); + } + + private _sendCb({ topic, messages }: any) { + messages.forEach((message: any) => { + Object.values(this.topics[topic]).forEach((consumers: Consumer[]) => { + const consumerToGetMessage = Math.floor( + Math.random() * consumers.length + ); + consumers[consumerToGetMessage].eachMessage({ + message + }); + }); + }); + } +} diff --git a/src/test/kafka-pubsub.spec.ts b/src/test/kafka-pubsub.spec.ts index 32cf635..5ae2e51 100644 --- a/src/test/kafka-pubsub.spec.ts +++ b/src/test/kafka-pubsub.spec.ts @@ -1,36 +1,23 @@ -// ToDo test!!! +import { Kafka } from "./InMemoryKafka"; +import { KafkaPubSub } from "../index"; -// import { Kafka } from 'kafkajs'; -// import { KafkaPubSub } from '../index' +describe("Test Suite", () => { + it("should test basic pub sub", async () => { + const topic = "mock_topic"; + const channel = "my_channel"; + const payload = { data: 1 }; -// const mockWrite = jest.fn((msg) => msg) -// const mockProducer = jest.fn(() => ({ -// write: mockWrite -// })) -// const mockConsumer = jest.fn(() => {}) + const onMessage = jest.fn((channel, msg) => {}); -// const topic = 'test-topic' -// const host = 'localhost' -// const port = '9092' -// const pubsub = new KafkaPubSub({kafka: Kafka(), topic}) + const pubsub = await KafkaPubSub.create({ + groupIdPrefix: "my-prefix", + kafka: new Kafka() as any, + topic + }); -// describe('KafkaPubSub', () => { -// it('should create producer/consumers correctly', () => { -// const onMessage = jest.fn() -// const testChannel = 'testChannel' -// expect(mockProducer).toBeCalledWith(topic) -// expect(mockConsumer).toBeCalledWith(topic) -// }) -// it('should subscribe and publish messages correctly', async () => { -// const channel = 'test-channel' -// const onMessage = jest.fn() -// const payload = { -// channel, -// id: 'test', -// } -// const subscription = await pubsub.subscribe(channel, onMessage) -// pubsub.publish(payload) -// expect(mockWrite).toBeCalled() -// expect(mockWrite).toBeCalledWith(new Buffer(JSON.stringify(payload))) -// }) -// }) \ No newline at end of file + await pubsub.subscribe(channel, onMessage); + await pubsub.publish(channel, payload); + expect(onMessage).toBeCalled(); + expect(onMessage).toBeCalledWith(payload); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index ecf96e0..b57aead 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,7 +7,6 @@ "strictNullChecks": false, "moduleResolution": "node", "removeComments": true, - "jsx": "react", "sourceMap": true, "declaration": true, "rootDir": "./src", @@ -15,5 +14,5 @@ "baseUrl": ".", "lib": ["es6", "esnext"] }, - "exclude": ["node_modules", "dist"] + "exclude": ["dist"] }