diff --git a/pom.xml b/pom.xml
index 2fa12018..3446ad87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,20 +76,20 @@
3.1.6
- 6.8.0-SNAPSHOT
+ 6.11.0-SNAPSHOT
3.9
1.12
2.8.5
0.4.0
- 2.3.0
+ 3.3.6
4.5.9
- 2.10.0-SNAPSHOT
+ 2.13.0-SNAPSHOT
2.12.0
- 2.9.9
+ 2.13.0
4.11
2.7.1
2.24.0
- 2.1.3
+ 3.3.2
0.9
1.49
@@ -150,6 +150,10 @@
org.slf4j
slf4j-log4j12
+
+ org.slf4j
+ slf4j-reload4j
+
org.apache.avro
avro
@@ -212,7 +216,7 @@
org.slf4j
- slf4j-log4j12
+ slf4j-reload4j
com.google.inject.extensions
@@ -253,14 +257,14 @@
org.apache.spark
- spark-streaming_2.11
- ${spark2.version}
+ spark-streaming_2.12
+ ${spark3.version}
provided
org.apache.spark
- spark-core_2.11
- ${spark2.version}
+ spark-core_2.12
+ ${spark3.version}
provided
@@ -410,19 +414,19 @@
io.cdap.cdap
- cdap-data-streams2_2.11
+ cdap-data-streams3_2.12
${cdap.version}
test
io.cdap.cdap
- cdap-data-pipeline2_2.11
+ cdap-data-pipeline3_2.12
${cdap.version}
test
io.cdap.cdap
- cdap-spark-core2_2.11
+ cdap-spark-core3_2.12
${cdap.version}
test
diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java
index 7f45b73d..f33036f5 100644
--- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java
+++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java
@@ -102,8 +102,10 @@ public String getOutputFormatClassName() {
@Override
public Map getOutputFormatConfiguration() {
+ Schema defaultValidSchema = Schema.recordOf("schema", Schema.Field.of("body", Schema.of(Schema.Type.STRING)));
return ImmutableMap.of("http.sink.config", GSON.toJson(config),
- "http.sink.input.schema", inputSchema == null ? "" : inputSchema.toString());
+ "http.sink.input.schema",
+ inputSchema == null ? defaultValidSchema.toString() : inputSchema.toString());
}
}
diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java
index c2295562..f5345a44 100644
--- a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java
+++ b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java
@@ -135,6 +135,7 @@ public void testIncrementAnIndexXml() throws Exception {
.put(BaseHttpSourceConfig.PROPERTY_PAGINATION_TYPE, "Increment an index")
.put(BaseHttpSourceConfig.PROPERTY_START_INDEX, "0")
.put(BaseHttpSourceConfig.PROPERTY_INDEX_INCREMENT, "20")
+ .put(BaseHttpSourceConfig.PROPERTY_MAX_INDEX, "100")
.build();
wireMockRule.stubFor(WireMock.get(
diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java
index 9488a523..80ffc272 100644
--- a/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java
+++ b/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java
@@ -19,7 +19,6 @@
import io.cdap.cdap.api.artifact.ArtifactSummary;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.dataset.table.Table;
-import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.datastreams.DataStreamsApp;
import io.cdap.cdap.datastreams.DataStreamsSparkLauncher;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
@@ -27,7 +26,6 @@
import io.cdap.cdap.etl.proto.v2.DataStreamsConfig;
import io.cdap.cdap.etl.proto.v2.ETLPlugin;
import io.cdap.cdap.etl.proto.v2.ETLStage;
-import io.cdap.cdap.etl.spark.Compat;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.artifact.AppRequest;
import io.cdap.cdap.proto.id.ApplicationId;
@@ -37,12 +35,10 @@
import io.cdap.cdap.test.DataSetManager;
import io.cdap.cdap.test.ProgramManager;
import io.cdap.cdap.test.SparkManager;
-import io.cdap.cdap.test.TestConfiguration;
import io.cdap.plugin.http.source.streaming.HttpStreamingSource;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,11 +53,6 @@ public class HttpStreamingSourceETLTest extends HttpSourceETLTest {
private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 60;
private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 100;
- @ClassRule
- public static final TestConfiguration CONFIG =
- new TestConfiguration(Constants.Explore.EXPLORE_ENABLED, false,
- Constants.AppFabric.SPARK_COMPAT, Compat.SPARK_COMPAT);
-
@BeforeClass
public static void setupTest() throws Exception {
LOG.info("Setting up application");
diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java
index a87bbb61..a26d3d78 100644
--- a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java
+++ b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java
@@ -75,8 +75,7 @@ public class HTTPSinkTest extends HydratorTestBase {
protected static final ArtifactId BATCH_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "4.0.0");
protected static final ArtifactSummary BATCH_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0");
private static final Schema inputSchema = Schema.recordOf(
- "input-record",
- Schema.Field.of("id", Schema.of(Schema.Type.STRING)));
+ "input-record", Schema.Field.of("id", Schema.of(Schema.Type.STRING)));
private static NettyHttpService httpService;
protected static String baseURL;