diff --git a/src/models/controller.model.ts b/src/models/controller.model.ts index 6c666c20..2abb71ab 100644 --- a/src/models/controller.model.ts +++ b/src/models/controller.model.ts @@ -48,6 +48,8 @@ export class Controller { null; private pubSubIntegrationEventsClient: PubSubClient | null = null; + private additionalPubSubContactStreamingClient: PubSubClient | null = + null; private integrationName: string = 'UNKNOWN'; // used for garbage collection reasons, to prevent long running promises from getting canceled @@ -81,6 +83,7 @@ export class Controller { const { PUBSUB_TOPIC_NAME: topicNameLegacy, PUBSUB_TOPIC_NAME_CONTACT_STREAMING: topicName, + PUBSUB_ADDITIONAL_TOPIC_NAME: additionalTopicName, } = process.env; const topicNameProvided = topicName ?? topicNameLegacy; @@ -90,11 +93,19 @@ export class Controller { } this.pubSubContactStreamingClient = new PubSubClient(topicNameProvided); - infoLogger( 'Controller', `Initialized PubSub client for topic ${topicNameProvided}`, ); + if (additionalTopicName) { + this.additionalPubSubContactStreamingClient = new PubSubClient( + additionalTopicName, + ); + infoLogger( + 'Controller', + `Initialized PubSub client for topic ${additionalTopicName}`, + ); + } } private initContactChanges() { @@ -105,7 +116,6 @@ export class Controller { } this.pubSubIntegrationEventsClient = new PubSubClient(topicName); - infoLogger( 'Controller', `Initialized PubSub client for topic ${topicName}`, @@ -303,6 +313,7 @@ export class Controller { throw new Error('Invalid contacts received'); } + console.log('Publishing contacts', contacts.length); await this.pubSubContactStreamingClient?.publishMessage( { userId, @@ -315,6 +326,20 @@ export class Controller { }, orderingKey, ); + + // todo: remove as soon as platypus goes live + await this.additionalPubSubContactStreamingClient?.publishMessage( + { + userId, + timestamp, + contacts: contacts.map((contact) => + sanitizeContact(contact, providerConfig.locale), + ), + state: PubSubContactsState.IN_PROGRESS, + integrationName: this.integrationName, + }, + orderingKey, + ); } catch (error) { errorLogger( 'streamContacts', @@ -344,6 +369,17 @@ export class Controller { const streamingPromise = streamContacts() .then(() => { + this.additionalPubSubContactStreamingClient?.publishMessage( + { + userId: providerConfig.userId, + timestamp, + contacts: [], + state: PubSubContactsState.COMPLETE, + integrationName: this.integrationName, + }, + orderingKey, + ); + return this.pubSubContactStreamingClient?.publishMessage( { userId: providerConfig.userId, @@ -362,6 +398,17 @@ export class Controller { providerConfig.apiKey, error, ); + this.additionalPubSubContactStreamingClient?.publishMessage( + { + userId: providerConfig.userId, + timestamp, + contacts: [], + state: PubSubContactsState.FAILED, + integrationName: this.integrationName, + }, + orderingKey, + ); + return this.pubSubContactStreamingClient?.publishMessage( { userId: providerConfig.userId,