diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java new file mode 100644 index 000000000..453854fdc --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java @@ -0,0 +1,84 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.common; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpResponseException; +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; + +import java.util.List; + +/** + * A custom ErrorDetailsProvider for GCP plugins. + */ +public class GCPErrorDetailsProvider implements ErrorDetailsProvider { + + /** + * Get a ProgramFailureException with the given error + * information from generic exceptions like {@link java.io.IOException}. + * + * @param e The Throwable to get the error information from. + * @return A ProgramFailureException with the given error information, otherwise null. 
+ */ + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. + return null; + } + if (t instanceof HttpResponseException) { + return getProgramFailureException((HttpResponseException) t, errorContext); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link HttpResponseException}. + * + * @param e The HttpResponseException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(HttpResponseException e, + ErrorContext errorContext) { + Integer statusCode = e.getStatusCode(); + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode); + String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(), + pair.getCorrectiveAction()); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + + String errorMessage = e.getMessage(); + if (e instanceof GoogleJsonResponseException) { + GoogleJsonResponseException exception = (GoogleJsonResponseException) e; + errorMessage = exception.getDetails() != null ? 
exception.getDetails().getMessage() : + exception.getMessage(); + } + + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), + pair.getErrorType(), true, e); + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java index 0a64c52dd..4cedc83c8 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java @@ -21,6 +21,10 @@ import com.google.bigtable.repackaged.com.google.gson.Gson; import com.google.cloud.hadoop.util.AccessTokenProvider; import com.google.cloud.hadoop.util.CredentialFactory; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.gcp.common.GCPUtils; import org.apache.hadoop.conf.Configuration; @@ -50,13 +54,20 @@ public AccessToken getAccessToken() { } return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime()); } catch (IOException e) { - throw new RuntimeException(e); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + "Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e); } } @Override public void refresh() throws IOException { - getCredentials().refresh(); + try { + getCredentials().refresh(); + } catch (IOException e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + "Unable to refresh service account access token.", e.getMessage(), + ErrorType.UNKNOWN, true, e); + } } private GoogleCredentials getCredentials() throws IOException { diff --git 
a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java index 129b6a297..13fc33570 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java @@ -39,6 +39,7 @@ import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; @@ -51,6 +52,7 @@ import io.cdap.plugin.format.plugin.FileSinkProperties; import io.cdap.plugin.gcp.common.CmekUtils; import io.cdap.plugin.gcp.common.GCPConnectorConfig; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider; import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.gcs.Formats; import io.cdap.plugin.gcp.gcs.GCSPath; @@ -121,30 +123,50 @@ public void prepareRun(BatchSinkContext context) throws Exception { collector.addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`"); collector.getOrThrowException(); - return; } - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath); + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? 
+ null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), + isServiceAccountFilePath); + } catch (Exception e) { + String errorReason = "Unable to load service account credentials."; + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + + String bucketName = config.getBucket(collector); Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; + String correctiveAction = "Ensure you entered the correct bucket path and " + + "have permissions for it."; Bucket bucket; - String location; + String location = null; try { - bucket = storage.get(config.getBucket()); + bucket = storage.get(bucketName); + if (bucket != null) { + location = bucket.getLocation(); + } else { + location = config.getLocation(); + GCPUtils.createBucket(storage, bucketName, location, cmekKeyName); + } } catch (StorageException e) { - throw new RuntimeException( - String.format("Unable to access or create bucket %s. 
", config.getBucket()) - + "Ensure you entered the correct bucket path and have permissions for it.", e); - } - if (bucket != null) { - location = bucket.getLocation(); - } else { - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName); - location = config.getLocation(); + String errorReason = String.format(errorReasonFormat, e.getCode()); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } + this.outputPath = getOutputDir(context); // create asset for lineage asset = Asset.builder(config.getReferenceName()) .setFqn(GCSPath.getFQN(config.getPath())).setLocation(location).build(); + + // set error details provider + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName())); // super is called down here to avoid instantiating the lineage recorder with a null asset super.prepareRun(context); @@ -532,8 +554,20 @@ public void validateContentType(FailureCollector failureCollector) { } } - public String getBucket() { - return GCSPath.from(path).getBucket(); + /** + * Get the bucket name from the path. + * @param collector failure collector + * @return bucket name as {@link String} if found, otherwise null. 
+ */ + public String getBucket(FailureCollector collector) { + try { + return GCSPath.from(path).getBucket(); + } catch (IllegalArgumentException e) { + collector.addFailure(e.getMessage(), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + return null; } @Override @@ -718,8 +752,8 @@ public Builder setCustomContentType(@Nullable String customContentType) { return this; } - public GCSBatchSink.GCSBatchSinkConfig build() { - return new GCSBatchSink.GCSBatchSinkConfig( + public GCSBatchSinkConfig build() { + return new GCSBatchSinkConfig( referenceName, project, fileSystemProperties, diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java index 7b15839c1..202663e63 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java @@ -41,10 +41,12 @@ import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; import io.cdap.plugin.format.FileFormat; import io.cdap.plugin.gcp.common.CmekUtils; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider; import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.gcs.connector.GCSConnector; import org.apache.hadoop.io.NullWritable; @@ -129,33 +131,52 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati config.validate(collector, context.getArguments().asMap()); collector.getOrThrowException(); - Map baseProperties = GCPUtils.getFileSystemProperties(config.connection, - config.getPath(), new HashMap<>()); Map argumentCopy = new HashMap<>(context.getArguments().asMap()); CryptoKeyName cmekKeyName = 
CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector); collector.getOrThrowException(); + Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath(); if (isServiceAccountFilePath == null) { - context.getFailureCollector().addFailure("Service account type is undefined.", + collector.addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`"); - context.getFailureCollector().getOrThrowException(); - return; + collector.getOrThrowException(); } - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath); + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? + null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), + isServiceAccountFilePath); + } catch (Exception e) { + String errorReason = "Unable to load service account credentials."; + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + + String bucketName = config.getBucket(collector); Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; + String correctiveAction = "Ensure you entered the correct bucket path and " + + "have permissions for it."; try { - if (storage.get(config.getBucket()) == null) { - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName); + if (storage.get(bucketName) == null) { + GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName); } } catch (StorageException e) { - // Add more descriptive error message - throw new RuntimeException( - String.format("Unable to access or create bucket %s. 
", config.getBucket()) - + "Ensure you entered the correct bucket path and have permissions for it.", e); + String errorReason = String.format(errorReasonFormat, e.getCode()); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } + // set error details provider + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName())); + + Map baseProperties = GCPUtils.getFileSystemProperties(config.connection, + config.getPath(), new HashMap<>()); if (config.getAllowFlexibleSchema()) { //Configure MultiSink with support for flexible schemas. configureSchemalessMultiSink(context, baseProperties, argumentCopy); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java index 7bfec4dd8..9cd745e7d 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java @@ -28,11 +28,13 @@ import io.cdap.cdap.api.annotation.MetadataProperty; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; @@ -42,6 +44,7 @@ import io.cdap.plugin.format.plugin.AbstractFileSourceConfig; import io.cdap.plugin.format.plugin.FileSourceProperties; import io.cdap.plugin.gcp.common.GCPConnectorConfig; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider; import io.cdap.plugin.gcp.common.GCPUtils; import 
io.cdap.plugin.gcp.common.GCSEmptyInputFormat; import io.cdap.plugin.gcp.crypto.EncryptedFileSystem; @@ -85,29 +88,63 @@ protected String getEmptyInputFormatClassName() { @Override public void prepareRun(BatchSourceContext context) throws Exception { - // Get location of the source for lineage - String location; - String bucketName = GCSPath.from(config.getPath()).getBucket(); - Credentials credentials = config.connection.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), - config.connection.isServiceAccountFilePath()); + FailureCollector collector = context.getFailureCollector(); + + String path = config.getPath(); + String bucketName = null; + try { + bucketName = GCSPath.from(path).getBucket(); + } catch (IllegalArgumentException e) { + collector.addFailure(e.getMessage(), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + + Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath(); + if (isServiceAccountFilePath == null) { + collector.addFailure("Service account type is undefined.", + "Must be `filePath` or `JSON`"); + collector.getOrThrowException(); + } + + Credentials credentials = null; + try { + credentials = config.connection.getServiceAccount() == null ? + null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), + isServiceAccountFilePath); + } catch (Exception e) { + String errorReason = "Unable to load service account credentials."; + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + String location = null; try { + // Get location of the source for lineage location = storage.get(bucketName).getLocation(); } catch (StorageException e) { - throw new RuntimeException( - String.format("Unable to access bucket %s. 
", bucketName) - + "Ensure you entered the correct bucket path and have permissions for it.", e); + String errorReason = String.format("Error code: %s, Unable to access GCS bucket '%s'.", + e.getCode(), bucketName); + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), + "Ensure you entered the correct bucket path and have permissions for it.") + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } // create asset for lineage - String fqn = GCSPath.getFQN(config.getPath()); + String fqn = GCSPath.getFQN(path); String referenceName = Strings.isNullOrEmpty(config.getReferenceName()) ? ReferenceNames.normalizeFqn(fqn) : config.getReferenceName(); asset = Asset.builder(referenceName) .setFqn(fqn).setLocation(location).build(); + // set error details provider + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName())); + // super is called down here to avoid instantiating the lineage recorder with a null asset super.prepareRun(context); } @@ -142,7 +179,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) { @Override protected void recordLineage(LineageRecorder lineageRecorder, List outputFields) { - lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.", + lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.", config.isEncrypted() ? 
" and decrypt " : " "), outputFields); } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java index f6092f66f..a43c545fb 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/TinkDecryptor.java @@ -24,6 +24,10 @@ import com.google.crypto.tink.StreamingAead; import com.google.crypto.tink.config.TinkConfig; import com.google.crypto.tink.integration.gcpkms.GcpKmsClient; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.gcp.crypto.Decryptor; import io.cdap.plugin.gcp.crypto.FSInputSeekableByteChannel; import org.apache.hadoop.conf.Configurable; @@ -66,18 +70,18 @@ public TinkDecryptor() throws GeneralSecurityException { @Override public SeekableByteChannel open(FileSystem fs, Path path, int bufferSize) throws IOException { DecryptInfo decryptInfo = getDecryptInfo(fs, path); + Path metadataPath = new Path(path.getParent(), path.getName() + metadataSuffix); if (decryptInfo == null) { - throw new IllegalArgumentException("Missing encryption metadata for file '" + path - + "'. Expected metadata path is '" - + new Path(path.getParent(), path.getName() + metadataSuffix) + "'"); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing encryption metadata for file '%s'. 
" + + "Expected metadata path is '%s'.", path, metadataPath), null, + ErrorType.USER, false, null); } try { StreamingAead streamingAead = decryptInfo.getKeysetHandle().getPrimitive(StreamingAead.class); return streamingAead.newSeekableDecryptingChannel(new FSInputSeekableByteChannel(fs, path, bufferSize), decryptInfo.getAad()); - } catch (IOException e) { - throw e; } catch (Exception e) { throw new IOException(e); } @@ -88,7 +92,9 @@ public void setConf(Configuration configuration) { this.configuration = configuration; this.metadataSuffix = configuration.get(METADATA_SUFFIX); if (metadataSuffix == null) { - throw new IllegalArgumentException("Missing configuration '" + METADATA_SUFFIX + "'"); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Missing configuration '%s'.", METADATA_SUFFIX), null, + ErrorType.USER, false, null); } } @@ -112,7 +118,13 @@ private DecryptInfo getDecryptInfo(FileSystem fs, Path path) throws IOException } // Create the DecryptInfo - return getDecryptInfo(metadata); + try { + return getDecryptInfo(metadata); + } catch (Exception e) { + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + String.format("Unable to decrypt the file '%s' using encryption metadata file '%s'.", + path, metadataPath), e.getMessage(), ErrorType.UNKNOWN, false, e); + } } static DecryptInfo getDecryptInfo(JSONObject metadata) throws IOException { @@ -125,8 +137,6 @@ static DecryptInfo getDecryptInfo(JSONObject metadata) throws IOException { byte[] aad = Base64.getDecoder().decode(metadata.getString(AAD)); return new DecryptInfo(handle, aad); - } catch (IOException e) { - throw e; } catch (Exception e) { throw new IOException(e); }