From d273336c04369debbb49bc64752b9dee6e2de92f Mon Sep 17 00:00:00 2001 From: Amit Kumar Singh Date: Fri, 6 Dec 2024 08:18:01 +0000 Subject: [PATCH] Added HTTP error details provider, refactored HTTP-sink and source package to handle error provider, fix sonar issues and added Validation error for linear retry duration --- .../http/common/HttpErrorDetailsProvider.java | 120 ++++++++++++ .../http/sink/batch/HTTPOutputFormat.java | 12 +- .../http/sink/batch/HTTPRecordWriter.java | 173 ++++++++++-------- .../cdap/plugin/http/sink/batch/HTTPSink.java | 29 ++- .../http/sink/batch/HTTPSinkConfig.java | 82 +++++---- .../plugin/http/sink/batch/MessageBuffer.java | 42 +++-- .../http/sink/batch/PlaceholderBean.java | 3 +- .../http/source/batch/HttpBatchSource.java | 21 ++- .../source/batch/HttpBatchSourceConfig.java | 65 ++++--- .../http/source/batch/HttpInputFormat.java | 3 +- .../source/common/BaseHttpSourceConfig.java | 90 ++++----- .../common/DelimitedSchemaDetector.java | 7 +- .../http/source/common/RawStringPerLine.java | 10 +- 13 files changed, 420 insertions(+), 237 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java diff --git a/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java new file mode 100644 index 00000000..f16dccf0 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java @@ -0,0 +1,120 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.common; + +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; + +import java.util.List; + +import java.util.NoSuchElementException; + +/** + * Error details provided for the HTTP + **/ +public class HttpErrorDetailsProvider implements ErrorDetailsProvider { + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. + return null; + } + if (t instanceof IllegalArgumentException) { + return getProgramFailureException((IllegalArgumentException) t, errorContext); + } + if (t instanceof IllegalStateException) { + return getProgramFailureException((IllegalStateException) t, errorContext); + } + if (t instanceof InvalidConfigPropertyException) { + return getProgramFailureException((InvalidConfigPropertyException) t, errorContext); + } + if (t instanceof NoSuchElementException) { + return getProgramFailureException((NoSuchElementException) t, errorContext); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link IllegalArgumentException}. + * + * @param e The IllegalArgumentException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link IllegalStateException}. + * + * @param e The IllegalStateException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link InvalidConfigPropertyException}. + * + * @param e The InvalidConfigPropertyException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(InvalidConfigPropertyException e, + ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link NoSuchElementException}. + * + * @param e The NoSuchElementException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(NoSuchElementException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } +} diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java index 867d46be..45cd88d4 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java @@ -37,12 +37,16 @@ public class HTTPOutputFormat extends OutputFormat getRecordWriter(TaskAttemptContext context) - throws IOException { + public RecordWriter getRecordWriter(TaskAttemptContext context) { Configuration hConf = context.getConfiguration(); HTTPSinkConfig config = GSON.fromJson(hConf.get(CONFIG_KEY), HTTPSinkConfig.class); - Schema inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); - return new HTTPRecordWriter(config, inputSchema); + Schema inputSchema; + try { + inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); + return new HTTPRecordWriter(config, inputSchema); + } catch (IOException e) { + throw new IllegalStateException("Unable to parse the input schema. Reason: " + e.getMessage(), e); + } } @Override diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java index 2180737b..e6575255 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java @@ -17,10 +17,12 @@ package io.cdap.plugin.http.sink.batch; import com.google.auth.oauth2.AccessToken; -import com.google.common.base.Charsets; import com.google.common.base.Strings; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.http.common.RetryPolicy; import io.cdap.plugin.http.common.error.ErrorHandling; import io.cdap.plugin.http.common.error.HttpErrorHandler; @@ -55,10 +57,10 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; -import java.net.ProtocolException; import java.net.URI; import java.net.URL; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; @@ -72,7 +74,6 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -90,9 +91,9 @@ public class HTTPRecordWriter extends RecordWriter placeHolderList; + private final List placeHolderList; private final Map headers; private AccessToken accessToken; @@ -107,31 +108,30 @@ public class HTTPRecordWriter extends RecordWriter duration.multiply(2), - Duration.FIVE_HUNDRED_MILLISECONDS); + Duration.FIVE_HUNDRED_MILLISECONDS); } url = config.getUrl(); placeHolderList = getPlaceholderListFromURL(); } @Override - public void write(StructuredRecord input, StructuredRecord unused) throws IOException { + public void write(StructuredRecord input, StructuredRecord unused) { configURL = url; if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT) || - config.getMethod().equals(REQUEST_METHOD_PATCH)) { + config.getMethod().equals(REQUEST_METHOD_PATCH)) { messageBuffer.add(input); } if (config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH) || - config.getMethod().equals(REQUEST_METHOD_DELETE) - && !placeHolderList.isEmpty()) { + config.getMethod().equals(REQUEST_METHOD_DELETE) && !placeHolderList.isEmpty()) { configURL = updateURLWithPlaceholderValue(input); } @@ -151,7 +151,7 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException, Int private void disableSSLValidation() { TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { public java.security.cert.X509Certificate[] getAcceptedIssuers() { - return null; + return new X509Certificate[0]; } public void checkClientTrusted(X509Certificate[] certs, String authType) { @@ -169,76 +169,92 @@ public void checkServerTrusted(X509Certificate[] certs, String authType) { throw new IllegalStateException("Error while installing the trust manager: " + e.getMessage(), e); } HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory()); - HostnameVerifier allHostsValid = new HostnameVerifier() { - public boolean verify(String hostname, SSLSession session) { - return true; - } - }; + HostnameVerifier allHostsValid = (hostname, session) -> true; HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); } - private boolean executeHTTPServiceAndCheckStatusCode() throws IOException { + private boolean executeHTTPServiceAndCheckStatusCode() { LOG.debug("HTTP Request Attempt No. : {}", ++retryCount); - CloseableHttpClient httpClient = createHttpClient(configURL); - CloseableHttpResponse response = null; + // Try-with-resources ensures proper resource management + try (CloseableHttpClient httpClient = createHttpClient(configURL); + CloseableHttpResponse response = executeHttpRequest(httpClient, new URL(configURL))) { + httpStatusCode = response.getStatusLine().getStatusCode(); + LOG.debug("Response HTTP Status code: {}", httpStatusCode); + httpResponseBody = new HttpResponse(response).getBody(); + + RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); + boolean shouldRetry = errorHandlingStrategy.shouldRetry(); + + if (!shouldRetry) { + messageBuffer.clear(); + retryCount = 0; + } + return !shouldRetry; + + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid URL: " + configURL, e); + } catch (IOException e) { + LOG.warn("Error making {} request to URL {}.", config.getMethod(), config.getUrl()); + String errorMessage = "Unable to make request. "; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); + } + } + + private CloseableHttpResponse executeHttpRequest(CloseableHttpClient httpClient, URL url) { try { - URL url = new URL(configURL); - HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(String.valueOf(url)), - config.getMethod()); - - if (url.getProtocol().equalsIgnoreCase("https")) { - // Disable SSLv3 - System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); - if (config.getDisableSSLValidation()) { - disableSSLValidation(); - } + HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(url.toString()), config.getMethod()); + + if ("https".equalsIgnoreCase(url.getProtocol())) { + configureHttpsSettings(); } if (!messageBuffer.isEmpty()) { String requestBodyString = messageBuffer.getMessage(); if (requestBodyString != null) { - StringEntity requestBody = new StringEntity(requestBodyString, Charsets.UTF_8.toString()); + StringEntity requestBody = new StringEntity(requestBodyString, StandardCharsets.UTF_8.name()); request.setEntity(requestBody); } } request.setHeaders(getRequestHeaders()); - response = httpClient.execute(request); - httpStatusCode = response.getStatusLine().getStatusCode(); - LOG.debug("Response HTTP Status code: {}", httpStatusCode); - httpResponseBody = new HttpResponse(response).getBody(); + // Execute the request and return the response + return httpClient.execute(request); - } catch (MalformedURLException | ProtocolException e) { - throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException("Error encoding the request Reason: " + e.getMessage(), e); } catch (IOException e) { - LOG.warn("Error making {} request to url {}.", config.getMethod(), config.getUrl()); - } finally { - if (response != null) { - response.close(); - } - } - RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); - boolean shouldRetry = errorHandlingStrategy.shouldRetry(); - if (!shouldRetry) { - messageBuffer.clear(); - retryCount = 0; + String errorMessage = String.format("Unable to execute HTTP request to %s.", url); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new IOException(errorMessage)); + } catch (Exception e) { + String errorMessage = String.format("Unexpected error occurred while executing HTTP request to URL: %s", url); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + errorMessage, ErrorType.UNKNOWN, true, e); } - return !shouldRetry; } + private void configureHttpsSettings() { + System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); + if (Boolean.TRUE.equals(config.getDisableSSLValidation())) { + disableSSLValidation(); + } + } - public CloseableHttpClient createHttpClient(String pageUriStr) throws IOException { + public CloseableHttpClient createHttpClient(String pageUriStr) { HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); // set timeouts - Long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); - Long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); + long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); + long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); RequestConfig.Builder requestBuilder = RequestConfig.custom(); - requestBuilder.setSocketTimeout(readTimeoutMillis.intValue()); - requestBuilder.setConnectTimeout(connectTimeoutMillis.intValue()); - requestBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue()); + requestBuilder.setSocketTimeout((int) readTimeoutMillis); + requestBuilder.setConnectTimeout((int) connectTimeoutMillis); + requestBuilder.setConnectionRequestTimeout((int) connectTimeoutMillis); httpClientBuilder.setDefaultRequestConfig(requestBuilder.build()); // basic auth @@ -247,7 +263,7 @@ public CloseableHttpClient createHttpClient(String pageUriStr) throws IOExceptio URI uri = URI.create(pageUriStr); AuthScope authScope = new AuthScope(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme())); credentialsProvider.setCredentials(authScope, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); } // proxy and proxy auth @@ -255,8 +271,8 @@ public CloseableHttpClient createHttpClient(String pageUriStr) throws IOExceptio HttpHost proxyHost = HttpHost.create(config.getProxyUrl()); if (!Strings.isNullOrEmpty(config.getProxyUsername()) && !Strings.isNullOrEmpty(config.getProxyPassword())) { credentialsProvider.setCredentials(new AuthScope(proxyHost), - new UsernamePasswordCredentials( - config.getProxyUsername(), config.getProxyPassword())); + new UsernamePasswordCredentials( + config.getProxyUsername(), config.getProxyPassword())); } httpClientBuilder.setProxy(proxyHost); } @@ -274,23 +290,20 @@ private Header[] getRequestHeaders() throws IOException { if (accessToken != null) { Header authorizationHeader = getAuthorizationHeader(accessToken); - if (authorizationHeader != null) { - clientHeaders.add(authorizationHeader); - } + clientHeaders.add(authorizationHeader); } headers.put("Request-Method", config.getMethod().toUpperCase()); headers.put("Instance-Follow-Redirects", String.valueOf(config.getFollowRedirects())); headers.put("charset", config.getCharset()); - if (config.getMethod().equals(REQUEST_METHOD_POST) - || config.getMethod().equals(REQUEST_METHOD_PATCH) - || config.getMethod().equals(REQUEST_METHOD_PUT)) { - if (!headers.containsKey("Content-Type")) { - headers.put("Content-Type", contentType); - } + if ((config.getMethod().equals(REQUEST_METHOD_POST) + || config.getMethod().equals(REQUEST_METHOD_PATCH) + || config.getMethod().equals(REQUEST_METHOD_PUT)) && !headers.containsKey("Content-Type")) { + headers.put("Content-Type", contentType); } + // set default headers if (headers != null) { for (Map.Entry headerEntry : this.headers.entrySet()) { @@ -311,7 +324,7 @@ private Header getAuthorizationHeader(AccessToken accessToken) { private List getPlaceholderListFromURL() { List placeholderList = new ArrayList<>(); if (!(config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH) || - config.getMethod().equals(REQUEST_METHOD_DELETE))) { + config.getMethod().equals(REQUEST_METHOD_DELETE))) { return placeholderList; } Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); @@ -350,30 +363,32 @@ private void flushMessageBuffer() { contentType = messageBuffer.getContentType(); try { Awaitility - .await().with() - .pollInterval(pollInterval) - .pollDelay(config.getWaitTimeBetweenPages(), TimeUnit.MILLISECONDS) - .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) - .until(this::executeHTTPServiceAndCheckStatusCode); + .await().with() + .pollInterval(pollInterval) + .pollDelay(config.getWaitTimeBetweenPages(), TimeUnit.MILLISECONDS) + .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) + .until(this::executeHTTPServiceAndCheckStatusCode); } catch (Exception e) { - throw new RuntimeException("Error while executing http request for remaining input messages " + - "after the batch execution. " + e); + String errorMessage = "Error while executing http request for remaining input messages" + + " after the batch execution."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new RuntimeException(errorMessage)); } messageBuffer.clear(); ErrorHandling postRetryStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode) - .getAfterRetryStrategy(); + .getAfterRetryStrategy(); switch (postRetryStrategy) { case SUCCESS: break; case STOP: throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", - config.getUrl(), httpStatusCode, httpResponseBody)); + config.getUrl(), httpStatusCode, httpResponseBody)); case SKIP: case SEND: LOG.warn(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", - config.getUrl(), httpStatusCode, httpResponseBody)); + config.getUrl(), httpStatusCode, httpResponseBody)); break; default: throw new IllegalArgumentException(String.format("Unexpected http error handling: '%s'", postRetryStrategy)); diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java index f33036f5..89e47cb2 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java @@ -1,3 +1,4 @@ + /* * Copyright © 2017 Cask Data, Inc. * @@ -30,12 +31,15 @@ import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -45,7 +49,7 @@ @Name("HTTP") @Description("Sink plugin to send the messages from the pipeline to an external http endpoint.") public class HTTPSink extends BatchSink { - private HTTPSinkConfig config; + private final HTTPSinkConfig config; public HTTPSink(HTTPSinkConfig config) { this.config = config; @@ -70,16 +74,24 @@ public void prepareRun(BatchSinkContext context) { Schema inputSchema = context.getInputSchema(); Asset asset = Asset.builder(config.getReferenceNameOrNormalizedFQN()) - .setFqn(config.getUrl()).build(); + .setFqn(config.getUrl()).build(); LineageRecorder lineageRecorder = new LineageRecorder(context, asset); lineageRecorder.createExternalDataset(context.getInputSchema()); - List fields = inputSchema == null ? - Collections.emptyList() : - inputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()); + List fields; + if (inputSchema == null) { + fields = Collections.emptyList(); + } else { + fields = Objects.requireNonNull(Objects.requireNonNull(inputSchema).getFields()).stream() + .map(Schema.Field::getName).collect(Collectors.toList()); + } lineageRecorder.recordWrite("Write", String.format("Wrote to HTTP '%s'", config.getUrl()), fields); + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(HttpErrorDetailsProvider.class.getName())); + context.addOutput(Output.of(config.getReferenceNameOrNormalizedFQN(), - new HTTPSink.HTTPOutputFormatProvider(config, inputSchema))); + new HTTPSink.HTTPOutputFormatProvider(config, inputSchema))); + } /** @@ -104,9 +116,8 @@ public String getOutputFormatClassName() { public Map getOutputFormatConfiguration() { Schema defaultValidSchema = Schema.recordOf("schema", Schema.Field.of("body", Schema.of(Schema.Type.STRING))); return ImmutableMap.of("http.sink.config", GSON.toJson(config), - "http.sink.input.schema", - inputSchema == null ? defaultValidSchema.toString() : inputSchema.toString()); + "http.sink.input.schema", + inputSchema == null ? defaultValidSchema.toString() : inputSchema.toString()); } } - } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java index cb3cc9ff..d9847c6b 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java @@ -1,3 +1,4 @@ + /* * Copyright © 2019 Cask Data, Inc. * @@ -80,12 +81,12 @@ public class HTTPSinkConfig extends BaseHttpConfig { private static final String REGEX_HASHED_VAR = "#(\\w+)"; private static final String PLACEHOLDER = "#"; private static final Set METHODS = ImmutableSet.of(HttpMethod.GET, HttpMethod.POST, - HttpMethod.PUT, HttpMethod.DELETE, "PATCH"); + HttpMethod.PUT, HttpMethod.DELETE, "PATCH"); @Name(URL) @Description("The URL to post data to. Additionally, a placeholder like #columnName can be added to the URL that " + - "can be substituted with column value at the runtime. E.g. https://customer-url/user/#user_id. Here user_id " + - "column should exist in input schema. (Macro Enabled)") + "can be substituted with column value at the runtime. E.g. https://customer-url/user/#user_id. " + + "Here user_id column should exist in input schema. (Macro Enabled)") @Macro private final String url; @@ -108,7 +109,7 @@ public class HTTPSinkConfig extends BaseHttpConfig { @Name(JSON_BATCH_KEY) @Nullable @Description("Optional key to be used for wrapping json array as object. " + - "Leave empty for no wrapping of the array (Macro Enabled)") + "Leave empty for no wrapping of the array (Macro Enabled)") @Macro private final String jsonBatchKey; @@ -126,10 +127,10 @@ public class HTTPSinkConfig extends BaseHttpConfig { @Name(BODY) @Nullable @Description("Optional custom message. This is required if the message format is set to 'Custom'." + - "User can leverage incoming message fields in the post payload. For example-" + - "User has defined payload as \\{ \"messageType\" : \"update\", \"name\" : \"#firstName\" \\}" + - "where #firstName will be substituted for the value that is in firstName in the incoming message. " + - "(Macro enabled)") + "User can leverage incoming message fields in the post payload. For example-" + + "User has defined payload as \\{ \"messageType\" : \"update\", \"name\" : \"#firstName\" \\}" + + "where #firstName will be substituted for the value that is in firstName in the incoming message. " + + "(Macro enabled)") @Macro private final String body; @@ -151,16 +152,16 @@ public class HTTPSinkConfig extends BaseHttpConfig { @Name(DISABLE_SSL_VALIDATION) @Description("If user enables SSL validation, they will be expected to add the certificate to the trustStore" + - " on each machine. Defaults to true. (Macro enabled)") + " on each machine. Defaults to true. (Macro enabled)") @Macro private final Boolean disableSSLValidation; @Nullable @Name(PROPERTY_HTTP_ERROR_HANDLING) @Description("Defines the error handling strategy to use for certain HTTP response codes." + - "The left column contains a regular expression for HTTP status code. The right column contains an action which" + - "is done in case of match. If HTTP status code matches multiple regular expressions, " + - "the first specified in mapping is matched.") + "The left column contains a regular expression for HTTP status code. " + + "The right column contains an action which is done in case of match. " + + "If HTTP status code matches multiple regular expressions, the first specified in mapping is matched.") protected String httpErrorsHandling; @Nullable @@ -178,7 +179,7 @@ public class HTTPSinkConfig extends BaseHttpConfig { @Description("Interval in seconds between retries. Is only used if retry policy is \"linear\".") @Macro protected Long linearRetryInterval; - + @Nullable @Name(PROPERTY_MAX_RETRY_DURATION) @Description("Maximum time in seconds retries can take. Default value is 600 seconds (10 minute).") @@ -187,14 +188,14 @@ public class HTTPSinkConfig extends BaseHttpConfig { @Name(CONNECTION_TIMEOUT) @Description("Sets the connection timeout in milliseconds. Set to 0 for infinite. Default is 60000 (1 minute). " + - "(Macro enabled)") + "(Macro enabled)") @Nullable @Macro private final Integer connectTimeout; @Name(READ_TIMEOUT) @Description("The time in milliseconds to wait for a read. Set to 0 for infinite. Defaults to 60000 (1 minute). " + - "(Macro enabled)") + "(Macro enabled)") @Nullable @Macro private final Integer readTimeout; @@ -204,7 +205,7 @@ public HTTPSinkConfig(String referenceName, String url, String method, Integer b @Nullable String requestHeaders, String charset, boolean followRedirects, boolean disableSSLValidation, @Nullable String httpErrorsHandling, String errorHandling, String retryPolicy, @Nullable Long linearRetryInterval, - Long maxRetryDuration, @Nullable int readTimeout, @Nullable int connectTimeout, + Long maxRetryDuration, int readTimeout, int connectTimeout, String oauth2Enabled, String authType, @Nullable String jsonBatchKey, Boolean writeJsonAsArray) { super(referenceName); @@ -345,10 +346,10 @@ public RetryPolicy getRetryPolicy() { private static T getEnumValueByString(Class enumClass, String stringValue, String propertyName) { return Stream.of(enumClass.getEnumConstants()) - .filter(keyType -> keyType.getValue().equalsIgnoreCase(stringValue)) - .findAny() - .orElseThrow(() -> new InvalidConfigPropertyException( - String.format("Unsupported value for '%s': '%s'", propertyName, stringValue), propertyName)); + .filter(keyType -> keyType.getValue().equalsIgnoreCase(stringValue)) + .findAny() + .orElseThrow(() -> new InvalidConfigPropertyException( + String.format("Unsupported value for '%s': '%s'", propertyName, stringValue), propertyName)); } @Nullable @@ -393,13 +394,14 @@ public List getHttpErrorHandlingEntries() { String regex = entry.getKey(); try { results.add(new HttpErrorHandlerEntity(Pattern.compile(regex), - getEnumValueByString(RetryableErrorHandling.class, - entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); + getEnumValueByString(RetryableErrorHandling.class, + entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); } catch (PatternSyntaxException e) { // We embed causing exception message into this one. Since this message is shown on UI when validation fails. throw new InvalidConfigPropertyException( - String.format( - "Error handling regex '%s' is not valid. %s", regex, e.getMessage()), PROPERTY_HTTP_ERROR_HANDLING); + String.format( + "Error handling regex '%s' is not valid. %s", regex, e.getMessage()), + PROPERTY_HTTP_ERROR_HANDLING); } } return results; @@ -431,31 +433,31 @@ public void validate(FailureCollector collector) { new URL(url); } catch (MalformedURLException e) { collector.addFailure(String.format("URL '%s' is malformed: %s", url, e.getMessage()), null) - .withConfigProperty(URL); + .withConfigProperty(URL); } } if (!containsMacro(CONNECTION_TIMEOUT) && Objects.nonNull(connectTimeout) && connectTimeout < 0) { collector.addFailure("Connection Timeout cannot be a negative number.", null) - .withConfigProperty(CONNECTION_TIMEOUT); + .withConfigProperty(CONNECTION_TIMEOUT); } try { convertHeadersToMap(requestHeaders); } catch (IllegalArgumentException e) { collector.addFailure(e.getMessage(), null) - .withConfigProperty(REQUEST_HEADERS); + .withConfigProperty(REQUEST_HEADERS); } if (!containsMacro(METHOD) && !METHODS.contains(method.toUpperCase())) { collector.addFailure( - String.format("Invalid request method %s, must be one of %s.", method, Joiner.on(',').join(METHODS)), null) - .withConfigProperty(METHOD); + String.format("Invalid request method %s, must be one of %s.", method, Joiner.on(',') + .join(METHODS)), null).withConfigProperty(METHOD); } if (!containsMacro(BATCH_SIZE) && batchSize != null && batchSize < 1) { collector.addFailure("Batch size must be greater than 0.", null) - .withConfigProperty(BATCH_SIZE); + .withConfigProperty(BATCH_SIZE); } // Validate Linear Retry Interval @@ -464,18 +466,24 @@ public void validate(FailureCollector collector) { } if (!containsMacro(READ_TIMEOUT) && Objects.nonNull(readTimeout) && readTimeout < 0) { collector.addFailure("Read Timeout cannot be a negative number.", null) - .withConfigProperty(READ_TIMEOUT); + .withConfigProperty(READ_TIMEOUT); } if (!containsMacro(MESSAGE_FORMAT) && !containsMacro("body") && messageFormat.equalsIgnoreCase("Custom") - && body == null) { + && body == null) { collector.addFailure("For Custom message format, message cannot be null.", null) - .withConfigProperty(MESSAGE_FORMAT); + .withConfigProperty(MESSAGE_FORMAT); + } + + if (!containsMacro(PROPERTY_LINEAR_RETRY_INTERVAL) && Objects.nonNull(linearRetryInterval) + && linearRetryInterval < 0) { + collector.addFailure("Linear Retry Interval cannot be a negative number.", null) + .withConfigProperty(PROPERTY_LINEAR_RETRY_INTERVAL); } if (!containsMacro(PROPERTY_MAX_RETRY_DURATION) && Objects.nonNull(maxRetryDuration) && maxRetryDuration < 0) { collector.addFailure("Max Retry Duration cannot be a negative number.", null) - .withConfigProperty(PROPERTY_MAX_RETRY_DURATION); + .withConfigProperty(PROPERTY_MAX_RETRY_DURATION); } } @@ -492,15 +500,15 @@ public void validateSchema(@Nullable Schema schema, FailureCollector collector) if (containsMacro(URL) || containsMacro(METHOD)) { return; } - + if ((method.equals("PUT") || method.equals("PATCH") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) { Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); Matcher matcher = pattern.matcher(url); - List fieldNames = fields.stream().map(field -> field.getName()).collect(Collectors.toList()); + List fieldNames = fields.stream().map(Schema.Field::getName).collect(Collectors.toList()); while (matcher.find()) { if (!fieldNames.contains(matcher.group(1))) { collector.addFailure(String.format("Schema must contain '%s' field mentioned in the url", matcher.group(1)), - null).withConfigProperty(URL); + null).withConfigProperty(URL); } } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java index 63725d3f..cbbe963d 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java @@ -1,3 +1,4 @@ + /* * Copyright © 2023 Cask Data, Inc. * @@ -28,6 +29,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -97,10 +99,10 @@ public MessageBuffer( /** * Adds a record to the buffer. * - * @param record The record to be added. + * @param structuredRecord The record to be added. */ - public void add(StructuredRecord record) { - buffer.add(record); + public void add(StructuredRecord structuredRecord) { + buffer.add(structuredRecord); } /** @@ -148,7 +150,7 @@ private String formatAsJson(List buffer) { private String formatAsJsonInternal(List buffer) throws IOException { boolean useJsonBatchKey = !Strings.isNullOrEmpty(jsonBatchKey); - if (!shouldWriteJsonAsArray || !useJsonBatchKey) { + if (Boolean.TRUE.equals(!shouldWriteJsonAsArray) || !useJsonBatchKey) { return getBufferAsJsonList(); } StructuredRecord wrappedMessageRecord = StructuredRecord.builder(wrappedMessageSchema) @@ -170,18 +172,18 @@ private String formatAsCustom(List buffer) { private String getBufferAsJsonList() throws IOException { StringBuilder sb = new StringBuilder(); - String delimiter = shouldWriteJsonAsArray ? "," : delimiterForMessages; - if (shouldWriteJsonAsArray) { + String delimiter = Boolean.TRUE.equals(shouldWriteJsonAsArray) ? "," : delimiterForMessages; + if (Boolean.TRUE.equals(shouldWriteJsonAsArray)) { sb.append("["); } - for (StructuredRecord record : buffer) { - sb.append(StructuredRecordStringConverter.toJsonString(record)); + for (StructuredRecord structuredRecord : buffer) { + sb.append(StructuredRecordStringConverter.toJsonString(structuredRecord)); sb.append(delimiter); } if (!buffer.isEmpty()) { sb.setLength(sb.length() - delimiter.length()); } - if (shouldWriteJsonAsArray) { + if (Boolean.TRUE.equals(shouldWriteJsonAsArray)) { sb.append("]"); } return sb.toString(); @@ -190,16 +192,18 @@ private String getBufferAsJsonList() throws IOException { private String createFormMessage(StructuredRecord input) { boolean first = true; String formMessage = null; - StringBuilder sb = new StringBuilder(""); - for (Schema.Field field : input.getSchema().getFields()) { - if (first) { - first = false; - } else { - sb.append("&"); + StringBuilder sb = new StringBuilder(); + if (input != null && input.getSchema() != null) { + for (Schema.Field field : Objects.requireNonNull(input.getSchema().getFields())) { + if (first) { + first = false; + } else { + sb.append("&"); + } + sb.append(field.getName()); + sb.append("="); + sb.append((String) input.get(field.getName())); } - sb.append(field.getName()); - sb.append("="); - sb.append((String) input.get(field.getName())); } try { formMessage = URLEncoder.encode(sb.toString(), charset); @@ -212,7 +216,7 @@ private String createFormMessage(StructuredRecord input) { private String createCustomMessage(StructuredRecord input) { String customMessage = customMessageBody; Matcher matcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); - HashMap findReplaceMap = new HashMap(); + HashMap findReplaceMap = new HashMap<>(); while (matcher.find()) { if (input.get(matcher.group(1)) != null) { findReplaceMap.put(matcher.group(1), (String) input.get(matcher.group(1))); diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java index 1520f8e0..d219df1c 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java @@ -22,13 +22,12 @@ public class PlaceholderBean { private static final String PLACEHOLDER_FORMAT = "#%s"; private final String placeHolderKey; - private final String placeHolderKeyWithPrefix; private final int startIndex; private final int endIndex; public PlaceholderBean(String url, String placeHolderKey) { + String placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); this.placeHolderKey = placeHolderKey; - this.placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); this.startIndex = url.indexOf(placeHolderKeyWithPrefix); this.endIndex = startIndex + placeHolderKeyWithPrefix.length(); } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java index 9fff34d5..3171f094 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java @@ -31,14 +31,19 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; -import io.cdap.plugin.http.common.pagination.page.BasePage; + +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -85,14 +90,16 @@ public void prepareRun(BatchSourceContext context) { config.setConfigSchema(schema); config.validateSchema(); Asset asset = Asset.builder(config.getReferenceNameOrNormalizedFQN()) - .setFqn(config.getUrl()).build(); + .setFqn(config.getUrl()).build(); LineageRecorder lineageRecorder = new LineageRecorder(context, asset); lineageRecorder.createExternalDataset(schema); - lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()), - Preconditions.checkNotNull(schema.getFields()).stream() - .map(Schema.Field::getName) - .collect(Collectors.toList())); + List getNameList = Objects.nonNull(schema) ? Preconditions.checkNotNull(schema.getFields()).stream() + .map(Schema.Field::getName) + .collect(Collectors.toList()) : new ArrayList<>(); + lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()), getNameList); + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(HttpErrorDetailsProvider.class.getName())); context.setInput(Input.of(config.getReferenceNameOrNormalizedFQN(), new HttpInputFormatProvider(config))); } @@ -121,7 +128,7 @@ public void transform(KeyValue input, Emitter getHttpErrorHandlingEntries() { String regex = entry.getKey(); try { results.add(new HttpErrorHandlerEntity(Pattern.compile(regex), - getEnumValueByString(RetryableErrorHandling.class, - entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); + getEnumValueByString(RetryableErrorHandling.class, + entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); } catch (PatternSyntaxException e) { // We embed causing exception message into this one. Since this message is shown on UI when validation fails. throw new InvalidConfigPropertyException( - String.format( - "Error handling regex '%s' is not valid. %s", regex, e.getMessage()), PROPERTY_HTTP_ERROR_HANDLING); + String.format( + "Error handling regex '%s' is not valid. %s", regex, e.getMessage()), + PROPERTY_HTTP_ERROR_HANDLING); } } return results; @@ -545,7 +547,7 @@ public Map getFullFieldsMapping() { Map result = new HashMap<>(); if (!Strings.isNullOrEmpty(schema)) { - for (Schema.Field field : getSchema().getFields()) { + for (Schema.Field field : Objects.requireNonNull(Objects.requireNonNull(getSchema()).getFields())) { result.put(field.getName(), "/" + field.getName()); } } @@ -576,24 +578,24 @@ public void validate(FailureCollector failureCollector) { if (!containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) { List httpErrorsHandlingEntries = getHttpErrorHandlingEntries(); boolean supportsSkippingPages = PaginationIteratorFactory - .createInstance(this, null).supportsSkippingPages(); + .createInstance(this, null).supportsSkippingPages(); if (!supportsSkippingPages) { for (HttpErrorHandlerEntity httpErrorsHandlingEntry : httpErrorsHandlingEntries) { ErrorHandling postRetryStrategy = httpErrorsHandlingEntry.getStrategy().getAfterRetryStrategy(); if (postRetryStrategy.equals(ErrorHandling.SEND) || - postRetryStrategy.equals(ErrorHandling.SKIP)) { + postRetryStrategy.equals(ErrorHandling.SKIP)) { throw new InvalidConfigPropertyException( - String.format("Error handling strategy '%s' is not support in combination with pagination type", + String.format("Error handling strategy '%s' is not support in combination with pagination type", httpErrorsHandlingEntry.getStrategy(), getPaginationType()), - PROPERTY_HTTP_ERROR_HANDLING); + PROPERTY_HTTP_ERROR_HANDLING); } } } } } catch (MalformedURLException e) { throw new InvalidConfigPropertyException( - String.format("URL value is not valid: '%s'", getUrl()), e, PROPERTY_URL); + String.format("URL value is not valid: '%s'", getUrl()), e, PROPERTY_URL); } } @@ -621,31 +623,31 @@ public void validate(FailureCollector failureCollector) { switch (getPaginationType()) { case LINK_IN_RESPONSE_BODY: propertiesShouldBeNotNull.put(PROPERTY_NEXT_PAGE_FIELD_PATH, - propertiesShouldBeNull.remove(PROPERTY_NEXT_PAGE_FIELD_PATH)); + propertiesShouldBeNull.remove(PROPERTY_NEXT_PAGE_FIELD_PATH)); break; case TOKEN_IN_RESPONSE_BODY: propertiesShouldBeNotNull.put(PROPERTY_NEXT_PAGE_TOKEN_PATH, - propertiesShouldBeNull.remove(PROPERTY_NEXT_PAGE_TOKEN_PATH)); + propertiesShouldBeNull.remove(PROPERTY_NEXT_PAGE_TOKEN_PATH)); propertiesShouldBeNotNull.put(PROPERTY_NEXT_PAGE_URL_PARAMETER, - propertiesShouldBeNull.remove(PROPERTY_NEXT_PAGE_URL_PARAMETER)); + propertiesShouldBeNull.remove(PROPERTY_NEXT_PAGE_URL_PARAMETER)); break; case INCREMENT_AN_INDEX: propertiesShouldBeNotNull.put(PROPERTY_START_INDEX, - propertiesShouldBeNull.remove(PROPERTY_START_INDEX)); + propertiesShouldBeNull.remove(PROPERTY_START_INDEX)); propertiesShouldBeNotNull.put(PROPERTY_INDEX_INCREMENT, - propertiesShouldBeNull.remove(PROPERTY_INDEX_INCREMENT)); + propertiesShouldBeNull.remove(PROPERTY_INDEX_INCREMENT)); propertiesShouldBeNull.remove(PROPERTY_MAX_INDEX); // can be both null and non null if (!containsMacro(PROPERTY_URL) && !url.contains(PAGINATION_INDEX_PLACEHOLDER)) { throw new InvalidConfigPropertyException( - String.format("Url '%s' must contain '%s' placeholder when pagination type is '%s'", getUrl(), + String.format("Url '%s' must contain '%s' placeholder when pagination type is '%s'", getUrl(), PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), - PROPERTY_URL); + PROPERTY_URL); } break; case CUSTOM: propertiesShouldBeNotNull.put(PROPERTY_CUSTOM_PAGINATION_CODE, - propertiesShouldBeNull.remove(PROPERTY_CUSTOM_PAGINATION_CODE)); + propertiesShouldBeNull.remove(PROPERTY_CUSTOM_PAGINATION_CODE)); break; // other types don't require any fields. Check for unknown values is already done. Do nothing here } @@ -680,7 +682,7 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), if (!containsMacro(PROPERTY_VERIFY_HTTPS) && !getVerifyHttps()) { assertIsNotSet(getTrustStoreFile(), PROPERTY_TRUSTSTORE_FILE, - String.format("trustore settings are ignored due to disabled %s", PROPERTY_VERIFY_HTTPS)); + String.format("trustore settings are ignored due to disabled %s", PROPERTY_VERIFY_HTTPS)); } } @@ -729,7 +731,7 @@ public void validateSchema() { List fields = schema.getFields(); if (fields == null || fields.size() != 1 || fields.get(0).getSchema().getType() != expectedFieldType) { throw new InvalidStageException(String.format("Schema must be a record with a single %s field.", - expectedFieldType.toString().toLowerCase())); + expectedFieldType.toString().toLowerCase())); } } } @@ -738,14 +740,14 @@ public void validateSchema() { public static void assertIsSet(Object propertyValue, String propertyName, String reason) { if (propertyValue == null) { throw new InvalidConfigPropertyException( - String.format("Property '%s' must be set, since %s", propertyName, reason), propertyName); + String.format("Property '%s' must be set, since %s", propertyName, reason), propertyName); } } public static void assertIsNotSet(Object propertyValue, String propertyName, String reason) { if (propertyValue != null) { throw new InvalidConfigPropertyException( - String.format("Property '%s' must not be set, since %s", propertyName, reason), propertyName); + String.format("Property '%s' must not be set, since %s", propertyName, reason), propertyName); } } @@ -753,10 +755,10 @@ public static void assertIsNotSet(Object propertyValue, String propertyName, Str public static T getEnumValueByString(Class enumClass, String stringValue, String propertyName) { return Stream.of(enumClass.getEnumConstants()) - .filter(keyType -> keyType.getValue().equalsIgnoreCase(stringValue)) - .findAny() - .orElseThrow(() -> new InvalidConfigPropertyException( - String.format("Unsupported value for '%s': '%s'", propertyName, stringValue), propertyName)); + .filter(keyType -> keyType.getValue().equalsIgnoreCase(stringValue)) + .findAny() + .orElseThrow(() -> new InvalidConfigPropertyException( + String.format("Unsupported value for '%s': '%s'", propertyName, stringValue), propertyName)); } @Nullable @@ -769,7 +771,7 @@ public static Long toLong(String value, String propertyName) { return Long.parseLong(value); } catch (NumberFormatException ex) { throw new InvalidConfigPropertyException(String.format("Unsupported value for '%s': '%s'", propertyName, value), - propertyName); + propertyName); } } diff --git a/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java b/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java index d7bc3665..f78478eb 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java +++ b/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java @@ -1,3 +1,4 @@ + /* * Copyright © 2024 Cask Data, Inc. * @@ -27,6 +28,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Objects; /** * Class that detects the schema of the delimited file. @@ -46,7 +48,7 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter if (rowIndex == 0) { columnNames = DataTypeDetectorUtils.setColumnNames(line, config.getCsvSkipFirstRow(), config.getEnableQuotesValues(), delimiter); - if (config.getCsvSkipFirstRow()) { + if (Boolean.TRUE.equals(config.getCsvSkipFirstRow())) { continue; } } @@ -61,7 +63,8 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter return null; } List fields = DataTypeDetectorUtils.detectDataTypeOfEachDatasetColumn( - new HashMap<>(), columnNames, dataTypeDetectorStatusKeeper); + new HashMap<>(), (Objects.nonNull(columnNames) ? columnNames : new String[0]), + dataTypeDetectorStatusKeeper); return Schema.recordOf("text", fields); } diff --git a/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java b/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java index 2943d0d1..63b125ac 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java +++ b/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java @@ -1,3 +1,4 @@ + /* * Copyright © 2024 Cask Data, Inc. * @@ -16,6 +17,9 @@ package io.cdap.plugin.http.source.common; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.http.common.http.HttpResponse; import java.io.BufferedReader; @@ -61,14 +65,16 @@ public boolean hasNext() { isLineRead = true; return lastLine != null; } catch (IOException e) { // we need to catch this, since hasNext() does not have "throws" in parent - throw new RuntimeException("Failed to read line from http page buffer", e); + String errorMessage = "Unable to read line from http page buffer"; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); } } @Override public String next() { if (!hasNext()) { // calling hasNext will also read the line; - throw new NoSuchElementException(); + throw new NoSuchElementException("Unable to read the next line."); } isLineRead = false; return lastLine;