diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java index dfdb2609..2180737b 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java @@ -22,9 +22,11 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.http.common.RetryPolicy; +import io.cdap.plugin.http.common.error.ErrorHandling; import io.cdap.plugin.http.common.error.HttpErrorHandler; import io.cdap.plugin.http.common.error.RetryableErrorHandling; import io.cdap.plugin.http.common.http.HttpRequest; +import io.cdap.plugin.http.common.http.HttpResponse; import io.cdap.plugin.http.common.http.OAuthUtil; import org.apache.hadoop.mapreduce.RecordWriter; @@ -83,6 +85,7 @@ public class HTTPRecordWriter extends RecordWriter getPlaceholderListFromURL() { List placeholderList = new ArrayList<>(); - if (!(config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_DELETE))) { + if (!(config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH) || + config.getMethod().equals(REQUEST_METHOD_DELETE))) { return placeholderList; } Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); @@ -351,6 +360,25 @@ private void flushMessageBuffer() { "after the batch execution. " + e); } messageBuffer.clear(); + + ErrorHandling postRetryStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode) + .getAfterRetryStrategy(); + + switch (postRetryStrategy) { + case SUCCESS: + break; + case STOP: + throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", + config.getUrl(), httpStatusCode, httpResponseBody)); + case SKIP: + case SEND: + LOG.warn(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", + config.getUrl(), httpStatusCode, httpResponseBody)); + break; + default: + throw new IllegalArgumentException(String.format("Unexpected http error handling: '%s'", postRetryStrategy)); + } + } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java index 2c04d253..cb3cc9ff 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java @@ -80,7 +80,7 @@ public class HTTPSinkConfig extends BaseHttpConfig { private static final String REGEX_HASHED_VAR = "#(\\w+)"; private static final String PLACEHOLDER = "#"; private static final Set METHODS = ImmutableSet.of(HttpMethod.GET, HttpMethod.POST, - HttpMethod.PUT, HttpMethod.DELETE); + HttpMethod.PUT, HttpMethod.DELETE, "PATCH"); @Name(URL) @Description("The URL to post data to. Additionally, a placeholder like #columnName can be added to the URL that " + @@ -462,7 +462,6 @@ public void validate(FailureCollector collector) { if (!containsMacro(PROPERTY_RETRY_POLICY) && getRetryPolicy() == RetryPolicy.LINEAR) { assertIsSet(getLinearRetryInterval(), PROPERTY_LINEAR_RETRY_INTERVAL, "retry policy is linear"); } - if (!containsMacro(READ_TIMEOUT) && Objects.nonNull(readTimeout) && readTimeout < 0) { collector.addFailure("Read Timeout cannot be a negative number.", null) .withConfigProperty(READ_TIMEOUT); @@ -494,7 +493,7 @@ public void validateSchema(@Nullable Schema schema, FailureCollector collector) return; } - if ((method.equals("PUT") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) { + if ((method.equals("PUT") || method.equals("PATCH") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) { Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); Matcher matcher = pattern.matcher(url); List fieldNames = fields.stream().map(field -> field.getName()).collect(Collectors.toList()); diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormatProvider.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormatProvider.java index 0ca10d2e..59b15e8d 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormatProvider.java @@ -28,6 +28,7 @@ import io.cdap.plugin.http.common.pagination.page.PageFormat; import io.cdap.plugin.http.source.common.DelimitedSchemaDetector; import io.cdap.plugin.http.source.common.RawStringPerLine; +import org.apache.http.client.methods.CloseableHttpResponse; import java.io.IOException; import java.util.Map; @@ -68,8 +69,14 @@ public Schema getSchema(FailureCollector failureCollector) { case TSV: String delimiter = format == PageFormat.CSV ? "," : "\t"; try (HttpClient client = new HttpClient(config)) { - RawStringPerLine rawStringPerLine = new RawStringPerLine( - new HttpResponse(client.executeHTTP(config.getUrl()))); + CloseableHttpResponse closeableHttpResponse = client.executeHTTP(config.getUrl()); + int statusCode = closeableHttpResponse.getStatusLine().getStatusCode(); + if (statusCode < 200 || statusCode >= 300) { + failureCollector.addFailure(String.format("Failed to read the file, non 2xx status code received: %s", + statusCode), null); + return null; + } + RawStringPerLine rawStringPerLine = new RawStringPerLine(new HttpResponse(closeableHttpResponse)); return DelimitedSchemaDetector.detectSchema(config, delimiter, rawStringPerLine, failureCollector); } catch (IOException e) { String errorMessage = e.getMessage(); diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java index ecc3c9df..c994657b 100644 --- a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java @@ -154,6 +154,18 @@ public void testValidInputSchema() { Assert.assertTrue(collector.getValidationFailures().isEmpty()); } + @Test() + public void testValidInputWithPlaceHoldersWithPATCH() { + Schema schema = Schema.recordOf("record", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("name", Schema.of(Schema.Type.STRING))); + String dynamicUrl = "http://example.com/api/v1/book/#id"; + HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG).setMethod("PATCH").setUrl(dynamicUrl).build(); + MockFailureCollector collector = new MockFailureCollector("httpsinkwithvalidinputschema"); + config.validateSchema(schema, collector); + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + @Test(expected = ValidationException.class) public void testHTTPSinkWithNegativeBatchSize() { HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG) diff --git a/widgets/HTTP-batchsink.json b/widgets/HTTP-batchsink.json index c2eec1ea..4741833a 100644 --- a/widgets/HTTP-batchsink.json +++ b/widgets/HTTP-batchsink.json @@ -25,7 +25,8 @@ "POST", "GET", "PUT", - "DELETE" + "DELETE", + "PATCH" ], "default": "POST" } diff --git a/widgets/HTTP-batchsource.json b/widgets/HTTP-batchsource.json index 341b4650..9eed8feb 100644 --- a/widgets/HTTP-batchsource.json +++ b/widgets/HTTP-batchsource.json @@ -26,7 +26,8 @@ "POST", "PUT", "DELETE", - "HEAD" + "HEAD", + "PATCH" ], "default": "GET" } diff --git a/widgets/HTTP-streamingsource.json b/widgets/HTTP-streamingsource.json index 1f142ae8..e47c0b0a 100644 --- a/widgets/HTTP-streamingsource.json +++ b/widgets/HTTP-streamingsource.json @@ -26,7 +26,8 @@ "POST", "PUT", "DELETE", - "HEAD" + "HEAD", + "PATCH" ], "default": "GET" }