From e3e33d8794da5f3597b8d706b734af5025360939 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 16 Dec 2018 11:02:00 +0800 Subject: [PATCH] [SPARK-26372][SQL] Don't reuse value from previous row when parsing bad CSV input field ## What changes were proposed in this pull request? CSV parsing accidentally uses the previous good value for a bad input field. See example in Jira. This PR ensures that the associated column is set to null when an input field cannot be converted. ## How was this patch tested? Added new test. Ran all SQL unit tests (testOnly org.apache.spark.sql.*). Ran pyspark tests for pyspark-sql Closes #23323 from bersprockets/csv-bad-field. Authored-by: Bruce Robbins Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/csv/UnivocityParser.scala | 1 + .../resources/test-data/bad_after_good.csv | 2 ++ .../execution/datasources/csv/CSVSuite.scala | 19 +++++++++++++++++++ 3 files changed, 22 insertions(+) create mode 100644 sql/core/src/test/resources/test-data/bad_after_good.csv diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index ed089120055e2..82a5b3c302b18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -239,6 +239,7 @@ class UnivocityParser( } catch { case NonFatal(e) => badRecordException = badRecordException.orElse(Some(e)) + row.setNullAt(i) } i += 1 } diff --git a/sql/core/src/test/resources/test-data/bad_after_good.csv b/sql/core/src/test/resources/test-data/bad_after_good.csv new file mode 100644 index 0000000000000..4621a7d23714d --- /dev/null +++ b/sql/core/src/test/resources/test-data/bad_after_good.csv @@ -0,0 +1,2 @@ +"good record",1999-08-01 +"bad record",1999-088-01 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 3b977d74053e6..d9e5d7af19671 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -63,6 +63,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te private val datesFile = "test-data/dates.csv" private val unescapedQuotesFile = "test-data/unescaped-quotes.csv" private val valueMalformedFile = "test-data/value-malformed.csv" + private val badAfterGoodFile = "test-data/bad_after_good.csv" /** Verifies data and schema. */ private def verifyCars( @@ -2012,4 +2013,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(!files.exists(_.getName.endsWith("csv"))) } } + + test("Do not reuse last good value for bad input field") { + val schema = StructType( + StructField("col1", StringType) :: + StructField("col2", DateType) :: + Nil + ) + val rows = spark.read + .schema(schema) + .format("csv") + .load(testFile(badAfterGoodFile)) + + val expectedRows = Seq( + Row("good record", java.sql.Date.valueOf("1999-08-01")), + Row("bad record", null)) + + checkAnswer(rows, expectedRows) + } }