diff --git a/pom.xml b/pom.xml index 2fa12018..3446ad87 100644 --- a/pom.xml +++ b/pom.xml @@ -76,20 +76,20 @@ 3.1.6 - 6.8.0-SNAPSHOT + 6.11.0-SNAPSHOT 3.9 1.12 2.8.5 0.4.0 - 2.3.0 + 3.3.6 4.5.9 - 2.10.0-SNAPSHOT + 2.13.0-SNAPSHOT 2.12.0 - 2.9.9 + 2.13.0 4.11 2.7.1 2.24.0 - 2.1.3 + 3.3.2 0.9 1.49 @@ -150,6 +150,10 @@ org.slf4j slf4j-log4j12 + + org.slf4j + slf4j-reload4j + org.apache.avro avro @@ -212,7 +216,7 @@ org.slf4j - slf4j-log4j12 + slf4j-reload4j com.google.inject.extensions @@ -253,14 +257,14 @@ org.apache.spark - spark-streaming_2.11 - ${spark2.version} + spark-streaming_2.12 + ${spark3.version} provided org.apache.spark - spark-core_2.11 - ${spark2.version} + spark-core_2.12 + ${spark3.version} provided @@ -410,19 +414,19 @@ io.cdap.cdap - cdap-data-streams2_2.11 + cdap-data-streams3_2.12 ${cdap.version} test io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline3_2.12 ${cdap.version} test io.cdap.cdap - cdap-spark-core2_2.11 + cdap-spark-core3_2.12 ${cdap.version} test diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java index 7f45b73d..f33036f5 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java @@ -102,8 +102,10 @@ public String getOutputFormatClassName() { @Override public Map getOutputFormatConfiguration() { + Schema defaultValidSchema = Schema.recordOf("schema", Schema.Field.of("body", Schema.of(Schema.Type.STRING))); return ImmutableMap.of("http.sink.config", GSON.toJson(config), - "http.sink.input.schema", inputSchema == null ? "" : inputSchema.toString()); + "http.sink.input.schema", + inputSchema == null ? defaultValidSchema.toString() : inputSchema.toString()); } } diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java index c2295562..f5345a44 100644 --- a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java +++ b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java @@ -135,6 +135,7 @@ public void testIncrementAnIndexXml() throws Exception { .put(BaseHttpSourceConfig.PROPERTY_PAGINATION_TYPE, "Increment an index") .put(BaseHttpSourceConfig.PROPERTY_START_INDEX, "0") .put(BaseHttpSourceConfig.PROPERTY_INDEX_INCREMENT, "20") + .put(BaseHttpSourceConfig.PROPERTY_MAX_INDEX, "100") .build(); wireMockRule.stubFor(WireMock.get( diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java index 9488a523..80ffc272 100644 --- a/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java +++ b/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java @@ -19,7 +19,6 @@ import io.cdap.cdap.api.artifact.ArtifactSummary; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.dataset.table.Table; -import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.datastreams.DataStreamsApp; import io.cdap.cdap.datastreams.DataStreamsSparkLauncher; import io.cdap.cdap.etl.api.streaming.StreamingSource; @@ -27,7 +26,6 @@ import io.cdap.cdap.etl.proto.v2.DataStreamsConfig; import io.cdap.cdap.etl.proto.v2.ETLPlugin; import io.cdap.cdap.etl.proto.v2.ETLStage; -import io.cdap.cdap.etl.spark.Compat; import io.cdap.cdap.proto.ProgramRunStatus; import io.cdap.cdap.proto.artifact.AppRequest; import io.cdap.cdap.proto.id.ApplicationId; @@ -37,12 +35,10 @@ import io.cdap.cdap.test.DataSetManager; import io.cdap.cdap.test.ProgramManager; import io.cdap.cdap.test.SparkManager; -import io.cdap.cdap.test.TestConfiguration; import io.cdap.plugin.http.source.streaming.HttpStreamingSource; import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,11 +53,6 @@ public class HttpStreamingSourceETLTest extends HttpSourceETLTest { private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 60; private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 100; - @ClassRule - public static final TestConfiguration CONFIG = - new TestConfiguration(Constants.Explore.EXPLORE_ENABLED, false, - Constants.AppFabric.SPARK_COMPAT, Compat.SPARK_COMPAT); - @BeforeClass public static void setupTest() throws Exception { LOG.info("Setting up application"); diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java index a87bbb61..a26d3d78 100644 --- a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java +++ b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java @@ -75,8 +75,7 @@ public class HTTPSinkTest extends HydratorTestBase { protected static final ArtifactId BATCH_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "4.0.0"); protected static final ArtifactSummary BATCH_ARTIFACT = new ArtifactSummary("data-pipeline", "4.0.0"); private static final Schema inputSchema = Schema.recordOf( - "input-record", - Schema.Field.of("id", Schema.of(Schema.Type.STRING))); + "input-record", Schema.Field.of("id", Schema.of(Schema.Type.STRING))); private static NettyHttpService httpService; protected static String baseURL;