Skip to content

Commit

Permalink
use resumeAtTopicSequencePage and fix logger statements
Browse files Browse the repository at this point in the history
  • Loading branch information
anitarua committed Nov 8, 2024
1 parent 62d1ad8 commit ec3f57d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
20 changes: 10 additions & 10 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
this.getLogger().trace(
'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s',
options.subscriptionState.resumeAtTopicSequenceNumber,
options.subscriptionState.lastTopicSequencePage ?? 'undefined'
options.subscriptionState.resumeAtTopicSequencePage
);
const call = this.client.Subscribe(request, {
interceptors: this.streamingInterceptors,
Expand Down Expand Up @@ -252,17 +252,17 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
);
options.onHeartbeat(new TopicHeartbeat());
} else if (resp.discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
const topicDiscontinuity = new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence,
resp.discontinuity.new_sequence_page
);
options.onDiscontinuity(
new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence,
resp.discontinuity.new_sequence_page
)
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s; %s',
truncateString(options.topicName),
topicDiscontinuity.toString()
);
options.onDiscontinuity(topicDiscontinuity);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
Expand Down
48 changes: 23 additions & 25 deletions packages/client-sdk-web/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,12 @@ export class PubsubClient<
request.setResumeAtTopicSequenceNumber(
options.subscriptionState.resumeAtTopicSequenceNumber
);
if (options.subscriptionState.lastTopicSequencePage !== undefined) {
request.setSequencePage(options.subscriptionState.lastTopicSequencePage);
}
request.setSequencePage(
options.subscriptionState.resumeAtTopicSequencePage
);

this.getLogger().trace(
'Subscribing to topic with resume_at_topic_sequence_number %s and sequence_page %s',
options.subscriptionState.resumeAtTopicSequenceNumber,
options.subscriptionState.lastTopicSequencePage ?? 'undefined'
`Subscribing to topic with resume_at_topic_sequence_number ${options.subscriptionState.resumeAtTopicSequenceNumber} and sequence_page ${options.subscriptionState.resumeAtTopicSequencePage}`
);

const call = this.client.subscribe(request, {
Expand Down Expand Up @@ -195,10 +193,9 @@ export class PubsubClient<
const itemText = item.getValue()?.getText();
const itemBinary = item.getValue()?.getBinary();
this.getLogger().trace(
'Received an item on subscription stream; topic: %s; sequence number: %s; sequence page: %s',
truncateString(options.topicName),
sequenceNumber,
sequencePage
`Received an item on subscription stream; topic: ${truncateString(
options.topicName
)}; sequence number: ${sequenceNumber}; sequence page: ${sequencePage}`
);
if (itemText) {
options.onItem(
Expand All @@ -210,8 +207,9 @@ export class PubsubClient<
);
} else {
this.getLogger().error(
'Received subscription item with unknown type; topic: %s',
truncateString(options.topicName)
`Received subscription item with unknown type; topic: ${truncateString(
options.topicName
)}`
);
options.onError(
new TopicSubscribe.Error(
Expand All @@ -222,28 +220,28 @@ export class PubsubClient<
}
} else if (resp.getHeartbeat()) {
this.getLogger().trace(
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
`Received heartbeat from subscription stream; topic: ${truncateString(
options.topicName
)}`
);
options.onHeartbeat(new TopicHeartbeat());
} else if (discontinuity) {
this.getLogger().trace(
'Received a discontinuity; topic: %s; new sequence number: %s; new sequence page: %s',
truncateString(options.topicName),
const topicDiscontinuity = new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence(),
discontinuity.getNewSequencePage()
);
options.onDiscontinuity(
new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence(),
discontinuity.getNewSequencePage()
)
this.getLogger().trace(
`Received a discontinuity; topic: ${truncateString(
options.topicName
)}; ${topicDiscontinuity.toString()}`
);
options.onDiscontinuity(topicDiscontinuity);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
truncateString(options.topicName)
`Received unknown subscription item; topic: ${truncateString(
options.topicName
)}`
);
options.onError(
new TopicSubscribe.Error(new UnknownError('Unknown item type')),
Expand Down

0 comments on commit ec3f57d

Please sign in to comment.