Skip to content

Commit

Permalink
feat(server): pass create subscription options on GooglePubSubPattern…
Browse files Browse the repository at this point in the history
…Metadata


Co-authored-by: Alessandro Rovito <[email protected]>
  • Loading branch information
m33ch and Alessandro Rovito authored Mar 3, 2022
1 parent 589fca5 commit 29977f8
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 7 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ strategy and let the framework do the rest.

## Microservice Strategy

The server/transport strategy component is inserted as a strategy when creating a microservice, taking a
The server/transport strategy component is inserted as a strategy when creating a microservice, taking a
few configuration parameters, as well as an optional PubSub instance, like so:
```typescript
async function bootstrap() {
Expand Down Expand Up @@ -62,7 +62,7 @@ that will make parsing PubSub Messages easy, simple and fun:

| Name | Desscription |
-------|-----------
| @GooglePubSubMessageHandler | Takes a subscription name and optionally a topic name. A subscription will be created if it does not already exits **if**: a topic name is supplied **and** `createSubscriptions` was set to true when the microservice was created |
| @GooglePubSubMessageHandler | Takes a subscription name and optionally a topic name and creation parameters of subscription. A subscription will be created if it does not already exits **if**: a topic name is supplied **and** `createSubscriptions` was set to true when the microservice was created. The creation parameters are of type `CreateSubscriptionOptions` from the google pub/sub library |
| @GooglePubSubMessageBody | This will retrieve and `JSON.parse()` the body of the incoming message. You may optionally include a key and the corresponding value will be returned.
| @GooglePubSubMessageAttributes | This will retrieve attributes of the incoming message. You may optionally include a key, and the corresponding value will be returned.
| @Ack | This will return a function that will `ack` the incoming message. </br> **N.B.** this will disable any auto-acking.|
Expand Down Expand Up @@ -154,10 +154,10 @@ export class BasicNackStrategy implements NackStrategy {
```

