From c3fa5122ccca36adb4587acb59f907dc7f3d8d00 Mon Sep 17 00:00:00 2001 From: dj-smart Date: Tue, 23 Jan 2024 17:30:17 +0000 Subject: [PATCH] Add ReadTimeout Property in GCS Copy/Move Plugins to handle operations on large data in GCS buckets --- docs/GCSCopy-action.md | 4 +++ docs/GCSMove-action.md | 4 +++ .../gcp/dataplex/sink/DataplexBatchSink.java | 2 +- .../io/cdap/plugin/gcp/gcs/StorageClient.java | 9 ++++-- .../cdap/plugin/gcp/gcs/actions/GCSCopy.java | 2 +- .../cdap/plugin/gcp/gcs/actions/GCSMove.java | 2 +- .../gcp/gcs/actions/SourceDestConfig.java | 28 ++++++++++++++++++- widgets/GCSCopy-action.json | 14 ++++++++++ widgets/GCSMove-action.json | 14 ++++++++++ 9 files changed, 73 insertions(+), 6 deletions(-) diff --git a/docs/GCSCopy-action.md b/docs/GCSCopy-action.md index c52b59d160..09c60cf68b 100644 --- a/docs/GCSCopy-action.md +++ b/docs/GCSCopy-action.md @@ -50,6 +50,10 @@ This value is ignored if the bucket already exists. If the bucket already exists, this is ignored. More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys) +**Read Timeout:** Timeout in seconds to read data from an established HTTP connection (Default value is 20). +For copy/move operations on large files in GCS buckets, a higher timeout might be needed. Setting it to 0 +implies an infinite timeout (no limit on the timeout) [NOT RECOMMENDED]. + Example ------- diff --git a/docs/GCSMove-action.md b/docs/GCSMove-action.md index 30e92bf416..4914d2af11 100644 --- a/docs/GCSMove-action.md +++ b/docs/GCSMove-action.md @@ -51,6 +51,10 @@ This value is ignored if the bucket already exists. If the bucket already exists, this is ignored. More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys) +**Read Timeout:** Timeout in seconds to read data from an established HTTP connection (Default value is 20). 
+For copy/move operations on large files in GCS buckets, a higher timeout might be needed. Setting it to 0 +implies an infinite timeout (no limit on the timeout) [NOT RECOMMENDED]. + Example ------- diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java index 0e61345149..43da64e7bd 100644 --- a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java @@ -615,7 +615,7 @@ private void emitMetricsForStorageBucket(boolean succeeded, BatchSinkContext con } try { StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccount(), - config.isServiceAccountFilePath()); + config.isServiceAccountFilePath(), null); storageClient.mapMetaDataForAllBlobs(outputPath, new MetricsEmitter(context.getMetrics())::emitMetrics); } catch (Exception e) { diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java b/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java index e1c01d2319..aa0d9ef008 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java @@ -17,6 +17,7 @@ package io.cdap.plugin.gcp.gcs; import com.google.api.gax.paging.Page; +import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.kms.v1.CryptoKeyName; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; @@ -320,17 +321,21 @@ private static String toPath(BlobId blobId) { } public static StorageClient create(String project, @Nullable String serviceAccount, - Boolean isServiceAccountFilePath) throws IOException { + Boolean isServiceAccountFilePath, @Nullable Integer readTimeout) + throws IOException { StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(project); if (serviceAccount != null) { 
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath)); } + if (readTimeout != null) { + builder.setTransportOptions(HttpTransportOptions.newBuilder().setReadTimeout(readTimeout * 1000).build()); + } Storage storage = builder.build().getService(); return new StorageClient(storage); } public static StorageClient create(GCPConnectorConfig config) throws IOException { - return create(config.getProject(), config.getServiceAccount(), config.isServiceAccountFilePath()); + return create(config.getProject(), config.getServiceAccount(), config.isServiceAccountFilePath(), null); } /** diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java index 2ac255b2a2..25a984816c 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java @@ -67,7 +67,7 @@ public void run(ActionContext context) throws IOException { return; } StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccount(), - isServiceAccountFilePath); + isServiceAccountFilePath, config.readTimeout); GCSPath destPath = config.getDestPath(); CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java index f9f75a5213..12cf0f9a0a 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java @@ -67,7 +67,7 @@ public void run(ActionContext context) throws IOException { return; } StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccount(), - isServiceAccountFilePath); + isServiceAccountFilePath, config.readTimeout); GCSPath destPath = config.getDestPath(); CryptoKeyName cmekKeyName = 
CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector); collector.getOrThrowException(); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java index 4c5bafb5a0..5e304988a6 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java @@ -43,6 +43,7 @@ public class SourceDestConfig extends GCPConfig { public static final String NAME_SOURCE_PATH = "sourcePath"; public static final String NAME_DEST_PATH = "destPath"; public static final String NAME_LOCATION = "location"; + public static final String READ_TIMEOUT = "readTimeout"; @Name(NAME_SOURCE_PATH) @Macro @@ -74,15 +75,25 @@ public class SourceDestConfig extends GCPConfig { " at https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys") protected String cmekKey; + @Name(READ_TIMEOUT) + @Macro + @Nullable + @Description("Timeout in seconds to read data from an established HTTP connection (Default value is 20). " + + ("For performing copy/move operation on large files in GCS buckets, set a higher value. 
" + + "Set it to 0 for infinite(no limit)")) + protected Integer readTimeout; + public SourceDestConfig(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, - @Nullable String destPath, @Nullable String location, @Nullable String cmekKey) { + @Nullable String destPath, @Nullable String location, @Nullable Integer readTimeout, + @Nullable String cmekKey) { this.serviceAccountType = serviceAccountType; this.serviceAccountJson = serviceAccountJson; this.serviceFilePath = serviceFilePath; this.project = project; this.destPath = destPath; this.location = location; + this.readTimeout = readTimeout; this.cmekKey = cmekKey; } @@ -125,9 +136,22 @@ public void validate(FailureCollector collector, Map arguments) if (!containsMacro(NAME_CMEK_KEY)) { validateCmekKey(collector, arguments); } + if (!containsMacro(READ_TIMEOUT)) { + validateReadTimeout(collector); + } collector.getOrThrowException(); } + void validateReadTimeout(FailureCollector collector) { + if (readTimeout == null) { + return; + } + if (readTimeout < 0) { + collector.addFailure("Read Timeout cannot be less than 0. ", + "Please enter 0 or a positive value.").withConfigProperty(READ_TIMEOUT); + } + } + //This method validated the pattern of CMEK Key resource ID. 
void validateCmekKey(FailureCollector failureCollector, Map arguments) { CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector); @@ -161,6 +185,7 @@ public static class Builder { private String destPath; private String cmekKey; private String location; + private Integer readTimeout; public SourceDestConfig.Builder setProject(@Nullable String project) { this.project = project; @@ -205,6 +230,7 @@ public SourceDestConfig build() { serviceAccountJson, destPath, location, + readTimeout, cmekKey ); } diff --git a/widgets/GCSCopy-action.json b/widgets/GCSCopy-action.json index 389aed88f1..a35b18901b 100644 --- a/widgets/GCSCopy-action.json +++ b/widgets/GCSCopy-action.json @@ -128,6 +128,20 @@ } } ] + }, + { + "label" : "Advanced", + "properties" : [ + { + "name": "readTimeout", + "label" : "Read Timeout", + "widget-type": "number", + "widget-attributes": { + "default": "20", + "minimum": "0" + } + } + ] } ], "filters": [ diff --git a/widgets/GCSMove-action.json b/widgets/GCSMove-action.json index 1385983708..ce75c0b858 100644 --- a/widgets/GCSMove-action.json +++ b/widgets/GCSMove-action.json @@ -128,6 +128,20 @@ } } ] + }, + { + "label" : "Advanced", + "properties" : [ + { + "name": "readTimeout", + "label" : "Read Timeout", + "widget-type": "number", + "widget-attributes": { + "default": "20", + "minimum": "0" + } + } + ] } ], "filters": [