From 3b1f0412e5465dfabc95341000b73323625f8fb8 Mon Sep 17 00:00:00 2001 From: samik Date: Thu, 25 Apr 2024 23:07:34 +0530 Subject: [PATCH] PLUGIN-1769: Add more retry codes to pubsub client --- .../gcp/publisher/source/PubSubRDDIterator.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/source/PubSubRDDIterator.java b/src/main/java/io/cdap/plugin/gcp/publisher/source/PubSubRDDIterator.java index 9f6c822bf2..133edfe62a 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/source/PubSubRDDIterator.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/source/PubSubRDDIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.gcp.publisher.source; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.rpc.StatusCode.Code; import com.google.auth.Credentials; import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStub; @@ -35,9 +36,11 @@ import java.util.Collections; import java.util.List; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Iterator for PubSub RDD. @@ -51,6 +54,12 @@ public class PubSubRDDIterator implements Iterator { private static final int MAX_MESSAGES = 1000; private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024; //20 MB. Max size for "data" field of a message is 10MB. private static final int RETRY_DELAY = 100; + // Used to set the retryable code settings for pubsub clients + // The default codes are Code.ABORTED, Code.UNAVAILABLE, Code.UNKNOWN + // for current client version 1.108.1 + private static final Set RETRYABLE_CLIENT_CODES = + Stream.of(Code.RESOURCE_EXHAUSTED, Code.ABORTED, Code.CANCELLED, Code.INTERNAL, Code.UNKNOWN, + Code.UNAVAILABLE, Code.DEADLINE_EXCEEDED).collect(Collectors.toSet()); private final long startTime; private final PubSubSubscriberConfig config; @@ -126,7 +135,8 @@ private SubscriberStub buildSubscriberClient() throws IOException { builder.setTransportChannelProvider( SubscriberStubSettings.defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(MAX_MESSAGE_SIZE).build()); - builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings()); + builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings()) + .setRetryableCodes(RETRYABLE_CLIENT_CODES); return GrpcSubscriberStub.create(builder.build()); }