#### No strategy/hybrid acking and nacking
In addition to using these strategies, the library also makes available ack and nack
In addition to using these strategies, the library also makes available ack and nack
functions through decorators to the controller as well as from
the `GooglePubSubContext`. When ack or nack functions are
retrieved from the context (either directly or through the
the `GooglePubSubContext`. When ack or nack functions are
retrieved from the context (either directly or through the
decorator) **the autoAck/autoNack methods will return false**,
disabling the basic strategies and optionally any strategies you
should choose to create. </br>
Expand Down Expand Up @@ -216,6 +216,9 @@ export class TestController {

@GooglePubSubMessageHandler({
subscriptionName: 'my-subscription-that-or-may-not-exist',
createOptions: {
enableMessageOrdering: true,
},
topicName: 'my-existing-topic'
})
public handler2(@GooglePubSubMessageBody('bar') bar: boolean ): void {
Expand Down
4 changes: 4 additions & 0 deletions examples/server/example.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export class ExampleController {

@GooglePubSubMessageHandler({
subscriptionName: 'lee-christmas-notifications',
createOptions: {
enableMessageOrdering: true,
retainAckedMessages: false,
},
topicName: 'expendables-headquarters',
})
public expendablesHandler(
Expand Down
52 changes: 51 additions & 1 deletion lib/client/client-google-pubsub.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
jest.mock('@google-cloud/pubsub');
import { PubSub, Subscription, Topic } from '@google-cloud/pubsub';
import { CreateSubscriptionOptions, PubSub, Subscription, Topic } from '@google-cloud/pubsub';
import { GooglePubSubTopic } from '../interfaces';
import { ClientGooglePubSub } from './client-google-pubsub';

const topicName = 'project-venison-plans';
const subscriptionName = 'project-v-tem-ray-notifier';
const subscriptionCreationOption = {
enableMessageOrdering: true,
retainAckedMessages: false,
expirationPolicy: {
ttl: {
seconds: 30,
},
},
retryPolicy: {
minimumBackoff: {
seconds: 10,
},
maximumBackoff: {
seconds: 600,
},
},
} as CreateSubscriptionOptions;
const testMessage = "We're at Side 7";
const testBuffer = Buffer.from(testMessage);

Expand Down Expand Up @@ -94,25 +111,58 @@ describe('ClientGooglePubSub', () => {
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription name and a topic name and a creation options', async () => {
await clientProxy
.createSubscription(subscriptionName, topicName, subscriptionCreationOption)
.toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription instance and a topic name', async () => {
const subscription = new Subscription(client, subscriptionName);
await clientProxy.createSubscription(subscription, topicName).toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription instance and a topic name and a creation options', async () => {
const subscription = new Subscription(client, subscriptionName);
await clientProxy
.createSubscription(subscription, topicName, subscriptionCreationOption)
.toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription instance with a topic name set', async () => {
const subscription = new Subscription(client, subscriptionName);
subscription.topic = topicName;
await clientProxy.createSubscription(subscription).toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription instance with a topic name set and a creation options', async () => {
const subscription = new Subscription(client, subscriptionName);
subscription.topic = topicName;
await clientProxy
.createSubscription(subscription, undefined, subscriptionCreationOption)
.toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription instance and a topic instance', async () => {
const subscription = new Subscription(client, subscriptionName);
subscription.topic = topicName;
await clientProxy.createSubscription(subscription).toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});

it('should attempt to create a Subscription when given a subscription instance and a topic instance and a creation options', async () => {
const subscription = new Subscription(client, subscriptionName);
subscription.topic = topicName;
await clientProxy
.createSubscription(subscription, undefined, subscriptionCreationOption)
.toPromise();
expect(mockedSubscriptionCreate).toHaveBeenCalled();
});
});

describe('deleteSubscription', () => {
Expand Down
3 changes: 3 additions & 0 deletions lib/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
Attributes,
ClientConfig,
CreateSubscriptionOptions,
Message,
PubSub,
Subscription,
Expand All @@ -19,10 +20,12 @@ export type AckFunction = () => void;
export type NackFunction = () => void;
export interface GooglePubSubSubscriptionPatternMetadata {
subscriptionName: string;
createOptions?: CreateSubscriptionOptions;
topicName?: string;
}
export interface GooglePubSubTopicPatternMetadata {
subscriptionName?: string;
createOptions?: CreateSubscriptionOptions;
topicName: string;
}

Expand Down
5 changes: 4 additions & 1 deletion lib/server/server-google-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { CreateSubscriptionOptions } from '@google-cloud/pubsub';
import { Logger } from '@nestjs/common';
import { CustomTransportStrategy, MessageHandler, ReadPacket, Server } from '@nestjs/microservices';
import { from, merge, Observable, of, Subscription } from 'rxjs';
Expand Down Expand Up @@ -138,6 +139,7 @@ export class GooglePubSubTransport extends Server implements CustomTransportStra
const subscription: GooglePubSubSubscription | null = await this.getOrCreateSubscription(
subscriptionName,
metadata.topicName,
metadata.createOptions,
pattern,
);

Expand Down Expand Up @@ -231,6 +233,7 @@ export class GooglePubSubTransport extends Server implements CustomTransportStra
private getOrCreateSubscription = async (
subscriptionName: string,
topicName: string | undefined,
createOptions: CreateSubscriptionOptions | undefined,
pattern: string,
): Promise<GooglePubSubSubscription | null> => {
const subscriptionExists: boolean = await this.googlePubSubClient
Expand All @@ -249,7 +252,7 @@ export class GooglePubSubTransport extends Server implements CustomTransportStra
const topic: GooglePubSubTopic | null = this.googlePubSubClient.getTopic(_topicName);

return await this.googlePubSubClient
.createSubscription(subscriptionName, topic)
.createSubscription(subscriptionName, topic, createOptions)
.toPromise();
};

Expand Down
6 changes: 6 additions & 0 deletions test/server/server.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ import { createMessage } from '../utilities';

jest.mock('../../examples/server/example.service');

const subscriptionCreationOptions = {
enableMessageOrdering: true,
retainAckedMessages: false,
};

const expendablesHandlerPattern = {
subscriptionName: 'lee-christmas-notifications',
createOptions: subscriptionCreationOptions,
topicName: 'expendables-headquarters',
};

Expand Down

0 comments on commit 29977f8

Please sign in to comment.