diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java index cedac55d9e..a9fdec0928 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java @@ -29,11 +29,14 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.StorageException; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; @@ -64,6 +67,12 @@ public final class GcsResourceManager implements ArtifactClient, ResourceManager private final String testClassName; private final String runId; + // Retry settings for notification creation + private static final int CREATE_MAX_RETRIES = 5; + private static final Duration CREATE_BACKOFF_DELAY = Duration.ofSeconds(10); + private static final Duration CREATE_BACKOFF_MAX_DELAY = Duration.ofSeconds(60); + private static final double CREATE_BACKOFF_JITTER = 0.1; + public GcsResourceManager(Builder builder) { this.client = ArtifactUtils.createStorageClient(builder.credentials); this.bucket = builder.bucket; @@ -201,18 +210,30 @@ public Notification createNotification(String topicName, String gcsPrefix) { .setObjectNamePrefix(gcsPrefix) .setPayloadFormat(NotificationInfo.PayloadFormat.JSON_API_V1) .build(); - try { - Notification notification = client.createNotification(bucket, notificationInfo); - LOG.info("Successfully created notification {}", notification); - notificationList.add(notification); - return notification; - } catch (StorageException e) { - throw new RuntimeException( - String.format( - "Unable to create notification for bucket %s. Notification: %s", - bucket, notificationInfo), - e); - } + return Failsafe.with(retryOnException()) + .get( + () -> { + try { + Notification notification = client.createNotification(bucket, notificationInfo); + LOG.info("Successfully created notification {}", notification); + notificationList.add(notification); + return notification; + } catch (StorageException e) { + throw new RuntimeException( + String.format( + "Unable to create notification for bucket %s. Notification: %s", + bucket, notificationInfo), + e); + } + }); + } + + private static RetryPolicy retryOnException() { + return RetryPolicy.builder() + .withMaxRetries(CREATE_MAX_RETRIES) + .withBackoff(CREATE_BACKOFF_DELAY, CREATE_BACKOFF_MAX_DELAY) + .withJitter(CREATE_BACKOFF_JITTER) + .build(); } /**