Skip to content

Commit

Permalink
Support for all types by derivers (#4)
Browse files Browse the repository at this point in the history
* Implement enum case

* Add new Path constructor

* Support exhaustive list of primitive types in SchemaEncoder

* Support exhaustive list of primitive types for ValueEncoder/ValueDecoder (WIP)

* scalafix
  • Loading branch information
grouzen authored Dec 17, 2023
1 parent 9d3e080 commit 0dfbed1
Show file tree
Hide file tree
Showing 11 changed files with 701 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ object Schemas {

case class PrimitiveDef(
typeName: PrimitiveTypeName,
annotation: LogicalTypeAnnotation,
annotation: Option[LogicalTypeAnnotation] = None,
isOptional: Boolean = false,
length: Int = 0
) extends Def[PrimitiveDef] {

def named(name: String): Type =
Types
.primitive(typeName, repetition(isOptional))
.as(annotation)
def named(name: String): Type = {
val builder = Types.primitive(typeName, repetition(isOptional))

annotation
.fold(builder)(builder.as)
.length(length)
.named(name)
}

def length(len: Int): PrimitiveDef =
this.copy(length = len)
Expand Down Expand Up @@ -104,13 +106,37 @@ object Schemas {
import PrimitiveTypeName._
import LogicalTypeAnnotation._

val string: PrimitiveDef = PrimitiveDef(BINARY, stringType())
val boolean: PrimitiveDef = PrimitiveDef(INT32, intType(8, false))
val byte: PrimitiveDef = PrimitiveDef(INT32, intType(8, false))
val short: PrimitiveDef = PrimitiveDef(INT32, intType(16, true))
val int: PrimitiveDef = PrimitiveDef(INT32, intType(32, true))
val long: PrimitiveDef = PrimitiveDef(INT64, intType(64, true))
val uuid: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY, uuidType()).length(16)
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
def enum0: PrimitiveDef = PrimitiveDef(BINARY, Some(enumType()))
val string: PrimitiveDef = PrimitiveDef(BINARY, Some(stringType()))
val boolean: PrimitiveDef = PrimitiveDef(BOOLEAN)
val byte: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
val short: PrimitiveDef = PrimitiveDef(INT32, Some(intType(16, true)))
val int: PrimitiveDef = PrimitiveDef(INT32, Some(intType(32, true)))
val long: PrimitiveDef = PrimitiveDef(INT64, Some(intType(64, true)))
val float: PrimitiveDef = PrimitiveDef(FLOAT)
val double: PrimitiveDef = PrimitiveDef(DOUBLE)
val binary: PrimitiveDef = PrimitiveDef(BINARY)
val char: PrimitiveDef = byte
val uuid: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY, Some(uuidType())).length(16)
val bigDecimal: PrimitiveDef = PrimitiveDef(INT64, Some(decimalType(DECIMAL_PRECISION, DECIMAL_SCALE)))
val bigInteger: PrimitiveDef = PrimitiveDef(BINARY)
val dayOfWeek: PrimitiveDef = byte
val monthType: PrimitiveDef = byte
val monthDay: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY).length(2)
val period: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY).length(12)
val year: PrimitiveDef = PrimitiveDef(INT32, Some(intType(16, false)))
val yearMonth: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY).length(4)
val zoneId: PrimitiveDef = string
val zoneOffset: PrimitiveDef = string
val duration: PrimitiveDef = PrimitiveDef(INT64, Some(intType(64, false)))
val instant: PrimitiveDef = PrimitiveDef(INT64, Some(intType(64, false)))
val localDate: PrimitiveDef = PrimitiveDef(INT32, Some(dateType()))
val localTime: PrimitiveDef = PrimitiveDef(INT32, Some(timeType(true, TimeUnit.MILLIS)))
val localDateTime: PrimitiveDef = PrimitiveDef(INT64, Some(timestampType(true, TimeUnit.MILLIS)))
val offsetTime: PrimitiveDef = PrimitiveDef(INT32, Some(timeType(false, TimeUnit.MILLIS)))
val offsetDateTime: PrimitiveDef = PrimitiveDef(INT64, Some(timestampType(false, TimeUnit.MILLIS)))
val zonedDateTime: PrimitiveDef = offsetDateTime

