-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
70286e8
commit ea77234
Showing
6 changed files
with
240 additions
and
116 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,86 +1,107 @@ | ||
import { PubSubEngine } from 'graphql-subscriptions'; | ||
import { Consumer, Kafka, Producer } from 'kafkajs'; | ||
import { PubSubAsyncIterator } from './pubsub-async-iterator'; | ||
import { PubSubEngine } from "graphql-subscriptions"; | ||
import { Consumer, Kafka, Producer } from "kafkajs"; | ||
import { PubSubAsyncIterator } from "./pubsub-async-iterator"; | ||
|
||
interface KafkaPubSubInput { | ||
kafka: Kafka; | ||
topic: string; | ||
groupIdPrefix: string; | ||
} | ||
|
||
export class KafkaPubSub implements PubSubEngine { | ||
private client: Kafka; | ||
private subscriptionMap: { [subId: number]: [string, Function] }; | ||
private channelSubscriptions: { [channel: string]: number[] }; | ||
private producer: Producer; | ||
private consumer: Consumer; | ||
private topic: string; | ||
private client: Kafka; | ||
private subscriptionMap: { [subId: number]: [string, Function] }; | ||
private channelSubscriptions: { [channel: string]: number[] }; | ||
private producer: Producer; | ||
private consumer: Consumer; | ||
private topic: string; | ||
|
||
public static async create({ | ||
kafka, | ||
topic, | ||
groupIdPrefix | ||
}: KafkaPubSubInput): Promise<KafkaPubSub> { | ||
const pubsub = new KafkaPubSub({ kafka, topic, groupIdPrefix }); | ||
await pubsub.connectProducer(); | ||
await pubsub.runConsumer(pubsub.topic); | ||
return pubsub; | ||
} | ||
|
||
constructor({ kafka, topic }: { kafka: Kafka; topic: string }) { | ||
this.client = kafka; | ||
this.subscriptionMap = {}; | ||
this.channelSubscriptions = {}; | ||
this.topic = topic; | ||
this.producer = this.client.producer(); | ||
this.consumer = this.client.consumer({ | ||
groupId: `pr-dt-collab-${Math.ceil(Math.random() * 9999)}` | ||
}); | ||
private constructor({ kafka, topic, groupIdPrefix }: KafkaPubSubInput) { | ||
this.client = kafka; | ||
this.subscriptionMap = {}; | ||
this.channelSubscriptions = {}; | ||
this.topic = topic; | ||
this.producer = this.client.producer(); | ||
this.consumer = this.client.consumer({ | ||
// we need all consumers listening to all messages | ||
groupId: `${groupIdPrefix}-${Math.ceil(Math.random() * 9999)}` | ||
}); | ||
} | ||
|
||
this.createProducer(); | ||
this.createConsumer(topic); | ||
} | ||
public async publish(channel: string, payload: any): Promise<void> { | ||
await this.producer.send({ | ||
messages: [ | ||
{ value: Buffer.from(JSON.stringify({ channel, ...payload })) } | ||
], | ||
topic: this.topic | ||
}); | ||
} | ||
|
||
public async publish(channel: string, payload: any): Promise<void> { | ||
await this.producer.send({ | ||
messages: [{value:Buffer.from(JSON.stringify({channel, ...payload}))}], | ||
topic: this.topic | ||
}); | ||
} | ||
public async subscribe( | ||
channel: string, | ||
onMessage: Function, | ||
options?: any | ||
): Promise<number> { | ||
const index = Object.keys(this.subscriptionMap).length; | ||
this.subscriptionMap[index] = [channel, onMessage]; | ||
this.channelSubscriptions[channel] = ( | ||
this.channelSubscriptions[channel] || [] | ||
).concat(index); | ||
return index; | ||
} | ||
|
||
public async subscribe(channel: string, onMessage: Function, options?: any): Promise<number> { | ||
const index = Object.keys(this.subscriptionMap).length; | ||
this.subscriptionMap[index] = [channel, onMessage]; | ||
this.channelSubscriptions[channel] = (this.channelSubscriptions[channel] || []).concat( | ||
index | ||
); | ||
return index; | ||
} | ||
|
||
public unsubscribe(index: number) { | ||
const [channel] = this.subscriptionMap[index]; | ||
this.channelSubscriptions[channel] = this.channelSubscriptions[channel].filter( | ||
subId => subId !== index | ||
); | ||
} | ||
public unsubscribe(index: number) { | ||
const [channel] = this.subscriptionMap[index]; | ||
this.channelSubscriptions[channel] = this.channelSubscriptions[ | ||
channel | ||
].filter(subId => subId !== index); | ||
} | ||
|
||
public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> { | ||
return new PubSubAsyncIterator<T>(this, triggers); | ||
} | ||
public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> { | ||
return new PubSubAsyncIterator<T>(this, triggers); | ||
} | ||
|
||
private onMessage(channel: string, message: any) { | ||
const subscriptions = this.channelSubscriptions[channel]; | ||
if (!subscriptions) { | ||
return; | ||
} // no subscribers, don't publish msg | ||
for (const subId of subscriptions) { | ||
const [cnl, listener] = this.subscriptionMap[subId]; | ||
listener(message); | ||
} | ||
} | ||
private onMessage(channel: string, message: any) { | ||
const subscriptions = this.channelSubscriptions[channel]; | ||
if (!subscriptions) { | ||
return; | ||
} // no subscribers, don't publish msg | ||
for (const subId of subscriptions) { | ||
const [cnl, listener] = this.subscriptionMap[subId]; | ||
listener(message); | ||
} | ||
} | ||
|
||
private async createProducer() { | ||
await this.producer.connect(); | ||
} | ||
private async connectProducer() { | ||
await this.producer.connect(); | ||
} | ||
|
||
private async createConsumer(topic: string) { | ||
await this.consumer.connect(); | ||
await this.consumer.subscribe({ topic }); | ||
await this.consumer.run({ | ||
eachMessage: async ({ message }) => { | ||
const parsedMessage = JSON.parse(message.value.toString()); | ||
// Using channel abstraction | ||
if (parsedMessage.channel) { | ||
const { channel, ...payload } = parsedMessage; | ||
this.onMessage(channel, payload); | ||
} else { | ||
// No channel abstraction, publish over the whole topic | ||
this.onMessage(topic, parsedMessage); | ||
} | ||
} | ||
}); | ||
} | ||
private async runConsumer(topic: string) { | ||
await this.consumer.connect(); | ||
await this.consumer.subscribe({ topic }); | ||
await this.consumer.run({ | ||
eachMessage: async ({ message }) => { | ||
const parsedMessage = JSON.parse(message.value.toString()); | ||
// Using channel abstraction | ||
if (parsedMessage.channel) { | ||
const { channel, ...payload } = parsedMessage; | ||
this.onMessage(channel, payload); | ||
} else { | ||
// No channel abstraction, publish over the whole topic | ||
this.onMessage(topic, parsedMessage); | ||
} | ||
} | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// tslint:disable | ||
// copied from: https://github.com/Wei-Zou/jest-mock-kafkajs/blob/master/__mocks__/kafkajs.js | ||
|
||
export class Producer { | ||
private sendCb: any; | ||
constructor({ sendCb }: any) { | ||
this.sendCb = sendCb; | ||
} | ||
|
||
public async connect() { | ||
return Promise.resolve(); | ||
} | ||
|
||
public async send({ topic, messages }: any) { | ||
this.sendCb({ topic, messages }); | ||
} | ||
|
||
public async disconnect() { | ||
return Promise.resolve(); | ||
} | ||
} | ||
|
||
class Consumer { | ||
private groupId: string; | ||
private subscribeCb: any; | ||
eachMessage: any; | ||
|
||
constructor({ groupId, subscribeCb }: any) { | ||
this.groupId = groupId; | ||
this.subscribeCb = subscribeCb; | ||
} | ||
|
||
public getGroupId() { | ||
return this.groupId; | ||
} | ||
|
||
public async connect() { | ||
return Promise.resolve(); | ||
} | ||
|
||
public async subscribe({ topic }: any) { | ||
this.subscribeCb(topic, this); | ||
} | ||
|
||
public async run({ eachMessage }: { eachMessage: (message: any) => void }) { | ||
this.eachMessage = eachMessage; | ||
} | ||
|
||
public async disconnect() { | ||
return Promise.resolve(); | ||
} | ||
} | ||
|
||
export class Kafka { | ||
private topics: { [key: string]: { [key: string]: Consumer[] } }; | ||
|
||
constructor() { | ||
this.topics = {}; | ||
} | ||
|
||
public producer() { | ||
return new Producer({ | ||
sendCb: this._sendCb.bind(this) | ||
}); | ||
} | ||
|
||
public consumer({ groupId }: any) { | ||
return new Consumer({ | ||
groupId, | ||
subscribeCb: this._subscribeCb.bind(this) | ||
}); | ||
} | ||
|
||
private _subscribeCb(topic: string, consumer: Consumer) { | ||
this.topics[topic] = this.topics[topic] || {}; | ||
const topicObj = this.topics[topic]; | ||
topicObj[consumer.getGroupId()] = topicObj[consumer.getGroupId()] || []; | ||
topicObj[consumer.getGroupId()].push(consumer); | ||
} | ||
|
||
private _sendCb({ topic, messages }: any) { | ||
messages.forEach((message: any) => { | ||
Object.values(this.topics[topic]).forEach((consumers: Consumer[]) => { | ||
const consumerToGetMessage = Math.floor( | ||
Math.random() * consumers.length | ||
); | ||
consumers[consumerToGetMessage].eachMessage({ | ||
message | ||
}); | ||
}); | ||
}); | ||
} | ||
} |
Oops, something went wrong.