Skip to content

Commit

Permalink
:small_adhesive_bandage: add additional topic Name for contact streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
LemonaOna committed May 28, 2024
1 parent 411b538 commit bbd265f
Showing 1 changed file with 49 additions and 2 deletions.
51 changes: 49 additions & 2 deletions src/models/controller.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export class Controller {
null;
private pubSubIntegrationEventsClient: PubSubClient<PubSubIntegrationsEventMessage> | null =
null;
private additionalPubSubContactStreamingClient: PubSubClient<PubSubContactsMessage> | null =
null;
private integrationName: string = 'UNKNOWN';

// used for garbage collection reasons, to prevent long running promises from getting canceled
Expand Down Expand Up @@ -81,6 +83,7 @@ export class Controller {
const {
PUBSUB_TOPIC_NAME: topicNameLegacy,
PUBSUB_TOPIC_NAME_CONTACT_STREAMING: topicName,
PUBSUB_ADDITIONAL_TOPIC_NAME: additionalTopicName,
} = process.env;

const topicNameProvided = topicName ?? topicNameLegacy;
Expand All @@ -90,11 +93,19 @@ export class Controller {
}

this.pubSubContactStreamingClient = new PubSubClient(topicNameProvided);

infoLogger(
'Controller',
`Initialized PubSub client for topic ${topicNameProvided}`,
);
if (additionalTopicName) {
this.additionalPubSubContactStreamingClient = new PubSubClient(
additionalTopicName,
);
infoLogger(
'Controller',
`Initialized PubSub client for topic ${additionalTopicName}`,
);
}
}

private initContactChanges() {
Expand All @@ -105,7 +116,6 @@ export class Controller {
}

this.pubSubIntegrationEventsClient = new PubSubClient(topicName);

infoLogger(
'Controller',
`Initialized PubSub client for topic ${topicName}`,
Expand Down Expand Up @@ -303,6 +313,7 @@ export class Controller {
throw new Error('Invalid contacts received');
}

console.log('Publishing contacts', contacts.length);
await this.pubSubContactStreamingClient?.publishMessage(
{
userId,
Expand All @@ -315,6 +326,20 @@ export class Controller {
},
orderingKey,
);

// todo: remove as soon as platypus goes live
await this.additionalPubSubContactStreamingClient?.publishMessage(
{
userId,
timestamp,
contacts: contacts.map((contact) =>
sanitizeContact(contact, providerConfig.locale),
),
state: PubSubContactsState.IN_PROGRESS,
integrationName: this.integrationName,
},
orderingKey,
);
} catch (error) {
errorLogger(
'streamContacts',
Expand Down Expand Up @@ -344,6 +369,17 @@ export class Controller {

const streamingPromise = streamContacts()
.then(() => {
this.additionalPubSubContactStreamingClient?.publishMessage(
{
userId: providerConfig.userId,
timestamp,
contacts: [],
state: PubSubContactsState.COMPLETE,
integrationName: this.integrationName,
},
orderingKey,
);

return this.pubSubContactStreamingClient?.publishMessage(
{
userId: providerConfig.userId,
Expand All @@ -362,6 +398,17 @@ export class Controller {
providerConfig.apiKey,
error,
);
this.additionalPubSubContactStreamingClient?.publishMessage(
{
userId: providerConfig.userId,
timestamp,
contacts: [],
state: PubSubContactsState.FAILED,
integrationName: this.integrationName,
},
orderingKey,
);

return this.pubSubContactStreamingClient?.publishMessage(
{
userId: providerConfig.userId,
Expand Down

0 comments on commit bbd265f

Please sign in to comment.