def record(fields: Chunk[Type]): RecordDef = RecordDef(fields)
def list(element: Type): ListDef = ListDef(element)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,26 @@ import org.apache.parquet.io.api.{ Binary, RecordConsumer }
import org.apache.parquet.schema.Type
import zio.Chunk

import java.nio.ByteBuffer
import java.math.{ BigDecimal, BigInteger }
import java.nio.{ ByteBuffer, ByteOrder }
import java.time.{
DayOfWeek,
Duration,
Instant,
LocalDate,
LocalDateTime,
LocalTime,
Month,
MonthDay,
OffsetDateTime,
OffsetTime,
Period,
Year,
YearMonth,
ZoneId,
ZoneOffset,
ZonedDateTime
}
import java.util.UUID

sealed trait Value {
Expand Down Expand Up @@ -179,8 +198,11 @@ object Value {
def boolean(v: Boolean) =
PrimitiveValue.BooleanValue(v)

def byte(v: Byte) =
int(v.toInt)

def short(v: Short) =
PrimitiveValue.Int32Value(v.toInt)
int(v.toInt)

def int(v: Int) =
PrimitiveValue.Int32Value(v)
Expand All @@ -198,7 +220,7 @@ object Value {
PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(v.toArray))

def char(v: Char) =
PrimitiveValue.Int32Value(v.toInt)
int(v.toInt)

def uuid(v: UUID) = {
val bb = ByteBuffer.wrap(Array.ofDim(16))
Expand All @@ -209,6 +231,101 @@ object Value {
PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(bb.array()))
}

def bigDecimal(v: BigDecimal) =
long(v.unscaledValue.longValue)

def bigInteger(v: BigInteger) =
PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(v.toByteArray))

def dayOfWeek(v: DayOfWeek) =
byte(v.getValue.toByte)

def month(v: Month) =
byte(v.getValue.toByte)

def monthDay(v: MonthDay) = {
val bb = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN)

bb.put(v.getMonthValue.toByte)
bb.put(v.getDayOfMonth.toByte)

PrimitiveValue.BinaryValue(Binary.fromReusedByteArray(bb.array()))
}

def period(v: Period) = {
val bb = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)

bb.putInt(v.getYears)
bb.putInt(v.getMonths)
bb.putInt(v.getDays)

PrimitiveValue.BinaryValue(Binary.fromReusedByteArray(bb.array()))
}

def year(v: Year) =
short(v.getValue.toShort)

def yearMonth(v: YearMonth) = {
val bb = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN)

bb.putShort(v.getYear.toShort)
bb.putShort(v.getMonthValue.toShort)

PrimitiveValue.BinaryValue(Binary.fromReusedByteArray(bb.array()))
}

def zoneId(v: ZoneId) =
string(v.getId)

def zoneOffset(v: ZoneOffset) =
string(v.getId)

def duration(v: Duration) =
long(v.toMillis)

def instant(v: Instant) =
long(v.toEpochMilli)

def localDate(v: LocalDate) =
int(v.toEpochDay.toInt)

def localTime(v: LocalTime) =
int((v.toNanoOfDay / MICROS_FACTOR).toInt)

def localDateTime(v: LocalDateTime) = {
val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
val epochMillis = dateMillis + timeMillis

long(epochMillis)
}

def offsetTime(v: OffsetTime) = {
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR
val dayMillis = timeMillis - offsetMillis

int(dayMillis.toInt)
}

def offsetDateTime(v: OffsetDateTime) = {
val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR
val epochMillis = dateMillis + timeMillis - offsetMillis

long(epochMillis)
}

def zonedDateTime(v: ZonedDateTime) = {
val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR
val epochMillis = dateMillis + timeMillis - offsetMillis

long(epochMillis)
}

def record(r: Map[String, Value]) =
GroupValue.RecordValue(r)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package me.mnedokushev.zio.apache.parquet.core.codec

