[SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON

## What changes were proposed in this pull request?

In the PR, I propose to switch to the **java.time API** for parsing timestamps and dates from JSON inputs with microsecond precision. The SQL config `spark.sql.legacy.timeParser.enabled` allows switching back to the previous behavior, which uses `java.text.SimpleDateFormat`/`FastDateFormat` for parsing/generating timestamps/dates.
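To make the precision point concrete, here is a JDK-only sketch (not code from this patch) of why `SimpleDateFormat` cannot round-trip microseconds while `java.time` can:

```scala
import java.text.SimpleDateFormat
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val input = "2018-12-16 10:39:21.123456"

// java.time: 'S' is fraction-of-second, so all six digits are kept.
val dt = LocalDateTime.parse(input, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))
println(dt.getNano) // 123456000 nanoseconds, i.e. the full microsecond fraction

// Legacy: 'S' means milliseconds, so "123456" is read as 123456 ms and the
// timestamp silently shifts by about two minutes.
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS")
println(sdf.parse(input)) // Sun Dec 16 10:41:24 ... (shifted)
```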

## How was this patch tested?

It was tested by `JsonExpressionsSuite`, `JsonFunctionsSuite` and `JsonSuite`.

Closes apache#23196 from MaxGekk/json-time-parser.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
2 people authored and cloud-fan committed Dec 16, 2018
1 parent 860f449 commit 8a27952
Showing 12 changed files with 422 additions and 335 deletions.
docs/sql-migration-guide-upgrade.md (2 changes: 1 addition & 1 deletion)

@@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
-  - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose, with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
 
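The guide's example can be checked outside Spark; a minimal JDK-only sketch:

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

val s = "2018-12-08 10:39:21.123"

// Strict java.time parsing fails: the input has no literal 'T'.
val withT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
println(Try(LocalDateTime.parse(s, withT)).isSuccess) // false

// The Spark 2.4 fallback accepted it anyway:
println(Timestamp.valueOf(s)) // 2018-12-08 10:39:21.123

// The pattern that works in Spark 3.0:
val noT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
println(LocalDateTime.parse(s, noT)) // 2018-12-08T10:39:21.123
```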
@@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeFormatter
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   @transient
-  private lazy val timeParser = DateTimeFormatter(
+  private lazy val timestampParser = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
@@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt timeParser.parse(field)).isDefined) {
+    if ((allCatch opt timestampParser.parse(field)).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)
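For readers unfamiliar with the `allCatch opt` idiom used in `tryParseTimestamp`: it turns any parser exception into `None`, so type inference is just an `isDefined` check. A standalone sketch (the `parseMicros` helper is hypothetical, standing in for `timestampParser.parse`):

```scala
import scala.util.control.Exception.allCatch

// Hypothetical stand-in for the real parser: any exception becomes None below.
def parseMicros(s: String): Long = java.time.Instant.parse(s).toEpochMilli * 1000L

def looksLikeTimestamp(field: String): Boolean =
  (allCatch opt parseMicros(field)).isDefined

println(looksLikeTimestamp("2018-12-16T10:39:21Z")) // true
println(looksLikeTimestamp("not a timestamp"))      // false
```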
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.types._
 
 class UnivocityGenerator(
@@ -41,18 +41,18 @@ class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
-  private val timeFormatter = DateTimeFormatter(
+  private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
       (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
+      (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
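For context on the values these formatters receive: Spark stores `DateType` as days since the epoch (an `Int`) and `TimestampType` as microseconds since the epoch (a `Long`). A JDK-only sketch of both representations:

```scala
import java.time.{Instant, LocalDate}

// DateType is carried as days since 1970-01-01 (an Int)...
println(LocalDate.ofEpochDay(17867)) // 2018-12-02

// ...and TimestampType as microseconds since the epoch (a Long).
val us = 1544956761123456L
println(Instant.ofEpochSecond(us / 1000000L, (us % 1000000L) * 1000L)) // 2018-12-16T10:39:21.123456Z
```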
@@ -74,11 +74,11 @@ class UnivocityParser(
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  private val timeFormatter = DateTimeFormatter(
+  private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -158,7 +158,7 @@ class UnivocityParser(
     }
 
     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
+      nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
 
     case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
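`nullSafeDatum` itself is outside this hunk; a simplified, hypothetical version of the wrapper pattern it implements:

```scala
// Hypothetical simplification: treat null or the configured null token as null,
// otherwise delegate to the type-specific parser (e.g. timestampFormatter.parse).
def nullSafeDatum[T](datum: String, nullValue: String)(parse: String => T): Option[T] =
  if (datum == null || datum == nullValue) None else Some(parse(datum))

println(nullSafeDatum("2018-12-16", "")(_.length)) // Some(10)
println(nullSafeDatum("", "")(_.length))           // None
```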
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@ private[sql] class JSONOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
 
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+  val timestampFormat: String =
+    parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
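With this change `JSONOptions` exposes only pattern strings, so each reader/writer builds its own formatter and the legacy-versus-java.time decision happens at the point of use. The retained default pattern is plain JDK material; a quick round-trip check:

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

// The default JSON timestamp pattern kept by this patch.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
val parsed = OffsetDateTime.parse("2018-12-16T10:39:21.123+01:00", fmt)
println(fmt.format(parsed)) // 2018-12-16T10:39:21.123+01:00
```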
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 
 /**
@@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(
 
   private val lineSeparator: String = options.lineSeparatorInWrite
 
+  private val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -116,14 +122,12 @@
 
     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val timestampString =
-          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        val timestampString = timestampFormatter.format(row.getLong(ordinal))
         gen.writeString(timestampString)
 
     case DateType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val dateString =
-          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+        val dateString = dateFormatter.format(row.getInt(ordinal))
         gen.writeString(dateString)
 
     case BinaryType =>
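The old write path went through `java.sql.Timestamp` and a millisecond-based formatter; the new one formats the microsecond value directly. A JDK-only sketch of the difference (UTC assumed for determinism):

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.TimeZone

val us = 1544956761123456L // microseconds since the epoch

// Old path: microseconds -> java.sql.Timestamp (milliseconds) -> legacy formatter.
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
println(sdf.format(new Timestamp(us / 1000L))) // 2018-12-16 10:39:21.123 (sub-millisecond digits lost)

// New path: format the microsecond value directly via java.time.
val instant = Instant.ofEpochSecond(us / 1000000L, (us % 1000000L) * 1000L)
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC)
println(fmt.format(instant)) // 2018-12-16 10:39:21.123456
```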
@@ -55,6 +55,12 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  private val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -218,17 +224,7 @@ class JacksonParser(
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          Long.box {
-            Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
-              .getOrElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                DateTimeUtils.stringToTime(stringValue).getTime * 1000L
-              }
-          }
+          timestampFormatter.parse(parser.getText)
 
         case VALUE_NUMBER_INT =>
           parser.getLongValue * 1000000L
@@ -237,22 +233,7 @@
     case DateType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.x
-          Int.box {
-            Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
-              .orElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
-              }
-              .getOrElse {
-                // In Spark 1.5.0, we store the data as number of days since epoch in string.
-                // So, we just convert it to Int.
-                stringValue.toInt
-              }
-          }
+          dateFormatter.parse(parser.getText)
       }
 
     case BinaryType =>
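The removed `Try`/fallback chains collapse into single `parse` calls that return Spark's internal representations. A JDK-only approximation of what those conversions amount to:

```scala
import java.time.{Instant, LocalDate}

// Timestamps: a parsed Instant becomes microseconds since the epoch.
def instantToMicros(i: Instant): Long =
  Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), (i.getNano / 1000).toLong)

println(instantToMicros(Instant.parse("2018-12-16T10:39:21.123456Z"))) // 1544956761123456

// Dates: a parsed LocalDate becomes days since the epoch.
println(LocalDate.parse("2018-12-16").toEpochDay.toInt) // 17881
```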
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import java.time._
 import java.time.format.DateTimeFormatterBuilder
-import java.time.temporal.{ChronoField, TemporalQueries}
+import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
 import java.util.{Locale, TimeZone}
 
 import scala.util.Try
@@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.sql.internal.SQLConf
 
-sealed trait DateTimeFormatter {
+sealed trait TimestampFormatter {
   def parse(s: String): Long // returns microseconds since epoch
   def format(us: Long): String
 }
 
-class Iso8601DateTimeFormatter(
+trait FormatterUtils {
+  protected def zoneId: ZoneId
+  protected def buildFormatter(
+      pattern: String,
+      locale: Locale): java.time.format.DateTimeFormatter = {
+    new DateTimeFormatterBuilder()
+      .appendPattern(pattern)
+      .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+      .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(locale)
+  }
+  protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
+    val localDateTime = LocalDateTime.from(temporalAccessor)
+    val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
+    Instant.from(zonedDateTime)
+  }
+}
+
+class Iso8601TimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends DateTimeFormatter {
-  val formatter = new DateTimeFormatterBuilder()
-    .appendPattern(pattern)
-    .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
-    .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
-    .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
-    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
-    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
-    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
-    .toFormatter(locale)
+    locale: Locale) extends TimestampFormatter with FormatterUtils {
+  val zoneId = timeZone.toZoneId
+  val formatter = buildFormatter(pattern, locale)
 
   def toInstant(s: String): Instant = {
     val temporalAccessor = formatter.parse(s)
     if (temporalAccessor.query(TemporalQueries.offset()) == null) {
-      val localDateTime = LocalDateTime.from(temporalAccessor)
-      val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
-      Instant.from(zonedDateTime)
+      toInstantWithZoneId(temporalAccessor)
     } else {
       Instant.from(temporalAccessor)
     }
@@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter(
   }
 }
 
-class LegacyDateTimeFormatter(
+class LegacyTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends DateTimeFormatter {
+    locale: Locale) extends TimestampFormatter {
  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
 
   protected def toMillis(s: String): Long = format.parse(s).getTime
@@ -90,21 +103,21 @@ class LegacyDateTimeFormatter(
   }
 }
 
-class LegacyFallbackDateTimeFormatter(
+class LegacyFallbackTimestampFormatter(
    pattern: String,
    timeZone: TimeZone,
-    locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
+    locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
   override def toMillis(s: String): Long = {
     Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
   }
 }
 
-object DateTimeFormatter {
-  def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
+object TimestampFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+      new LegacyFallbackTimestampFormatter(format, timeZone, locale)
     } else {
-      new Iso8601DateTimeFormatter(format, timeZone, locale)
+      new Iso8601TimestampFormatter(format, timeZone, locale)
     }
   }
 }
@@ -116,29 +129,32 @@ sealed trait DateFormatter {
 
 class Iso8601DateFormatter(
     pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends DateFormatter {
+    locale: Locale) extends DateFormatter with FormatterUtils {
+
+  val zoneId = ZoneId.of("UTC")
+
+  val formatter = buildFormatter(pattern, locale)
 
-  val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
+  def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    toInstantWithZoneId(temporalAccessor)
+  }
 
   override def parse(s: String): Int = {
-    val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+    val seconds = toInstant(s).getEpochSecond
     val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
 
     days.toInt
   }
 
   override def format(days: Int): String = {
     val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
-    dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+    formatter.withZone(zoneId).format(instant)
   }
 }
 
-class LegacyDateFormatter(
-    pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends DateFormatter {
-  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
+  val format = FastDateFormat.getInstance(pattern, locale)
 
   def parse(s: String): Int = {
     val milliseconds = format.parse(s).getTime
@@ -153,8 +169,7 @@ class LegacyDateFormatter(
 
 class LegacyFallbackDateFormatter(
     pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+    locale: Locale) extends LegacyDateFormatter(pattern, locale) {
   override def parse(s: String): Int = {
     Try(super.parse(s)).orElse {
       // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
@@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter(
 
 object DateFormatter {
-  def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
+  def apply(format: String, locale: Locale): DateFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateFormatter(format, timeZone, locale)
+      new LegacyFallbackDateFormatter(format, locale)
     } else {
-      new Iso8601DateFormatter(format, timeZone, locale)
+      new Iso8601DateFormatter(format, locale)
     }
   }
 }
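Two JDK behaviors the new classes lean on, shown standalone: `parseDefaulting` lets a date-only input still resolve to a full `LocalDateTime`, and `TemporalQueries.offset()` distinguishes inputs that carry their own offset (the branch in `toInstant`) from those that need the session time zone:

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.{ChronoField, TemporalQueries}
import java.util.Locale

val formatter = new DateTimeFormatterBuilder()
  .appendPattern("yyyy-MM-dd")
  .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
  .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
  .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
  .toFormatter(Locale.US)

val parsed = formatter.parse("2018-12-16")

// No offset in the input, so the session zone must be applied (the if-branch above).
assert(parsed.query(TemporalQueries.offset()) == null)

// Defaulted time fields make a full LocalDateTime recoverable from a bare date.
val instant = Instant.from(ZonedDateTime.of(LocalDateTime.from(parsed), ZoneId.of("UTC")))
println(instant) // 2018-12-16T00:00:00Z
```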