From ec3f57d56965825c3f71845fd9ab7fb316074e48 Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 8 Nov 2024 10:26:36 -0800 Subject: [PATCH] use resumeAtTopicSequencePage and fix logger statements --- .../src/internal/pubsub-client.ts | 20 ++++---- .../src/internal/pubsub-client.ts | 48 +++++++++---------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/packages/client-sdk-nodejs/src/internal/pubsub-client.ts b/packages/client-sdk-nodejs/src/internal/pubsub-client.ts index 25993cc33..22fb95bf9 100644 --- a/packages/client-sdk-nodejs/src/internal/pubsub-client.ts +++ b/packages/client-sdk-nodejs/src/internal/pubsub-client.ts @@ -175,7 +175,7 @@ export class PubsubClient extends AbstractPubsubClient { this.getLogger().trace( 'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s', options.subscriptionState.resumeAtTopicSequenceNumber, - options.subscriptionState.lastTopicSequencePage ?? 'undefined' + options.subscriptionState.resumeAtTopicSequencePage ); const call = this.client.Subscribe(request, { interceptors: this.streamingInterceptors, @@ -252,17 +252,17 @@ export class PubsubClient extends AbstractPubsubClient { ); options.onHeartbeat(new TopicHeartbeat()); } else if (resp.discontinuity) { - this.getLogger().trace( - 'Received discontinuity from subscription stream; topic: %s', - truncateString(options.topicName) + const topicDiscontinuity = new TopicDiscontinuity( + resp.discontinuity.last_topic_sequence, + resp.discontinuity.new_topic_sequence, + resp.discontinuity.new_sequence_page ); - options.onDiscontinuity( - new TopicDiscontinuity( - resp.discontinuity.last_topic_sequence, - resp.discontinuity.new_topic_sequence, - resp.discontinuity.new_sequence_page - ) + this.getLogger().trace( + 'Received discontinuity from subscription stream; topic: %s; %s', + truncateString(options.topicName), + topicDiscontinuity.toString() ); + options.onDiscontinuity(topicDiscontinuity); } else { this.getLogger().error( 'Received unknown subscription item; topic: %s', diff --git a/packages/client-sdk-web/src/internal/pubsub-client.ts b/packages/client-sdk-web/src/internal/pubsub-client.ts index 973d40616..76f4bc418 100644 --- a/packages/client-sdk-web/src/internal/pubsub-client.ts +++ b/packages/client-sdk-web/src/internal/pubsub-client.ts @@ -134,14 +134,12 @@ export class PubsubClient< request.setResumeAtTopicSequenceNumber( options.subscriptionState.resumeAtTopicSequenceNumber ); - if (options.subscriptionState.lastTopicSequencePage !== undefined) { - request.setSequencePage(options.subscriptionState.lastTopicSequencePage); - } + request.setSequencePage( + options.subscriptionState.resumeAtTopicSequencePage + ); this.getLogger().trace( - 'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s', - options.subscriptionState.resumeAtTopicSequenceNumber, - options.subscriptionState.lastTopicSequencePage ?? 'undefined' + `Subscribing to topic with resume_at_topic_sequence_number ${options.subscriptionState.resumeAtTopicSequenceNumber} and sequence_page ${options.subscriptionState.resumeAtTopicSequencePage}` ); const call = this.client.subscribe(request, { @@ -195,10 +193,9 @@ export class PubsubClient< const itemText = item.getValue()?.getText(); const itemBinary = item.getValue()?.getBinary(); this.getLogger().trace( - 'Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %s', - truncateString(options.topicName), - sequenceNumber, - sequencePage + `Received an item on subscription stream; topic: ${truncateString( + options.topicName + )}; sequence number: ${sequenceNumber}; sequence page: ${sequencePage}` ); if (itemText) { options.onItem( @@ -210,8 +207,9 @@ export class PubsubClient< ); } else { this.getLogger().error( - 'Received subscription item with unknown type; topic: %s', - truncateString(options.topicName) + `Received subscription item with unknown type; topic: ${truncateString( + options.topicName + )}` ); options.onError( new TopicSubscribe.Error( @@ -222,28 +220,28 @@ export class PubsubClient< } } else if (resp.getHeartbeat()) { this.getLogger().trace( - 'Received heartbeat from subscription stream; topic: %s', - truncateString(options.topicName) + `Received heartbeat from subscription stream; topic: ${truncateString( + options.topicName + )}` ); options.onHeartbeat(new TopicHeartbeat()); } else if (discontinuity) { - this.getLogger().trace( - 'Received a discontinuity; topic: %s; new sequence number: %s; new sequence page: %s', - truncateString(options.topicName), + const topicDiscontinuity = new TopicDiscontinuity( + discontinuity.getLastTopicSequence(), discontinuity.getNewTopicSequence(), discontinuity.getNewSequencePage() ); - options.onDiscontinuity( - new TopicDiscontinuity( - discontinuity.getLastTopicSequence(), - discontinuity.getNewTopicSequence(), - discontinuity.getNewSequencePage() - ) + this.getLogger().trace( + `Received a discontinuity; topic: ${truncateString( + options.topicName + )}; ${topicDiscontinuity.toString()}` ); + options.onDiscontinuity(topicDiscontinuity); } else { this.getLogger().error( - 'Received unknown subscription item; topic: %s', - truncateString(options.topicName) + `Received unknown subscription item; topic: ${truncateString( + options.topicName + )}` ); options.onError( new TopicSubscribe.Error(new UnknownError('Unknown item type')),