import me.mnedokushev.zio.apache.parquet.core.Schemas
import me.mnedokushev.zio.apache.parquet.core.Schemas.PrimitiveDef
import org.apache.parquet.schema.Type
import zio.Chunk
import zio.schema.{ Deriver, Schema, StandardType }
Expand Down Expand Up @@ -31,32 +32,83 @@ object SchemaEncoderDeriver {
`enum`: Schema.Enum[A],
cases: => Chunk[Deriver.WrappedF[SchemaEncoder, _]],
summoned: => Option[SchemaEncoder[A]]
): SchemaEncoder[A] = ???
): SchemaEncoder[A] = new SchemaEncoder[A] {
override def encode(schema: Schema[A], name: String, optional: Boolean): Type =
Schemas.enum0.optionality(optional).named(name)
}

override def derivePrimitive[A](
st: StandardType[A],
summoned: => Option[SchemaEncoder[A]]
): SchemaEncoder[A] =
new SchemaEncoder[A] {
override def encode(schema: Schema[A], name: String, optional: Boolean): Type =
override def encode(schema: Schema[A], name: String, optional: Boolean): Type = {
def tpe(prim: PrimitiveDef) =
prim.optionality(optional).named(name)

st match {
case StandardType.StringType =>
Schemas.string.optionality(optional).named(name)
case StandardType.BoolType =>
Schemas.boolean.optionality(optional).named(name)
case StandardType.ByteType =>
Schemas.byte.optionality(optional).named(name)
case StandardType.ShortType =>
Schemas.short.optionality(optional).named(name)
case StandardType.IntType =>
Schemas.int.optionality(optional).named(name)
case StandardType.LongType =>
Schemas.long.optionality(optional).named(name)
// TODO: add the other types
case StandardType.UUIDType =>
Schemas.uuid.optionality(optional).named(name)
case _ => ???
case StandardType.StringType =>
tpe(Schemas.string)
case StandardType.BoolType =>
tpe(Schemas.boolean)
case StandardType.ByteType =>
tpe(Schemas.byte)
case StandardType.ShortType =>
tpe(Schemas.short)
case StandardType.IntType =>
tpe(Schemas.int)
case StandardType.LongType =>
tpe(Schemas.long)
case StandardType.FloatType =>
tpe(Schemas.float)
case StandardType.DoubleType =>
tpe(Schemas.double)
case StandardType.BinaryType =>
tpe(Schemas.binary)
case StandardType.CharType =>
tpe(Schemas.char)
case StandardType.UUIDType =>
tpe(Schemas.uuid)
case StandardType.BigDecimalType =>
tpe(Schemas.bigDecimal)
case StandardType.BigIntegerType =>
tpe(Schemas.bigInteger)
case StandardType.DayOfWeekType =>
tpe(Schemas.dayOfWeek)
case StandardType.MonthType =>
tpe(Schemas.monthType)
case StandardType.MonthDayType =>
tpe(Schemas.monthDay)
case StandardType.PeriodType =>
tpe(Schemas.period)
case StandardType.YearType =>
tpe(Schemas.year)
case StandardType.YearMonthType =>
tpe(Schemas.yearMonth)
case StandardType.ZoneIdType =>
tpe(Schemas.zoneId)
case StandardType.ZoneOffsetType =>
tpe(Schemas.zoneOffset)
case StandardType.DurationType =>
tpe(Schemas.duration)
case StandardType.InstantType =>
tpe(Schemas.instant)
case StandardType.LocalDateType =>
tpe(Schemas.localDate)
case StandardType.LocalTimeType =>
tpe(Schemas.localTime)
case StandardType.LocalDateTimeType =>
tpe(Schemas.localDateTime)
case StandardType.OffsetTimeType =>
tpe(Schemas.offsetTime)
case StandardType.OffsetDateTimeType =>
tpe(Schemas.offsetDateTime)
case StandardType.ZonedDateTimeType =>
tpe(Schemas.zonedDateTime)
case StandardType.UnitType =>
throw EncoderError("Unit type is unsupported")
}
}
}

override def deriveOption[A](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package me.mnedokushev.zio.apache.parquet.core.codec
import me.mnedokushev.zio.apache.parquet.core.Value
import zio._

trait ValueDecoder[+A] {
trait ValueDecoder[+A] { self =>

def decode(value: Value): A

def decodeZIO(value: Value): Task[A] =
ZIO.attempt(decode(value))

def map[B](f: A => B): ValueDecoder[B] =
new ValueDecoder[B] {
override def decode(value: Value): B =
f(self.decode(value))
}

}
Loading

0 comments on commit 0dfbed1

Please sign in to comment.