diff --git a/pom.xml b/pom.xml index 3446ad8..076af7c 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ HTTP Plugins io.cdap http-plugins - 1.5.0-SNAPSHOT + 1.5.4-SNAPSHOT diff --git a/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java b/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java index c78b824..793412a 100644 --- a/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java +++ b/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java @@ -28,6 +28,7 @@ import io.cdap.plugin.http.common.http.OAuthUtil; import java.io.File; +import java.util.Objects; import java.util.Optional; import javax.annotation.Nullable; @@ -349,10 +350,16 @@ public void validate(FailureCollector failureCollector) { // Validate OAuth2 properties if (!containsMacro(PROPERTY_OAUTH2_ENABLED) && this.getOauth2Enabled()) { String reasonOauth2 = "OAuth2 is enabled"; - assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); - assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); - assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); - assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); + assertIsSetWithFailureCollector(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2, failureCollector); + assertIsSetWithFailureCollector(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2, failureCollector); + assertIsSetWithFailureCollector(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2, failureCollector); + assertIsSetWithFailureCollector(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2, failureCollector); + } + + if (!containsMacro(PROPERTY_WAIT_TIME_BETWEEN_PAGES) && Objects.nonNull(waitTimeBetweenPages) + && waitTimeBetweenPages < 0) { + failureCollector.addFailure("Wait Time Between Pages cannot be a negative number.", + null).withConfigProperty(PROPERTY_WAIT_TIME_BETWEEN_PAGES); } // Validate Authentication properties @@ -361,16 +368,18 @@ public void validate(FailureCollector failureCollector) { case OAUTH2: String reasonOauth2 = "OAuth2 is enabled"; if (!containsMacro(PROPERTY_TOKEN_URL)) { - assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); + assertIsSetWithFailureCollector(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2, failureCollector); } if (!containsMacro(PROPERTY_CLIENT_ID)) { - assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); + assertIsSetWithFailureCollector(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2, failureCollector); } if (!containsMacro((PROPERTY_CLIENT_SECRET))) { - assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); + assertIsSetWithFailureCollector(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2, + failureCollector); } if (!containsMacro(PROPERTY_REFRESH_TOKEN)) { - assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); + assertIsSetWithFailureCollector(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2, + failureCollector); } break; case SERVICE_ACCOUNT: @@ -390,10 +399,12 @@ public void validate(FailureCollector failureCollector) { case BASIC_AUTH: String reasonBasicAuth = "Basic Authentication is enabled"; if (!containsMacro(PROPERTY_USERNAME)) { - assertIsSet(getUsername(), PROPERTY_USERNAME, reasonBasicAuth); + assertIsSetWithFailureCollector(getUsername(), PROPERTY_USERNAME, reasonBasicAuth, + failureCollector); } if (!containsMacro(PROPERTY_PASSWORD)) { - assertIsSet(getPassword(), PROPERTY_PASSWORD, reasonBasicAuth); + assertIsSetWithFailureCollector(getPassword(), PROPERTY_PASSWORD, reasonBasicAuth, + failureCollector); } break; } @@ -405,4 +416,12 @@ public static void assertIsSet(Object propertyValue, String propertyName, String String.format("Property '%s' must be set, since %s", propertyName, reason), propertyName); } } + + public static void assertIsSetWithFailureCollector(Object propertyValue, String propertyName, String reason, + FailureCollector failureCollector) { + if (propertyValue == null) { + failureCollector.addFailure(String.format("Property '%s' must be set, since %s", propertyName, reason), + null).withConfigProperty(propertyName); + } + } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java index e657525..5cd9616 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java @@ -197,9 +197,8 @@ private boolean executeHTTPServiceAndCheckStatusCode() { } catch (IOException e) { LOG.warn("Error making {} request to URL {}.", config.getMethod(), config.getUrl()); String errorMessage = "Unable to make request. "; - throw ErrorUtils.getProgramFailureException( - new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.UNKNOWN, true, e); } } @@ -229,12 +228,12 @@ private CloseableHttpResponse executeHttpRequest(CloseableHttpClient httpClient, } catch (IOException e) { String errorMessage = String.format("Unable to execute HTTP request to %s.", url); throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new IOException(errorMessage)); + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } catch (Exception e) { String errorMessage = String.format("Unexpected error occurred while executing HTTP request to URL: %s", url); - throw ErrorUtils.getProgramFailureException( - new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, - errorMessage, ErrorType.UNKNOWN, true, e); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.UNKNOWN, true, e); } } @@ -372,7 +371,8 @@ private void flushMessageBuffer() { String errorMessage = "Error while executing http request for remaining input messages" + " after the batch execution."; throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new RuntimeException(errorMessage)); + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.UNKNOWN, true, + new RuntimeException(e)); } messageBuffer.clear(); diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java index f3391fb..dca5326 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java @@ -58,6 +58,7 @@ public void validate(FailureCollector failureCollector) { failureCollector.addFailure("Sample size must be greater than 0.", null) .withConfigProperty(PROPERTY_SAMPLE_SIZE); } + validateCredentials(failureCollector); } @@ -94,7 +95,7 @@ private void validateOAuth2Credentials(FailureCollector collector) { } catch (IOException e) { String errorMessage = "Unable to validate OAuth and process the request."; throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.UNKNOWN, true, e); } } } @@ -126,7 +127,7 @@ public void validateBasicAuthResponse(FailureCollector collector, HttpClient htt } catch (IOException e) { String errorMessage = "Unable to process the response and validate credentials"; throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.UNKNOWN, true, e); } }