From feaca2073bd779abfae0609bbe8b1a841e1a6c57 Mon Sep 17 00:00:00 2001
From: Eric Marnadi
Date: Fri, 22 Nov 2024 01:29:13 -0800
Subject: [PATCH] alsotestwith

Replace the suite-local testWithEncodingTypes helper in RocksDBSuite with a
shared AlsoTestWithEncodingTypes trait. The affected suites now mix in the
trait and declare their cases with plain test(...), so every test in those
suites runs once per state store encoding format instead of only the tests
that opted into the wrapper.

---
 .../streaming/state/RocksDBSuite.scala        | 19 -------
 .../TransformWithListStateSuite.scala         | 21 ++++----
 .../TransformWithListStateTTLSuite.scala      |  6 +--
 .../TransformWithMapStateSuite.scala          | 15 +++---
 .../TransformWithMapStateTTLSuite.scala       |  4 +-
 .../streaming/TransformWithStateSuite.scala   | 54 +++++++++----------
 .../streaming/TransformWithStateTTLTest.scala |  5 +-
 .../TransformWithValueStateTTLSuite.scala     |  4 +-
 8 files changed, 56 insertions(+), 72 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 3a25fa73afc31..61ca8e7c32f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -141,25 +141,6 @@ trait AlsoTestWithChangelogCheckpointingEnabled
     }
   }
 
-  def testWithEncodingTypes(
-      testName: String,
-      testTags: Tag*)
-      (testBody: => Any): Unit = {
-    Seq("unsaferow", "avro").foreach { encoding =>
-      super.test(testName + s" (encoding = $encoding)", testTags: _*) {
-        // in case tests have any code that needs to execute before every test
-        // super.beforeEach()
-        withSQLConf(
-          SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key ->
-            encoding) {
-          testBody
-        }
-        // in case tests have any code that needs to execute after every test
-        // super.afterEach()
-      }
-    }
-  }
-
   def testWithColumnFamilies(
       testName: String,
       testMode: TestMode,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
index 9f26de126e0dd..5d88db0d01ba4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
 import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider}
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, AlsoTestWithEncodingTypes, RocksDBStateStoreProvider}
 import org.apache.spark.sql.internal.SQLConf
 
 case class InputRow(key: String, action: String, value: String)
@@ -127,10 +127,11 @@ class ToggleSaveAndEmitProcessor
 }
 
 class TransformWithListStateSuite extends StreamTest
-  with AlsoTestWithChangelogCheckpointingEnabled {
+  with AlsoTestWithChangelogCheckpointingEnabled
+  with AlsoTestWithEncodingTypes {
   import testImplicits._
 
-  testWithEncodingTypes("test appending null value in list state throw exception") {
+  test("test appending null value in list state throw exception") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -150,7 +151,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test putting null value in list state throw exception") {
+  test("test putting null value in list state throw exception") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
      classOf[RocksDBStateStoreProvider].getName) {
 
@@ -170,7 +171,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test putting null list in list state throw exception") {
+  test("test putting null list in list state throw exception") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -190,7 +191,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test appending null list in list state throw exception") {
+  test("test appending null list in list state throw exception") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -210,7 +211,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test putting empty list in list state throw exception") {
+  test("test putting empty list in list state throw exception") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -230,7 +231,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test appending empty list in list state throw exception") {
+  test("test appending empty list in list state throw exception") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -250,7 +251,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test list state correctness") {
+  test("test list state correctness") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -307,7 +308,7 @@ class TransformWithListStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("test ValueState And ListState in Processor") {
+  test("test ValueState And ListState in Processor") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala
index ebd29bff5d354..409a255ae3e64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala
@@ -105,7 +105,7 @@ class TransformWithListStateTTLSuite extends TransformWithStateTTLTest
 
   override def getStateTTLMetricName: String = "numListStateWithTTLVars"
 
-  testWithEncodingTypes("verify iterator works with expired values in beginning of list") {
+  test("verify iterator works with expired values in beginning of list") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
@@ -195,7 +195,7 @@ class TransformWithListStateTTLSuite extends TransformWithStateTTLTest
   // ascending order of TTL by stopping the query, setting the new TTL, and restarting
   // the query to check that the expired elements in the middle or end of the list
   // are not returned.
- testWithEncodingTypes("verify iterator works with expired values in middle of list") { + test("verify iterator works with expired values in middle of list") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, SQLConf.SHUFFLE_PARTITIONS.key -> "1") { @@ -343,7 +343,7 @@ class TransformWithListStateTTLSuite extends TransformWithStateTTLTest { } } - testWithEncodingTypes("verify iterator works with expired values in end of list") { + test("verify iterator works with expired values in end of list") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, SQLConf.SHUFFLE_PARTITIONS.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala index 410ad55cad480..ec6ff4fcceb67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.Encoders import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider} +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, AlsoTestWithEncodingTypes, RocksDBStateStoreProvider} import org.apache.spark.sql.internal.SQLConf case class InputMapRow(key: String, action: String, value: (String, String)) @@ -81,7 +81,8 @@ class TestMapStateProcessor * operators such as transformWithState. 
  */
 class TransformWithMapStateSuite extends StreamTest
-  with AlsoTestWithChangelogCheckpointingEnabled {
+  with AlsoTestWithChangelogCheckpointingEnabled
+  with AlsoTestWithEncodingTypes {
   import testImplicits._
 
   private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
@@ -110,7 +111,7 @@ class TransformWithMapStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("Test retrieving value with non-existing user key") {
+  test("Test retrieving value with non-existing user key") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -129,12 +130,12 @@ class TransformWithMapStateSuite extends StreamTest
   }
 
   Seq("getValue", "containsKey", "updateValue", "removeKey").foreach { mapImplFunc =>
-    testWithEncodingTypes(s"Test $mapImplFunc with null user key") {
+    test(s"Test $mapImplFunc with null user key") {
       testMapStateWithNullUserKey(InputMapRow("k1", mapImplFunc, (null, "")))
     }
   }
 
-  testWithEncodingTypes("Test put value with null value") {
+  test("Test put value with null value") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
 
@@ -158,7 +159,7 @@ class TransformWithMapStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("Test map state correctness") {
+  test("Test map state correctness") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
       val inputData = MemoryStream[InputMapRow]
@@ -219,7 +220,7 @@ class TransformWithMapStateSuite extends StreamTest
     }
   }
 
-  testWithEncodingTypes("transformWithMapState - batch should succeed") {
+  test("transformWithMapState - batch should succeed") {
     val inputData = Seq(
       InputMapRow("k1", "updateValue", ("v1", "10")),
       InputMapRow("k1", "getValue", ("v1", "")))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala
index a68632534c001..022280eb3bcef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala
@@ -182,7 +182,7 @@ class TransformWithMapStateTTLSuite extends TransformWithStateTTLTest
 
   override def getStateTTLMetricName: String = "numMapStateWithTTLVars"
 
-  testWithEncodingTypes("validate state is evicted with multiple user keys") {
+  test("validate state is evicted with multiple user keys") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
@@ -224,7 +224,7 @@ class TransformWithMapStateTTLSuite extends TransformWithStateTTLTest
     }
   }
 
-  testWithEncodingTypes("verify iterator doesn't return expired keys") {
+  test("verify iterator doesn't return expired keys") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index b5e2104d86251..91a47645f4179 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -429,11 +429,11 @@ class SleepingTimerProcessor extends StatefulProcessor[String, String, String] {
  * Class that adds tests for transformWithState stateful streaming operator
  */
 class TransformWithStateSuite extends StateStoreMetricsTest
-  with AlsoTestWithChangelogCheckpointingEnabled {
+  with AlsoTestWithChangelogCheckpointingEnabled with AlsoTestWithEncodingTypes {
 
   import testImplicits._
 
-  testWithEncodingTypes("transformWithState - streaming with rocksdb and" +
+  test("transformWithState - streaming with rocksdb and" +
     " invalid processor should fail") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
@@ -455,7 +455,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - lazy iterators can properly get/set keyed state") {
+  test("transformWithState - lazy iterators can properly get/set keyed state") {
     val spark = this.spark
     import spark.implicits._
 
@@ -533,7 +533,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - streaming with rocksdb should succeed") {
+  test("transformWithState - streaming with rocksdb should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -571,7 +571,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - streaming with rocksdb and processing time timer " +
+  test("transformWithState - streaming with rocksdb and processing time timer " +
     "should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
@@ -616,7 +616,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - streaming with rocksdb and processing time timer " +
+  test("transformWithState - streaming with rocksdb and processing time timer " +
     "and updating timers should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
@@ -652,7 +652,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - streaming with rocksdb and processing time timer " +
+  test("transformWithState - streaming with rocksdb and processing time timer " +
     "and multiple timers should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
@@ -689,7 +689,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - streaming with rocksdb and event " +
+  test("transformWithState - streaming with rocksdb and event " +
     "time based timer") {
     val inputData = MemoryStream[(String, Int)]
     val result =
@@ -780,7 +780,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     )
   }
 
-  testWithEncodingTypes("Use statefulProcessor without transformWithState -" +
+  test("Use statefulProcessor without transformWithState -" +
     " handle should be absent") {
     val processor = new RunningCountStatefulProcessor()
     val ex = intercept[Exception] {
@@ -793,7 +793,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     )
   }
 
-  testWithEncodingTypes("transformWithState - batch should succeed") {
+  test("transformWithState - batch should succeed") {
     val inputData = Seq("a", "b")
     val result = inputData.toDS()
       .groupByKey(x => x)
@@ -805,7 +805,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     checkAnswer(df, Seq(("a", "1"), ("b", "1")).toDF())
   }
 
-  testWithEncodingTypes("transformWithState - test deleteIfExists operator") {
+  test("transformWithState - test deleteIfExists operator") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -846,7 +846,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - two input streams") {
+  test("transformWithState - two input streams") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -876,7 +876,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - three input streams") {
+  test("transformWithState - three input streams") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -911,7 +911,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - two input streams, different key type") {
+  test("transformWithState - two input streams, different key type") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -958,7 +958,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       OutputMode.Update())
   }
 
-  testWithEncodingTypes("transformWithState - availableNow trigger mode, rate limit is respected") {
+  test("transformWithState - availableNow trigger mode, rate limit is respected") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
       withTempDir { srcDir =>
@@ -999,7 +999,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - availableNow trigger mode, multiple restarts") {
+  test("transformWithState - availableNow trigger mode, multiple restarts") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
      classOf[RocksDBStateStoreProvider].getName) {
      withTempDir { srcDir =>
@@ -1037,7 +1037,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - verify StateSchemaV3 writes " +
+  test("transformWithState - verify StateSchemaV3 writes " +
     "correct SQL schema of key/value") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
@@ -1120,7 +1120,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("transformWithState - verify that OperatorStateMetadataV2" +
+  test("transformWithState - verify that OperatorStateMetadataV2" +
     " file is being written correctly") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
@@ -1164,7 +1164,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test that invalid schema evolution fails query for column family") {
+  test("test that invalid schema evolution fails query for column family") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1201,7 +1201,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test that different outputMode after query restart fails") {
+  test("test that different outputMode after query restart fails") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
      SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1244,7 +1244,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test that changing between different state variable types fails") {
+  test("test that changing between different state variable types fails") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1286,7 +1286,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test that different timeMode after query restart fails") {
+  test("test that different timeMode after query restart fails") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1333,7 +1333,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test that introducing TTL after restart fails query") {
+  test("test that introducing TTL after restart fails query") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1387,7 +1387,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test query restart with new state variable succeeds") {
+  test("test query restart with new state variable succeeds") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1433,7 +1433,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("test query restart succeeds") {
+  test("test query restart succeeds") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1468,7 +1468,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  testWithEncodingTypes("SPARK-49070: transformWithState - valid initial state plan") {
+  test("SPARK-49070: transformWithState - valid initial state plan") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
       withTempDir { srcDir =>
@@ -1518,7 +1518,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     new Path(stateCheckpointPath, "_stateSchema/default/")
   }
 
-  testWithEncodingTypes("transformWithState - verify that metadata and schema logs are purged") {
+  test("transformWithState - verify that metadata and schema logs are purged") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
       SQLConf.SHUFFLE_PARTITIONS.key ->
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala
index 92a06f1183935..75fda9630779e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala
@@ -21,7 +21,7 @@ import java.sql.Timestamp
 import java.time.Duration
 
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider}
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, AlsoTestWithEncodingTypes, RocksDBStateStoreProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
@@ -41,7 +41,8 @@ case class OutputEvent(
  * Test suite base for TransformWithState with TTL support.
  */
 abstract class TransformWithStateTTLTest
-  extends StreamTest with AlsoTestWithChangelogCheckpointingEnabled {
+  extends StreamTest with AlsoTestWithChangelogCheckpointingEnabled
+  with AlsoTestWithEncodingTypes {
   import testImplicits._
 
   def getProcessor(ttlConfig: TTLConfig): StatefulProcessor[String, InputEvent, OutputEvent]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
index 9727c2dc8c113..b19c126c7386b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
@@ -195,7 +195,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest
 
   override def getStateTTLMetricName: String = "numValueStateWithTTLVars"
 
-  testWithEncodingTypes("validate multiple value states") {
+  test("validate multiple value states") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName) {
       val ttlKey = "k1"
@@ -262,7 +262,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest
     }
   }
 
-  testWithEncodingTypes("verify StateSchemaV3 writes correct SQL " +
+  test("verify StateSchemaV3 writes correct SQL " +
     "schema of key/value and with TTL") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
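
Note: the AlsoTestWithEncodingTypes trait that the suites above now mix in is
defined outside this patch. Judging from the testWithEncodingTypes helper
removed from RocksDBSuite, it presumably overrides test so that every test in
a mixed-in suite runs once per state store encoding format ("unsaferow" and
"avro"). A minimal sketch under that assumption: the trait name and package
come from the import changes above, the body is adapted from the removed
helper, and the SQLTestUtils supertype and exact signature are guesses, not
part of this patch.

    package org.apache.spark.sql.execution.streaming.state

    import org.scalactic.source.Position
    import org.scalatest.Tag

    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.test.SQLTestUtils

    trait AlsoTestWithEncodingTypes extends SQLTestUtils {
      // Register each test body once per supported encoding, mirroring what
      // the removed testWithEncodingTypes helper did explicitly.
      override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
          (implicit pos: Position): Unit = {
        Seq("unsaferow", "avro").foreach { encoding =>
          super.test(s"$testName (encoding = $encoding)", testTags: _*) {
            withSQLConf(
              SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> encoding) {
              testBody
            }
          }
        }
      }
    }

Because the wrapping happens in the trait's test override, plain test(...)
declarations in the suites pick it up automatically, which is why this patch
can delete the helper and rename every call site back to test.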