From 6015db52fe3c545b083550fe357670bb489f5c5e Mon Sep 17 00:00:00 2001
From: itsankit-google
Date: Tue, 17 Sep 2024 13:54:30 +0000
Subject: [PATCH] Implement Program Failure Exception Handling in GCS plugins
 to catch known errors

---
 pom.xml                                            |   4 +-
 .../plugin/gcp/common/ExceptionUtils.java          | 157 ++++++++++++++++++
 .../io/cdap/plugin/gcp/common/GCPUtils.java        |   7 +-
 .../ServiceAccountAccessTokenProvider.java         |  15 +-
 .../sink/DelegatingGCSOutputCommitter.java         |   4 +-
 .../gcs/sink/DelegatingGCSOutputFormat.java        |  15 +-
 .../gcs/sink/DelegatingGCSOutputUtils.java         |   8 +-
 .../gcs/sink/DelegatingGCSRecordWriter.java        |  12 +-
 .../gcs/sink/ForwardingOutputCommitter.java        |  84 ++++++++++
 .../gcp/gcs/sink/ForwardingOutputFormat.java       |  84 ++++++++++
 .../gcp/gcs/sink/ForwardingRecordWriter.java       |  56 +++++++
 .../plugin/gcp/gcs/sink/GCSBatchSink.java          |  59 +++++--
 .../gcp/gcs/sink/GCSMultiBatchSink.java            |  49 ++++--
 .../gcp/gcs/sink/GCSOutputFormatProvider.java      |  20 ++-
 .../gcs/sink/RecordFilterOutputFormat.java         |  17 +-
 .../cdap/plugin/gcp/gcs/source/GCSSource.java      |  50 ++++--
 .../plugin/gcp/gcs/source/TinkDecryptor.java       |  32 +++-
 .../DataPlexOutputFormatProviderTest.java          |   3 +-
 .../gcs/sink/GCSOutputformatProviderTest.java      |   5 +-
 .../TestDelegatingGCSOutputCommitter.java          |   6 +-
 20 files changed, 603 insertions(+), 84 deletions(-)
 create mode 100644 src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java
 create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java
 create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java
 create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java

diff --git a/pom.xml b/pom.xml
index f3b98ceb3d..1dfa6a226e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1010,8 +1010,8 @@
         1.1.0
-        system:cdap-data-pipeline[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)
-        system:cdap-data-streams[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)
+        system:cdap-data-pipeline[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)
+        system:cdap-data-streams[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)
diff --git a/src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java b/src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java
new file mode 100644
index 0000000000..85c5e76186
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/common/ExceptionUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.common;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpResponseException;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.api.exception.ProgramFailureException;
+import java.io.IOException;
+
+/**
+ * Utility class to handle exceptions.
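+ *
+ * <p>Illustrative sketch (not part of this change): {@code committer} stands for any Hadoop
+ * {@code OutputCommitter}. The helper rethrows an IOException whose cause is a known
+ * {@code HttpResponseException} as a {@code ProgramFailureException}, and propagates the
+ * original IOException otherwise:
+ *
+ * <pre>{@code
+ * ExceptionUtils.invokeWithProgramFailureHandling(() -> committer.commitTask(taskAttemptContext));
+ * }</pre>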
+ */
+public class ExceptionUtils {
+
+  /** Functional interface for lambda-friendly method invocations. */
+  @FunctionalInterface
+  public interface IOOperation {
+    void execute() throws IOException;
+  }
+
+  /**
+   * Functional interface for lambda-friendly method invocations.
+   *
+   * @param <T> the return type of the function
+   */
+  @FunctionalInterface
+  public interface IOFunction<T> {
+    T execute() throws IOException;
+  }
+
+  /** Functional interface for lambda-friendly method invocations. */
+  @FunctionalInterface
+  public interface IOInterruptibleOperation {
+    void execute() throws IOException, InterruptedException;
+  }
+
+  /**
+   * Functional interface for lambda-friendly method invocations.
+   *
+   * @param <T> the return type of the function
+   */
+  @FunctionalInterface
+  public interface IOInterruptibleFunction<T> {
+    T execute() throws IOException, InterruptedException;
+  }
+
+  // Generic helper method to handle IOException propagation.
+  public static void invokeWithProgramFailureHandling(IOOperation operation) throws IOException {
+    try {
+      operation.execute();
+    } catch (IOException e) {
+      ProgramFailureException exception = getProgramFailureException(e);
+      if (exception != null) {
+        throw exception;
+      }
+      throw e;
+    }
+  }
+
+  // Helper method for returning values (for methods like {@link OutputCommitter#needsTaskCommit}).
+  public static <T> T invokeWithProgramFailureHandling(IOFunction<T> function) throws IOException {
+    try {
+      return function.execute();
+    } catch (IOException e) {
+      ProgramFailureException exception = getProgramFailureException(e);
+      if (exception != null) {
+        throw exception;
+      }
+      throw e;
+    }
+  }
+
+  // Helper method for handling both IOException and InterruptedException.
+  public static void invokeWithProgramFailureAndInterruptionHandling(
+    IOInterruptibleOperation operation) throws IOException, InterruptedException {
+    try {
+      operation.execute();
+    } catch (IOException e) {
+      ProgramFailureException exception = getProgramFailureException(e);
+      if (exception != null) {
+        throw exception;
+      }
+      throw e;
+    }
+  }
+
+  // Helper method for handling both IOException and InterruptedException.
+  public static <T> T invokeWithProgramFailureAndInterruptionHandling(
+    IOInterruptibleFunction<T> function) throws IOException, InterruptedException {
+    try {
+      return function.execute();
+    } catch (IOException e) {
+      ProgramFailureException exception = getProgramFailureException(e);
+      if (exception != null) {
+        throw exception;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Get a ProgramFailureException with the given error
+   * information from {@link HttpResponseException}.
+   *
+   * @param e The HttpResponseException to get the error information from.
+   * @return A ProgramFailureException with the given error information.
+   */
+  private static ProgramFailureException getProgramFailureException(HttpResponseException e) {
+    Integer statusCode = e.getStatusCode();
+    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
+    String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
+      pair.getCorrectiveAction());
+
+    String errorMessage = e.getMessage();
+    if (e instanceof GoogleJsonResponseException) {
+      GoogleJsonResponseException exception = (GoogleJsonResponseException) e;
+      errorMessage = exception.getDetails() != null ?
+        exception.getDetails().getMessage() :
+        exception.getMessage();
+    }
+
+    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+      errorReason, errorMessage, pair.getErrorType(), true, e);
+  }
+
+  /**
+   * Get a ProgramFailureException with the given error
+   * information from {@link IOException}.
+   *
+   * @param e The IOException to get the error information from.
+   * @return A ProgramFailureException with the given error information, otherwise null.
+   */
+  private static ProgramFailureException getProgramFailureException(IOException e) {
+    Throwable target = e instanceof HttpResponseException ? e : e.getCause();
+    if (target instanceof HttpResponseException) {
+      return getProgramFailureException((HttpResponseException) target);
+    }
+    return null;
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
index aed6e9bc10..3e6f4123e2 100644
--- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -16,6 +16,7 @@
 package io.cdap.plugin.gcp.common;
 
+import com.google.api.client.http.HttpResponseException;
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.auth.Credentials;
 import com.google.auth.oauth2.ExternalAccountCredentials;
@@ -35,6 +36,10 @@
 import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 import com.google.gson.reflect.TypeToken;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.api.exception.ProgramFailureException;
 import io.cdap.plugin.gcp.gcs.GCSPath;
 import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
 import org.apache.hadoop.conf.Configuration;
@@ -79,7 +84,7 @@ public class GCPUtils {
     "https://www.googleapis.com/auth/bigquery");
   public static final String FQN_RESERVED_CHARACTERS_PATTERN = ".*[.:` \t\n].*";
   public static final int MILLISECONDS_MULTIPLIER = 1000;
-
+  public static final String WRAPPED_OUTPUTFORMAT_CLASSNAME = "wrapped.outputformat.classname";
   /**
    * Load a service account from the local file system.
    *
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
index 0a64c52dd9..4cedc83c8e 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
@@ -21,6 +21,10 @@
 import com.google.bigtable.repackaged.com.google.gson.Gson;
 import com.google.cloud.hadoop.util.AccessTokenProvider;
 import com.google.cloud.hadoop.util.CredentialFactory;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
       }
       return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
     } catch (IOException e) {
-      throw new RuntimeException(e);
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        "Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
     }
   }
 
   @Override
   public void refresh() throws IOException {
-    getCredentials().refresh();
+    try {
+      getCredentials().refresh();
+    } catch (IOException e) {
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        "Unable to refresh service account access token.", e.getMessage(),
+        ErrorType.UNKNOWN, true, e);
+    }
   }
 
   private GoogleCredentials getCredentials() throws IOException {
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
index 72d9dd0e69..bae7ebbfcd 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
@@ -65,7 +65,9 @@ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
       taskAttemptContext.getConfiguration(), tableName));
 
     //Wrap output committer into the GCS Output Committer.
-    GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
+    ForwardingOutputCommitter gcsOutputCommitter =
+      new ForwardingOutputCommitter(
+        new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext)));
 
     gcsOutputCommitter.setupJob(taskAttemptContext);
     gcsOutputCommitter.setupTask(taskAttemptContext);
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
index 2fe4eeff03..ab84e530e2 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
@@ -17,6 +17,11 @@
 package io.cdap.plugin.gcp.gcs.sink;
 
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -46,11 +51,13 @@ public DelegatingGCSOutputFormat() {
    * Get required configuration properties for this Output Format
    */
   public static Map<String, String> configure(String delegateClassName,
+                                              String wrappedClassName,
                                               String filterField,
                                               String outputBaseDir,
                                               String outputSuffix) {
     Map<String, String> config = new HashMap<>();
     config.put(DELEGATE_CLASS, delegateClassName);
+    config.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, wrappedClassName);
     config.put(PARTITION_FIELD, filterField);
     config.put(OUTPUT_PATH_BASE_DIR, outputBaseDir);
     config.put(OUTPUT_PATH_SUFFIX, outputSuffix);
@@ -62,7 +69,8 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context) {
     Configuration hConf = context.getConfiguration();
     String partitionField = hConf.get(PARTITION_FIELD);
 
-    return new DelegatingGCSRecordWriter(context, partitionField, getOutputCommitter(context));
+    return new ForwardingRecordWriter(new DelegatingGCSRecordWriter(context, partitionField,
+      getOutputCommitter(context), this));
   }
 
   @Override
@@ -71,8 +79,7 @@ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
   }
 
   @Override
-  public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
-    return new DelegatingGCSOutputCommitter(context);
+  public ForwardingOutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return new ForwardingOutputCommitter(new DelegatingGCSOutputCommitter(context));
   }
-
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputUtils.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputUtils.java
index 4ad293884a..87bcf7bb65 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputUtils.java
@@ -17,6 +17,10 @@
 package io.cdap.plugin.gcp.gcs.sink;
 
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -36,7 +40,9 @@ public static OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(Configuration hConf) throws IOException {
         (Class<? extends OutputFormat<NullWritable, StructuredRecord>>) hConf.getClassByName(delegateClassName);
       return delegateClass.newInstance();
     } catch (Exception e) {
-      throw new IOException("Unable to instantiate output format for class " + delegateClassName, e);
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
+        e.getMessage(), ErrorType.SYSTEM, false, e);
     }
   }
 
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
index 3bd6a13cc0..fdae3af34d 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
@@ -35,15 +35,16 @@ public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
   private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
-  private final DelegatingGCSOutputCommitter delegatingGCSOutputCommitter;
+  private final ForwardingOutputCommitter delegatingGCSOutputCommitter;
+  private final DelegatingGCSOutputFormat outputFormat;
 
-  DelegatingGCSRecordWriter(TaskAttemptContext context,
-                            String partitionField,
-                            DelegatingGCSOutputCommitter delegatingGCSOutputCommitter) {
+  DelegatingGCSRecordWriter(TaskAttemptContext context, String partitionField,
+      ForwardingOutputCommitter delegatingGCSOutputCommitter, DelegatingGCSOutputFormat outputFormat) {
     this.context = context;
     this.partitionField = partitionField;
     this.delegateMap = new HashMap<>();
     this.delegatingGCSOutputCommitter = delegatingGCSOutputCommitter;
+    this.outputFormat = outputFormat;
   }
 
   @Override
@@ -55,6 +56,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException, InterruptedException {
     if (delegateMap.containsKey(tableName)) {
       delegate = delegateMap.get(tableName);
     } else {
+      //Get output format from configuration.
       OutputFormat<NullWritable, StructuredRecord> format =
         DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());
 
@@ -63,7 +65,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException, InterruptedException {
       delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);
 
       //Add record writer to delegate map.
-      delegate = format.getRecordWriter(context);
+      delegate = new ForwardingRecordWriter(format.getRecordWriter(context));
       delegateMap.put(tableName, delegate);
     }
 
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java
new file mode 100644
index 0000000000..1c1c662968
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputCommitter.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+
+/**
+ * ForwardingOutputCommitter which delegates all operations to another OutputCommitter.
+ *
+ * This is used to wrap the OutputCommitter of the delegate format and rethrow an
+ * IOException with a known cause as a {@link io.cdap.cdap.api.exception.ProgramFailureException}.
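+ *
+ * <p>A minimal usage sketch ({@code innerCommitter} and {@code jobContext} are illustrative,
+ * not defined by this class):
+ *
+ * <pre>{@code
+ * OutputCommitter committer = new ForwardingOutputCommitter(innerCommitter);
+ * // an IOException caused by an HTTP error now surfaces as a ProgramFailureException
+ * committer.setupJob(jobContext);
+ * }</pre>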
+ */
+public class ForwardingOutputCommitter extends OutputCommitter {
+  private final OutputCommitter delegate;
+
+  public ForwardingOutputCommitter(OutputCommitter delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.setupJob(jobContext));
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.setupTask(taskAttemptContext));
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return ExceptionUtils.invokeWithProgramFailureHandling(
+      () -> delegate.needsTaskCommit(taskAttemptContext));
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.commitTask(taskAttemptContext));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    ExceptionUtils.invokeWithProgramFailureHandling(() -> delegate.abortTask(taskAttemptContext));
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat, String tableName)
+    throws InterruptedException, IOException {
+    if (delegate instanceof DelegatingGCSOutputCommitter) {
+      ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(() ->
+        ((DelegatingGCSOutputCommitter) delegate).addGCSOutputCommitterFromOutputFormat(
+          outputFormat, tableName));
+    } else {
+      throw ErrorUtils.getProgramFailureException(
+        new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Operation is not supported in the output committer: '%s'.",
+          delegate.getClass().getName()), null, ErrorType.SYSTEM, false, null
+      );
+    }
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java
new file mode 100644
index 0000000000..2574b9d79f
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingOutputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.io.IOException;
+
+/**
+ * ForwardingOutputFormat which delegates all operations to another OutputFormat.
+ *
+ * This is used to wrap the delegate output format and rethrow an IOException with a
+ * known cause as a {@link io.cdap.cdap.api.exception.ProgramFailureException}.
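+ *
+ * <p>A sketch of the wiring, assuming the provider registers the wrapped class under
+ * {@code GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME} as done elsewhere in this change
+ * ({@code conf} is illustrative):
+ *
+ * <pre>{@code
+ * conf.set(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, GCSOutputFormatProvider.GCSOutputFormat.class.getName());
+ * // the job's output format is then set to ForwardingOutputFormat, which
+ * // instantiates the wrapped class reflectively and forwards all calls to it
+ * }</pre>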
+ */
+public class ForwardingOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
+  private OutputFormat<NullWritable, StructuredRecord> delegate;
+
+  private OutputFormat<NullWritable, StructuredRecord> getDelegateFormatInstance(Configuration configuration) {
+    if (delegate != null) {
+      return delegate;
+    }
+
+    String delegateClassName = configuration.get(
+      GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME);
+    try {
+      delegate = (OutputFormat<NullWritable, StructuredRecord>) ReflectionUtils
+        .newInstance(configuration.getClassByName(delegateClassName), configuration);
+      return delegate;
+    } catch (ClassNotFoundException e) {
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
+        e.getMessage(), ErrorType.SYSTEM, false, e);
+    }
+  }
+
+  @Override
+  public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(
+    TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    return ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+      () -> getDelegateFormatInstance(
+        taskAttemptContext.getConfiguration()).getRecordWriter(taskAttemptContext));
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+    ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+      () -> getDelegateFormatInstance(jobContext.getConfiguration()).checkOutputSpecs(jobContext));
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+    throws IOException, InterruptedException {
+    return ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+      () -> getDelegateFormatInstance(
+        taskAttemptContext.getConfiguration()).getOutputCommitter(taskAttemptContext));
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java
new file mode 100644
index 0000000000..aeaae898b4
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/ForwardingRecordWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.gcp.common.ExceptionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+
+/**
+ * ForwardingRecordWriter which delegates all operations to another RecordWriter.
+ *
+ * This is used to wrap the RecordWriter of the delegate format and rethrow an
+ * IOException with a known cause as a {@link io.cdap.cdap.api.exception.ProgramFailureException}.
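+ *
+ * <p>Illustrative only ({@code innerWriter} and {@code record} are not defined by this class):
+ *
+ * <pre>{@code
+ * RecordWriter<NullWritable, StructuredRecord> writer = new ForwardingRecordWriter(innerWriter);
+ * writer.write(NullWritable.get(), record);
+ * }</pre>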
+ */
+public class ForwardingRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
+  private final RecordWriter<NullWritable, StructuredRecord> delegate;
+
+  /**
+   * Constructor for ForwardingRecordWriter.
+   *
+   * @param delegate the delegate RecordWriter
+   */
+  public ForwardingRecordWriter(RecordWriter<NullWritable, StructuredRecord> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void write(NullWritable nullWritable, StructuredRecord structuredRecord)
+    throws IOException, InterruptedException {
+    ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+      () -> delegate.write(nullWritable, structuredRecord));
+  }
+
+  @Override
+  public void close(TaskAttemptContext taskAttemptContext)
+    throws IOException, InterruptedException {
+    ExceptionUtils.invokeWithProgramFailureAndInterruptionHandling(
+      () -> delegate.close(taskAttemptContext));
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
index 129b6a297f..96466b1077 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
@@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception {
       collector.addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`");
       collector.getOrThrowException();
-      return;
     }
-    Credentials credentials = config.connection.getServiceAccount() == null ?
-      null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
+
+    Credentials credentials = null;
+    try {
+      credentials = config.connection.getServiceAccount() == null ?
+        null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+          isServiceAccountFilePath);
+    } catch (Exception e) {
+      String errorReason = "Unable to load service account credentials.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
+
+    String bucketName = config.getBucket(collector);
     Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
+    String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
+    String correctiveAction = "Ensure you entered the correct bucket path and "
+      + "have permissions for it.";
     Bucket bucket;
-    String location;
+    String location = null;
    try {
-      bucket = storage.get(config.getBucket());
+      bucket = storage.get(bucketName);
+      if (bucket != null) {
+        location = bucket.getLocation();
+      } else {
+        location = config.getLocation();
+        GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
+      }
     } catch (StorageException e) {
-      throw new RuntimeException(
", config.getBucket()) - + "Ensure you entered the correct bucket path and have permissions for it.", e); - } - if (bucket != null) { - location = bucket.getLocation(); - } else { - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName); - location = config.getLocation(); + String errorReason = String.format(errorReasonFormat, e.getCode()); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } + this.outputPath = getOutputDir(context); // create asset for lineage asset = Asset.builder(config.getReferenceName()) @@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) { } } - public String getBucket() { - return GCSPath.from(path).getBucket(); + public String getBucket(FailureCollector collector) { + try { + return GCSPath.from(path).getBucket(); + } catch (IllegalArgumentException e) { + collector.addFailure(e.getMessage(), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + return null; } @Override @@ -718,8 +741,8 @@ public Builder setCustomContentType(@Nullable String customContentType) { return this; } - public GCSBatchSink.GCSBatchSinkConfig build() { - return new GCSBatchSink.GCSBatchSinkConfig( + public GCSBatchSinkConfig build() { + return new GCSBatchSinkConfig( referenceName, project, fileSystemProperties, diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java index 7b15839c10..3bf01cb257 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java @@ -129,33 +129,48 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati config.validate(collector, context.getArguments().asMap()); collector.getOrThrowException(); - Map baseProperties = GCPUtils.getFileSystemProperties(config.connection, - config.getPath(), new HashMap<>()); Map argumentCopy = new HashMap<>(context.getArguments().asMap()); - CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector); collector.getOrThrowException(); + Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath(); if (isServiceAccountFilePath == null) { - context.getFailureCollector().addFailure("Service account type is undefined.", - "Must be `filePath` or `JSON`"); - context.getFailureCollector().getOrThrowException(); + collector.addFailure("Service account type is undefined.", + "Must be `filePath` or `JSON`"); + collector.getOrThrowException(); return; } - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath); + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? 
+        null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+          isServiceAccountFilePath);
+    } catch (Exception e) {
+      String errorReason = "Unable to load service account credentials.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
+
+    String bucketName = config.getBucket(collector);
     Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
+    String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
+    String correctiveAction = "Ensure you entered the correct bucket path and "
+      + "have permissions for it.";
     try {
-      if (storage.get(config.getBucket()) == null) {
-        GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
+      if (storage.get(bucketName) == null) {
+        GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
       }
     } catch (StorageException e) {
-      // Add more descriptive error message
-      throw new RuntimeException(
-        String.format("Unable to access or create bucket %s. ", config.getBucket())
-          + "Ensure you entered the correct bucket path and have permissions for it.", e);
+      String errorReason = String.format(errorReasonFormat, e.getCode());
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
     }
+
+    Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
+      config.getPath(), new HashMap<>());
     if (config.getAllowFlexibleSchema()) {
       //Configure MultiSink with support for flexible schemas.
       configureSchemalessMultiSink(context, baseProperties, argumentCopy);
@@ -194,9 +209,10 @@ private void configureMultiSinkWithSchema(BatchSinkContext context,
       config.splitField, name, schema));
     outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputDir(context.getLogicalStartTime(), name));
     outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
+    outputProperties.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, RecordFilterOutputFormat.class.getName());
     context.addOutput(Output.of(
       config.getReferenceName() + "_" + name,
-      new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), outputProperties)));
+      new SinkOutputFormatProvider(ForwardingOutputFormat.class.getName(), outputProperties)));
   }
 
@@ -208,13 +224,14 @@ private void configureSchemalessMultiSink(BatchSinkContext context,
     Map<String, String> outputProperties = new HashMap<>(baseProperties);
     outputProperties.putAll(validatingOutputFormat.getOutputFormatConfiguration());
     outputProperties.putAll(DelegatingGCSOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(),
+      DelegatingGCSOutputFormat.class.getName(),
       config.splitField, config.getOutputBaseDir(), config.getOutputSuffix(context.getLogicalStartTime())));
     outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
     context.addOutput(Output.of(
       config.getReferenceName(),
-      new SinkOutputFormatProvider(DelegatingGCSOutputFormat.class.getName(), outputProperties)));
+      new SinkOutputFormatProvider(ForwardingOutputFormat.class.getName(), outputProperties)));
   }
 
   /**
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
index 1d57e6d28e..bd0d70b7ff 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
@@ -1,8 +1,13 @@
 package io.cdap.plugin.gcp.gcs.sink;
 
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.etl.api.validation.FormatContext;
 import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
+import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -36,13 +41,14 @@ public void validate(FormatContext context) {
 
   @Override
   public String getOutputFormatClassName() {
-    return GCSOutputFormat.class.getName();
+    return ForwardingOutputFormat.class.getName();
   }
 
   @Override
   public Map<String, String> getOutputFormatConfiguration() {
     Map<String, String> outputFormatConfiguration = new HashMap<>(delegate.getOutputFormatConfiguration());
     outputFormatConfiguration.put(DELEGATE_OUTPUTFORMAT_CLASSNAME, delegate.getOutputFormatClassName());
+    outputFormatConfiguration.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, GCSOutputFormat.class.getName());
     return outputFormatConfiguration;
   }
 
@@ -52,7 +58,7 @@ public Map<String, String> getOutputFormatConfiguration() {
   public static class GCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
     private OutputFormat delegateFormat;
 
-    private OutputFormat getDelegateFormatInstance(Configuration configuration) throws IOException {
+    private OutputFormat getDelegateFormatInstance(Configuration configuration) {
       if (delegateFormat != null) {
         return delegateFormat;
       }
@@ -63,9 +69,9 @@ private OutputFormat getDelegateFormatInstance(Configuration configuration) throws IOException {
         .newInstance(configuration.getClassByName(delegateClassName), configuration);
       return delegateFormat;
     } catch (ClassNotFoundException e) {
-      throw new IOException(
-        String.format("Unable to instantiate output format for class %s", delegateClassName),
-        e);
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
+        e.getMessage(), ErrorType.SYSTEM, false, e);
     }
   }
 
@@ -74,7 +80,7 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext taskAttemptContext) throws
     IOException, InterruptedException {
     RecordWriter originalWriter = getDelegateFormatInstance(taskAttemptContext.getConfiguration())
       .getRecordWriter(taskAttemptContext);
-    return new GCSRecordWriter(originalWriter);
+    return new ForwardingRecordWriter(new GCSRecordWriter(originalWriter));
   }
 
   @Override
@@ -87,7 +93,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws
     InterruptedException {
     OutputCommitter delegateCommitter = getDelegateFormatInstance(taskAttemptContext.getConfiguration())
       .getOutputCommitter(taskAttemptContext);
-    return new GCSOutputCommitter(delegateCommitter);
+    return new ForwardingOutputCommitter(new GCSOutputCommitter(delegateCommitter));
   }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
index f4f5d6caca..82b0ebd2dd 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
@@ -18,6 +18,11 @@
 
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -68,7 +73,8 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
     String passthroughVal = hConf.get(PASS_VALUE);
     Schema schema = Schema.parseJson(hConf.get(ORIGINAL_SCHEMA));
 
-    return new FilterRecordWriter(delegate, filterField, passthroughVal, schema);
+    return new ForwardingRecordWriter(
+      new FilterRecordWriter(delegate, filterField, passthroughVal, schema));
   }
 
   @Override
@@ -78,8 +84,9 @@ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    OutputCommitter outputCommitter = getDelegateFormat(context.getConfiguration()).getOutputCommitter(context);
-    return new GCSOutputCommitter(outputCommitter);
+    Configuration hConf = context.getConfiguration();
+    OutputCommitter outputCommitter = getDelegateFormat(hConf).getOutputCommitter(context);
+    return new ForwardingOutputCommitter(new GCSOutputCommitter(outputCommitter));
   }
 
   private OutputFormat getDelegateFormat(Configuration hConf) throws IOException {
@@ -89,7 +96,9 @@ private OutputFormat getDelegateFormat(Configuration hConf) throws IOException {
       (Class<? extends OutputFormat<NullWritable, StructuredRecord>>) hConf.getClassByName(delegateClassName);
       return delegateClass.newInstance();
     } catch (Exception e) {
-      throw new IOException("Unable to instantiate output format for class " + delegateClassName, e);
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Unable to instantiate output format for class: '%s'.", delegateClassName),
+        e.getMessage(), ErrorType.SYSTEM, false, e);
     }
   }
 
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
index 7bfec4dd81..9283595f22 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
@@ -86,22 +86,52 @@ protected String getEmptyInputFormatClassName() {
 
   @Override
   public void prepareRun(BatchSourceContext context) throws Exception {
     // Get location of the source for lineage
-    String location;
-    String bucketName = GCSPath.from(config.getPath()).getBucket();
-    Credentials credentials = config.connection.getServiceAccount() == null ?
-      null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
-        config.connection.isServiceAccountFilePath());
+    String location = null;
+    String bucketName = null;
+    String path = config.getPath();
+    FailureCollector collector = context.getFailureCollector();
+
+    try {
+      bucketName = GCSPath.from(path).getBucket();
+    } catch (IllegalArgumentException e) {
+      collector.addFailure(e.getMessage(), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
+
+    Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
+    if (isServiceAccountFilePath == null) {
+      collector.addFailure("Service account type is undefined.",
+        "Must be `filePath` or `JSON`");
+      collector.getOrThrowException();
+    }
+
+    Credentials credentials = null;
+    try {
+      credentials = config.connection.getServiceAccount() == null ?
+        null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
+          isServiceAccountFilePath);
+    } catch (Exception e) {
+      String errorReason = "Unable to load service account credentials.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
+
     Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
     try {
       location = storage.get(bucketName).getLocation();
     } catch (StorageException e) {
-      throw new RuntimeException(
-        String.format("Unable to access bucket %s. ", bucketName)
-          + "Ensure you entered the correct bucket path and have permissions for it.", e);
+      String errorReason = String.format("Error code: %s, Unable to access GCS bucket '%s'. ",
+        e.getCode(), bucketName);
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()),
+        "Ensure you entered the correct bucket path and have permissions for it.")
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
     }
 
     // create asset for lineage
-    String fqn = GCSPath.getFQN(config.getPath());
+    String fqn = GCSPath.getFQN(path);
     String referenceName = Strings.isNullOrEmpty(config.getReferenceName())
       ? ReferenceNames.normalizeFqn(fqn)
       : config.getReferenceName();
@@ -142,7 +172,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
 
   @Override
   protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
-    lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.",
-      config.isEncrypted() ? " and decrypt " : " "), outputFields);
+    lineageRecorder.recordRead("Read", String.format("Read%s from Google Cloud Storage.",
+      config.isEncrypted() ? " and decrypt" : ""), outputFields);
   }
 
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
index f6092f66f5..424badbc4f 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java
@@ -24,6 +24,12 @@
 import com.google.crypto.tink.StreamingAead;
 import com.google.crypto.tink.config.TinkConfig;
 import com.google.crypto.tink.integration.gcpkms.GcpKmsClient;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.runtime.spi.runtimejob.ProgramRunFailureException;
+import io.cdap.plugin.gcp.common.GCPUtils;
 import io.cdap.plugin.gcp.crypto.Decryptor;
 import io.cdap.plugin.gcp.crypto.FSInputSeekableByteChannel;
 import org.apache.hadoop.conf.Configurable;
@@ -66,20 +72,22 @@ public TinkDecryptor() throws GeneralSecurityException {
   @Override
   public SeekableByteChannel open(FileSystem fs, Path path, int bufferSize) throws IOException {
     DecryptInfo decryptInfo = getDecryptInfo(fs, path);
+    Path metadataPath = new Path(path.getParent(), path.getName() + metadataSuffix);
     if (decryptInfo == null) {
-      throw new IllegalArgumentException("Missing encryption metadata for file '" + path
-        + "'. Expected metadata path is '"
-        + new Path(path.getParent(), path.getName() + metadataSuffix) + "'");
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
+        String.format("Missing encryption metadata for file '%s'. "
" + + "Expected metadata path is '%s'.", path, metadataPath), null, + ErrorType.USER, false, null); } try { StreamingAead streamingAead = decryptInfo.getKeysetHandle().getPrimitive(StreamingAead.class); return streamingAead.newSeekableDecryptingChannel(new FSInputSeekableByteChannel(fs, path, bufferSize), decryptInfo.getAad()); - } catch (IOException e) { - throw e; } catch (Exception e) { - throw new IOException(e); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.", + path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e); } } @@ -88,7 +96,9 @@ public void setConf(Configuration configuration) { this.configuration = configuration; this.metadataSuffix = configuration.get(METADATA_SUFFIX); if (metadataSuffix == null) { - throw new IllegalArgumentException("Missing configuration '" + METADATA_SUFFIX + "'"); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing configuration '%s'.", METADATA_SUFFIX), null, + ErrorType.USER, false, null); } } @@ -112,7 +122,13 @@ private DecryptInfo getDecryptInfo(FileSystem fs, Path path) throws IOException } // Create the DecryptInfo - return getDecryptInfo(metadata); + try { + return getDecryptInfo(metadata); + } catch (Exception e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.", + path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e); + } } static DecryptInfo getDecryptInfo(JSONObject metadata) throws IOException { diff --git a/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java b/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java index 37950d531c..21bedf5b5e 100644 --- a/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java +++ b/src/test/java/io/cdap/plugin/gcp/dataplex/sink/DataPlexOutputFormatProviderTest.java @@ -18,6 +18,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.etl.api.validation.FormatContext; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.gcp.dataplex.common.util.DataplexConstants; @@ -238,7 +239,7 @@ public void testGetRecordWriterWithDifferentDataset() throws IOException, Interr configuration.set("mapred.bq.output.gcs.fileformat", "AVRO"); configuration.set(DELEGATE_OUTPUTFORMAT_CLASSNAME, "class"); when(mockContext.getConfiguration()).thenReturn(configuration); - Assert.assertThrows(IOException.class, () -> { + Assert.assertThrows(ProgramFailureException.class, () -> { dataplexOutputFormat.getRecordWriter(mockContext); }); } diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java index 798cd4392d..17c4be0a93 100644 --- a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java +++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java @@ -25,8 +25,9 @@ public class GCSOutputformatProviderTest { @Test public void testRecordWriter() throws IOException, InterruptedException { RecordWriter mockWriter = Mockito.mock(RecordWriter.class); - GCSOutputFormatProvider.GCSRecordWriter 
-    GCSOutputFormatProvider.GCSRecordWriter recordWriterToTest = new GCSOutputFormatProvider.GCSRecordWriter(
-      mockWriter);
+    ForwardingRecordWriter recordWriterToTest =
+      new ForwardingRecordWriter(new GCSOutputFormatProvider.GCSRecordWriter(
+        mockWriter));
     NullWritable mockWritable = Mockito.mock(NullWritable.class);
     StructuredRecord mockRecord = Mockito.mock(StructuredRecord.class);
     for (int i = 0; i < 5; i++) {
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
index 91cc1d6074..1dac23a7ad 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
@@ -87,8 +87,10 @@ public TestDelegatingGCSOutputCommitter() throws IOException {
   private void writeOutput(TaskAttemptContext context, DelegatingGCSOutputCommitter committer)
     throws IOException, InterruptedException {
     NullWritable nullWritable = NullWritable.get();
-    DelegatingGCSRecordWriter delegatingGCSRecordWriter = new DelegatingGCSRecordWriter(context, key1,
-      committer);
+    ForwardingRecordWriter delegatingGCSRecordWriter = new ForwardingRecordWriter(
+      new DelegatingGCSRecordWriter(context, key1,
+        new ForwardingOutputCommitter(committer),
+        new DelegatingGCSOutputFormat()));
     try {
       delegatingGCSRecordWriter.write(nullWritable, record1);
       delegatingGCSRecordWriter.write(nullWritable, record2);