Skip to content

Commit

Permalink
Merge pull request #527 from cloudsufi/fem/postgresql
Browse files Browse the repository at this point in the history
[PLUGIN-1830] Add PostgresErrorDetailsProvider
  • Loading branch information
psainics authored Dec 20, 2024
2 parents 0ef198a + 9b9d979 commit 5af79a6
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.plugin.util.DBUtils;

import java.sql.SQLException;
import java.util.List;
Expand Down Expand Up @@ -67,8 +66,8 @@ private ProgramFailureException getProgramFailureException(SQLException e, Error
String sqlState = e.getSQLState();
int errorCode = e.getErrorCode();
String errorMessageWithDetails = String.format(
"Error occurred in the phase: '%s'. Error message: '%s'. Error code: '%s'. sqlState: '%s'",
errorContext.getPhase(), errorMessage, errorCode, sqlState);
"Error occurred in the phase: '%s' with sqlState: '%s', errorCode: '%s', errorMessage: %s",
errorContext.getPhase(), sqlState, errorCode, errorMessage);
String externalDocumentationLink = getExternalDocumentationLink();
if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
if (!errorMessageWithDetails.endsWith(".")) {
Expand All @@ -77,9 +76,10 @@ private ProgramFailureException getProgramFailureException(SQLException e, Error
errorMessageWithDetails = String.format("%s For more details, see %s", errorMessageWithDetails,
externalDocumentationLink);
}
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCode(errorCode), false, ErrorCodeType.SQLSTATE,
sqlState, externalDocumentationLink, e);
return ErrorUtils.getProgramFailureException(Strings.isNullOrEmpty(sqlState) ?
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN) : getErrorCategoryFromSqlState(sqlState),
errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCode(errorCode, sqlState), true,
ErrorCodeType.SQLSTATE, sqlState, externalDocumentationLink, e);
}

