Skip to content

Commit

Permalink
Merge pull request data-integrations#1393 from data-integrations/PLUG…
Browse files Browse the repository at this point in the history
…IN-1769

PLUGIN-1769: Add retries for fetch in PubSubRDDIterator
  • Loading branch information
samdgupi authored Apr 26, 2024
2 parents ef32134 + 3b1f041 commit 2b86c51
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
Expand All @@ -35,9 +36,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Iterator for PubSub RDD.
Expand All @@ -51,6 +54,12 @@ public class PubSubRDDIterator implements Iterator<PubSubMessage> {
private static final int MAX_MESSAGES = 1000;
private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024; //20 MB. Max size for "data" field of a message is 10MB.
private static final int RETRY_DELAY = 100;
// Used to set the retryable code settings for pubsub clients
// The default codes are Code.ABORTED, Code.UNAVAILABLE, Code.UNKNOWN
// for current client version 1.108.1
private static final Set<Code> RETRYABLE_CLIENT_CODES =
Stream.of(Code.RESOURCE_EXHAUSTED, Code.ABORTED, Code.CANCELLED, Code.INTERNAL, Code.UNKNOWN,
Code.UNAVAILABLE, Code.DEADLINE_EXCEEDED).collect(Collectors.toSet());

private final long startTime;
private final PubSubSubscriberConfig config;
Expand Down Expand Up @@ -126,7 +135,8 @@ private SubscriberStub buildSubscriberClient() throws IOException {
builder.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE).build());
builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings());
builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings())
.setRetryableCodes(RETRYABLE_CLIENT_CODES);
return GrpcSubscriberStub.create(builder.build());
}

Expand Down

0 comments on commit 2b86c51

Please sign in to comment.