Skip to content

Commit

Permalink
Merge pull request #90 from juni-b-queer/filter-jetstream-by-did
Browse files Browse the repository at this point in the history
minor: ability to filter jetstream by DIDs
  • Loading branch information
juni-b-queer authored Dec 11, 2024
2 parents 08bdd36 + f000d4b commit 5f58a72
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 23 deletions.
15 changes: 14 additions & 1 deletion src/subscriptions/firehose/JetstreamSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ export class JetstreamSubscription extends AbstractSubscription {
*
* @param {JetstreamSubscriptionHandlers} handlerControllers - An array of handler controllers.
* @param {string} wsURL - The WebSocket URL to connect to. Defaults to `wss://bsky.network`.
* @param dids
*/
constructor(
protected handlerControllers: JetstreamSubscriptionHandlers,
protected wsURL: string = 'ws://localhost:6008/subscribe'
protected wsURL: string = 'ws://localhost:6008/subscribe',
protected wantedDids: string[] = []
) {
super(handlerControllers);
this.generateWsURL();
Expand All @@ -61,6 +63,17 @@ export class JetstreamSubscription extends AbstractSubscription {
if (queryParams.length > 0) {
this.setWsURL = `${this.wsURL}?${queryParams.join('&')}`;
}

if (this.wantedDids.length > 0) {
const dids = this.wantedDids
.map((did) => `wantedDids=${did}`)
.join('&');
if (queryParams.length > 0) {
this.setWsURL = `${this.wsURL}&${dids}`;
} else {
this.setWsURL = `${this.wsURL}?${dids}`;
}
}
}

/**
Expand Down
11 changes: 11 additions & 0 deletions src/subscriptions/firehose/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,14 @@ const handlers: JetstreamSubscriptionHandlers = {
```

This also allows the JetstreamSubscription to automatically subscribe to only events that it's handling.


### Wanted DIDs
If you want to only listen for events from a specific user (or users) you can pass an optional 3rd parameter into the jetstream constructor
```typescript
new JetstreamSubscription(
handlers,
'ws://localhost:6010/subscribe',
['did:plc:123', 'did:plc:124']
);
```
77 changes: 55 additions & 22 deletions tests/firehose/JeststreamSubscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,35 @@ import WebSocket from 'ws';

describe('JetstreamSubscription', () => {
let jetSub: JetstreamSubscription;
const handlers: JetstreamSubscriptionHandlers = {
post: {
c: [],
d: [],
},
like: {
c: [],
d: [],
},
repost: {
c: [],
d: [],
},
follow: {
c: [],
d: [],
},
block: {
c: [],
d: [],
},
};
let handlers: JetstreamSubscriptionHandlers;
// A dummy message handler for testing
const dummyHandler: MessageHandler = {
handle: jest.fn(),
} as unknown as MessageHandler;

beforeEach(() => {
handlers = {
post: {
c: [],
d: [],
},
like: {
c: [],
d: [],
},
repost: {
c: [],
d: [],
},
follow: {
c: [],
d: [],
},
block: {
c: [],
d: [],
},
};
jetSub = new JetstreamSubscription(handlers);
(dummyHandler.handle as jest.Mock).mockClear();
});
Expand All @@ -58,6 +59,38 @@ describe('JetstreamSubscription', () => {
expect((jetSub as any).wsURL).toContain('post');
});

test('generateWsURL with did', () => {
handlers = {
post: {
c: [dummyHandler],
},
};
jetSub = new JetstreamSubscription(
handlers,
'ws://localhost:6010/subscribe',
['did:plc:123']
);
jetSub.setWsURL = 'ws://localhost:6010/subscribe';
jetSub.generateWsURL();
expect((jetSub as any).wsURL).toBe(
'ws://localhost:6010/subscribe?wantedCollections=app.bsky.feed.post&wantedDids=did:plc:123'
);
});

test('generateWsURL with dids', () => {
handlers = {};
jetSub = new JetstreamSubscription(
handlers,
'ws://localhost:6010/subscribe',
['did:plc:123', 'did:plc:124']
);
jetSub.setWsURL = 'ws://localhost:6010/subscribe';
jetSub.generateWsURL();
expect((jetSub as any).wsURL).toBe(
'ws://localhost:6010/subscribe?wantedDids=did:plc:123&wantedDids=did:plc:124'
);
});

test('handleCreate post', () => {
// @ts-ignore
handlers.post.c = [dummyHandler];
Expand Down

0 comments on commit 5f58a72

Please sign in to comment.