/**
Expand Down Expand Up @@ -121,7 +121,11 @@ protected String getExternalDocumentationLink() {
return null;
}

protected ErrorType getErrorTypeFromErrorCode(int errorCode) {
protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) {
return ErrorType.UNKNOWN;
}

protected ErrorCategory getErrorCategoryFromSqlState(String sqlState) {
return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public final class DBUtils {
public static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender();
public static final String MYSQL_SUPPORTED_DOC_URL = "https://dev.mysql.com/doc/mysql-errors/9.0/en/";
public static final String CLOUDSQLMYSQL_SUPPORTED_DOC_URL = "https://cloud.google.com/sql/docs/mysql/error-messages";
public static final String POSTGRES_SUPPORTED_DOC_URL =
"https://www.postgresql.org/docs/current/errcodes-appendix.html";

// Java by default uses October 15, 1582 as a Gregorian cut over date.
// Any timestamp created with time less than this cut over date is treated as Julian date.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected String getExternalDocumentationLink() {
}

@Override
protected ErrorType getErrorTypeFromErrorCode(int errorCode) {
protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) {
// https://dev.mysql.com/doc/refman/9.0/en/error-message-elements.html#error-code-ranges
if (errorCode >= 1000 && errorCode <= 5999) {
return ErrorType.USER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ Feature: PostgreSQL - Verify data transfer from PostgreSQL source to BigQuery si
And Save and Deploy Pipeline
And Run the Pipeline in Runtime
And Wait till pipeline is in running state
And Open and capture logs
And Verify the pipeline status is "Failed"
And Close the pipeline logs
Then Open Pipeline logs and verify Log entries having below listed Level and Message:
| Level | Message |
| ERROR | errorLogsMessageInvalidBoundingQuery |
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

package io.cdap.plugin.postgres;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;

/**
* Postgres constants.
*/
public final class PostgresConstants {

private PostgresConstants() {
throw new AssertionError("Should not instantiate static utility class.");
String errorMessage = "Should not instantiate static utility class.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage, errorMessage, ErrorType.SYSTEM, false, new AssertionError(errorMessage));
}

public static final String PLUGIN_NAME = "Postgres";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.postgres;

import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.plugin.db.DBErrorDetailsProvider;
import io.cdap.plugin.util.DBUtils;

import java.util.HashMap;
import java.util.Map;

/**
* A custom ErrorDetailsProvider for Postgres plugins.
*/
public class PostgresErrorDetailsProvider extends DBErrorDetailsProvider {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
private static final Map<String, ErrorType> ERROR_CODE_TO_ERROR_TYPE;
private static final Map<String, ErrorCategory> ERROR_CODE_TO_ERROR_CATEGORY;
static {
ERROR_CODE_TO_ERROR_TYPE = new HashMap<>();
ERROR_CODE_TO_ERROR_TYPE.put("01", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("02", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("08", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("0A", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("22", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("23", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("28", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("40", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("42", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("53", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("54", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("55", ErrorType.USER);
ERROR_CODE_TO_ERROR_TYPE.put("57", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("58", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("P0", ErrorType.SYSTEM);
ERROR_CODE_TO_ERROR_TYPE.put("XX", ErrorType.SYSTEM);

ErrorCategory.ErrorCategoryEnum plugin = ErrorCategory.ErrorCategoryEnum.PLUGIN;
ERROR_CODE_TO_ERROR_CATEGORY = new HashMap<>();
ERROR_CODE_TO_ERROR_CATEGORY.put("01", new ErrorCategory(plugin, "Warning"));
ERROR_CODE_TO_ERROR_CATEGORY.put("02", new ErrorCategory(plugin, "No Data"));
ERROR_CODE_TO_ERROR_CATEGORY.put("08", new ErrorCategory(plugin, "Postgres Server Connection Exception"));
ERROR_CODE_TO_ERROR_CATEGORY.put("0A", new ErrorCategory(plugin, "Postgres Server Feature Not Supported"));
ERROR_CODE_TO_ERROR_CATEGORY.put("22", new ErrorCategory(plugin, "Postgres Server Data Exception"));
ERROR_CODE_TO_ERROR_CATEGORY.put("23", new ErrorCategory(plugin, "Postgres Integrity Constraint Violation"));
ERROR_CODE_TO_ERROR_CATEGORY.put("28", new ErrorCategory(plugin, "Postgres Invalid Authorization Specification"));
ERROR_CODE_TO_ERROR_CATEGORY.put("40", new ErrorCategory(plugin, "Transaction Rollback"));
ERROR_CODE_TO_ERROR_CATEGORY.put("42", new ErrorCategory(plugin, "Syntax Error or Access Rule Violation"));
ERROR_CODE_TO_ERROR_CATEGORY.put("53", new ErrorCategory(plugin, "Postgres Server Insufficient Resources"));
ERROR_CODE_TO_ERROR_CATEGORY.put("54", new ErrorCategory(plugin, "Postgres Program Limit Exceeded"));
ERROR_CODE_TO_ERROR_CATEGORY.put("55", new ErrorCategory(plugin, "Object Not in Prerequisite State"));
ERROR_CODE_TO_ERROR_CATEGORY.put("57", new ErrorCategory(plugin, "Operator Intervention"));
ERROR_CODE_TO_ERROR_CATEGORY.put("58", new ErrorCategory(plugin, "Postgres Server System Error"));
ERROR_CODE_TO_ERROR_CATEGORY.put("P0", new ErrorCategory(plugin, "PL/pgSQL Error"));
ERROR_CODE_TO_ERROR_CATEGORY.put("XX", new ErrorCategory(plugin, "Postgres Server Internal Error"));
}

@Override
protected String getExternalDocumentationLink() {
return DBUtils.POSTGRES_SUPPORTED_DOC_URL;
}

@Override
protected ErrorType getErrorTypeFromErrorCode(int errorCode, String sqlState) {
if (!Strings.isNullOrEmpty(sqlState) && sqlState.length() >= 2 &&
ERROR_CODE_TO_ERROR_TYPE.containsKey(sqlState.substring(0, 2))) {
return ERROR_CODE_TO_ERROR_TYPE.get(sqlState.substring(0, 2));
}
return ErrorType.UNKNOWN;
}

@Override
protected ErrorCategory getErrorCategoryFromSqlState(String sqlState) {
if (!Strings.isNullOrEmpty(sqlState) && sqlState.length() >= 2 &&
ERROR_CODE_TO_ERROR_CATEGORY.containsKey(sqlState.substring(0, 2))) {
return ERROR_CODE_TO_ERROR_CATEGORY.get(sqlState.substring(0, 2));
}
return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
return new LineageRecorder(context, asset);
}

@Override
protected String getErrorDetailsProviderClassName() {
return PostgresErrorDetailsProvider.class.getName();
}

/**
* PostgreSQL action configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,21 @@ protected SchemaReader getSchemaReader() {
return new PostgresSchemaReader();
}

@Override
protected String getErrorDetailsProviderClassName() {
return PostgresErrorDetailsProvider.class.getName();
}

@Override
protected Class<? extends DBWritable> getDBRecordType() {
return PostgresDBRecord.class;
}

@Override
protected String getExternalDocumentationLink() {
return DBUtils.POSTGRES_SUPPORTED_DOC_URL;
}

@Override
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
String fqn = DBUtils.constructFQN("postgres",
Expand Down

0 comments on commit 5af79a6

Please sign in to comment.