diff --git a/.gitignore b/.gitignore
index 902e3d7..29a3f7d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,8 @@
 target/
 */target/
 private/
+
+.bloop/
+.metals/
+.vscode/
+metals.sbt
diff --git a/.sbtopts b/.sbtopts
new file mode 100644
index 0000000..5461754
--- /dev/null
+++ b/.sbtopts
@@ -0,0 +1 @@
+-J-Xss4M
\ No newline at end of file
diff --git a/README.md b/README.md
index 8334b38..b60842e 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ To be able to write/read data to/from parquet files you need to define the follo
 You can get Java SDK's `Type` by using `SchemaEncoder` generated by `SchemaEncoderDeriver.default` ZIO Schema deriver:
 
 ```scala
-//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
 
 import zio.schema._
 import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -57,7 +57,7 @@ Alternatively, you can override the schemas of some fields in your record by def
 and using `SchemaEncoderDeriver.summoned` deriver.
 
 ```scala
-//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
 
 import me.mnedokushev.zio.apache.parquet.core.Schemas
 import zio.schema._
@@ -96,7 +96,7 @@ For converting Scala values into `Value` and back we need to define instances
 of type classes. This could be done by using `ValueDecoderDeriver.default` ZIO Schema deriver.
 
 ```scala
-//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
 
 import zio.schema._
 import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -129,7 +129,7 @@ Same as for `SchemaEncoder`, you can override the schemas of some fields in your
 `ValueEncoder`/`ValueDecoder` and using `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly.
 
 ```scala
-//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
 
 import me.mnedokushev.zio.apache.parquet.core.Value
 import zio.schema._
@@ -177,10 +177,10 @@ println(record)
 ## Reading/Writing files
 
 Finally, to perform some IO operations we need to initialize `ParquetWriter` and `ParquetReader` and use either
-`writeChunk`/`readChunk` or `writeStream`/`readStream` methods
+`writeChunk`/`readChunk` or `writeStream`/`readStream` methods.
 
 ```scala
-//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4
 
 import zio.schema._
 import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -227,4 +227,8 @@ Unsafe.unsafe { implicit unsafe =>
 }
 // Outputs:
 // Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
-```
\ No newline at end of file
+```
+
+In the previous code snippet we used `ParquetReader.configured[A]()` to initialize a reader that takes its Parquet schema from the file being read. Such a reader always reads all columns of the file.
+
+If you need to read only a subset of the columns, use `ParquetReader.projected[A]()`, which always uses the schema of the provided type.
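
Editor's note (not part of the diff): a minimal sketch of wiring the projected reader described in the README addition above. The `MyProjection` record, its field names, and the `path` value are illustrative assumptions; only the derivers and the `ParquetReader.projected` constructor come from this changeset and the README.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio._
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, Path }

// Hypothetical projection keeping only a subset of the columns stored in the file.
case class MyProjection(a: Int, c: Option[Int])
object MyProjection {
  implicit val schema: Schema[MyProjection]               =
    DeriveSchema.gen[MyProjection]
  implicit val schemaEncoder: SchemaEncoder[MyProjection] =
    Derive.derive[SchemaEncoder, MyProjection](SchemaEncoderDeriver.default)
  implicit val valueDecoder: ValueDecoder[MyProjection]   =
    Derive.derive[ValueDecoder, MyProjection](ValueDecoderDeriver.default)
}

// Placeholder for the Parquet file to read.
val path: Path = ???

// Layer wiring mirrors the updated ParquetIOSpec further down in this diff.
val projectedReader: ULayer[ParquetReader[MyProjection]] =
  ParquetReader.projected[MyProjection]()

// Reading through the service yields only the projected columns.
val readProjected =
  ZIO.serviceWithZIO[ParquetReader[MyProjection]](_.readChunk(path))
```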
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala
index 632dbfa..2a91fa4 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala
@@ -103,6 +103,14 @@ object Schemas {
   def repetition(optional: Boolean): Repetition =
     if (optional) Repetition.OPTIONAL else Repetition.REQUIRED
 
+  def asMessageType(schema: Type): MessageType = {
+    val groupSchema = schema.asGroupType()
+    val name        = groupSchema.getName
+    val fields      = groupSchema.getFields
+
+    new MessageType(name, fields)
+  }
+
   import PrimitiveTypeName._
   import LogicalTypeAnnotation._
 
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala
index 84d3c8f..2bdc1da 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala
@@ -4,11 +4,17 @@ import org.apache.parquet.schema.Type
 import zio._
 import zio.schema._
 
-trait SchemaEncoder[A] {
+trait SchemaEncoder[A] { self =>
 
   def encode(schema: Schema[A], name: String, optional: Boolean): Type
 
   def encodeZIO(schema: Schema[A], name: String, optional: Boolean): Task[Type] =
     ZIO.attempt(encode(schema, name, optional))
 
+  def contramap[B](f: Schema[B] => Schema[A]): SchemaEncoder[B] =
+    new SchemaEncoder[B] {
+      override def encode(schema: Schema[B], name: String, optional: Boolean): Type =
+        self.encode(f(schema), name, optional)
+    }
+
 }
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala
index fcabf1b..1ff6c0b 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala
@@ -1,16 +1,16 @@
 package me.mnedokushev.zio.apache.parquet.core.hadoop
 
 import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
-import me.mnedokushev.zio.apache.parquet.core.codec.ValueDecoder
+import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueDecoder }
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.{ ReadSupport => HadoopReadSupport }
 import org.apache.parquet.hadoop.{ ParquetReader => HadoopParquetReader }
 import org.apache.parquet.io.InputFile
 import zio._
+import zio.schema.Schema
 import zio.stream._
 
 import java.io.IOException
-import scala.annotation.nowarn
 
 trait ParquetReader[+A <: Product] {
 
@@ -20,7 +20,11 @@ trait ParquetReader[+A <: Product] {
 
 }
 
-final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit decoder: ValueDecoder[A])
+final class ParquetReaderLive[A <: Product: Tag](
+  hadoopConf: Configuration,
+  schema: Option[Schema[A]] = None,
+  schemaEncoder: Option[SchemaEncoder[A]] = None
+)(implicit decoder: ValueDecoder[A])
     extends ParquetReader[A] {
 
   override def readStream(path: Path): ZStream[Scope, Throwable, A] =
@@ -64,7 +68,7 @@ final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit
       inputFile <- path.toInputFileZIO(hadoopConf)
       reader    <- ZIO.fromAutoCloseable(
                     ZIO.attemptBlockingIO(
-                       new ParquetReader.Builder(inputFile).withConf(hadoopConf).build()
+                       new ParquetReader.Builder(inputFile, schema, schemaEncoder).withConf(hadoopConf).build()
                     )
                   )
     } yield reader
@@ -73,16 +77,25 @@ final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit
 
 object ParquetReader {
 
-  final class Builder(file: InputFile) extends HadoopParquetReader.Builder[RecordValue](file) {
+  final class Builder[A: Tag](
+    file: InputFile,
+    schema: Option[Schema[A]] = None,
+    schemaEncoder: Option[SchemaEncoder[A]] = None
+  ) extends HadoopParquetReader.Builder[RecordValue](file) {
 
     override protected def getReadSupport: HadoopReadSupport[RecordValue] =
-      new ReadSupport
+      new ReadSupport(schema, schemaEncoder)
 
   }
 
   def configured[A <: Product: ValueDecoder](
     hadoopConf: Configuration = new Configuration()
-  )(implicit @nowarn tag: Tag[A]): ULayer[ParquetReader[A]] =
+  )(implicit tag: Tag[A]): ULayer[ParquetReader[A]] =
     ZLayer.succeed(new ParquetReaderLive[A](hadoopConf))
 
+  def projected[A <: Product: ValueDecoder](
+    hadoopConf: Configuration = new Configuration()
+  )(implicit schema: Schema[A], schemaEncoder: SchemaEncoder[A], tag: Tag[A]): ULayer[ParquetReader[A]] =
+    ZLayer.succeed(new ParquetReaderLive[A](hadoopConf, Some(schema), Some(schemaEncoder)))
+
 }
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala
index 4a6f991..91712ed 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala
@@ -1,5 +1,6 @@
 package me.mnedokushev.zio.apache.parquet.core.hadoop
 
+import me.mnedokushev.zio.apache.parquet.core.Schemas
 import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
 import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueEncoder }
 import org.apache.hadoop.conf.Configuration
@@ -7,7 +8,7 @@ import org.apache.parquet.hadoop.api.{ WriteSupport => HadoopWriteSupport }
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter => HadoopParquetWriter }
 import org.apache.parquet.io.OutputFile
-import org.apache.parquet.schema.{ MessageType, Type }
+import org.apache.parquet.schema.MessageType
 import zio._
 import zio.schema.Schema
 import zio.stream._
@@ -55,20 +56,10 @@ final class ParquetWriterLive[A <: Product](
       _      <- ZIO.attemptBlockingIO(writer.write(record.asInstanceOf[RecordValue]))
     } yield ()
 
-  private def build(path: Path): RIO[Scope, HadoopParquetWriter[RecordValue]] = {
-
-    def castToMessageSchema(schema: Type) =
-      ZIO.attempt {
-        val groupSchema = schema.asGroupType()
-        val name        = groupSchema.getName
-        val fields      = groupSchema.getFields
-
-        new MessageType(name, fields)
-      }
-
+  private def build(path: Path): RIO[Scope, HadoopParquetWriter[RecordValue]] =
     for {
       schema        <- schemaEncoder.encodeZIO(schema, tag.tag.shortName, optional = false)
-      messageSchema <- castToMessageSchema(schema)
+      messageSchema <- ZIO.attempt(Schemas.asMessageType(schema))
       outputFile    <- path.toOutputFileZIO(hadoopConf)
       builder        = new ParquetWriter.Builder(outputFile, messageSchema)
                          .withWriteMode(writeMode)
@@ -82,7 +73,6 @@ final class ParquetWriterLive[A <: Product](
                          .withConf(hadoopConf)
       writer        <- ZIO.fromAutoCloseable(ZIO.attemptBlockingIO(builder.build()))
     } yield writer
-  }
 
 }
 
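
Editor's note (not part of the diff): the writer's inlined `castToMessageSchema` helper is now the shared `Schemas.asMessageType`, which the new projecting `ReadSupport` below reuses. A rough sketch of what the encoder/helper pair produces; the `MyRecord` shape is assumed from the README examples rather than taken from the repository.

```scala
import me.mnedokushev.zio.apache.parquet.core.Schemas
import me.mnedokushev.zio.apache.parquet.core.codec._
import org.apache.parquet.schema.MessageType
import zio.schema._

// Record shape assumed from the README examples.
case class MyRecord(a: Int, b: String, c: Option[Int])
object MyRecord {
  implicit val schema: Schema[MyRecord]               =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
}

// Encode the ZIO Schema into a Parquet group type, then lift it into the
// MessageType used by ParquetWriterLive.build and ReadSupport.resolveSchema.
val messageType: MessageType =
  Schemas.asMessageType(MyRecord.schemaEncoder.encode(MyRecord.schema, "MyRecord", optional = false))
```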
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala
index b51a499..a4cc246 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala
@@ -1,24 +1,31 @@
 package me.mnedokushev.zio.apache.parquet.core.hadoop
 
+import me.mnedokushev.zio.apache.parquet.core.Schemas
 import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
+import me.mnedokushev.zio.apache.parquet.core.codec.SchemaEncoder
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.{ InitContext, ReadSupport => HadoopReadSupport }
 import org.apache.parquet.io.api.{ GroupConverter, RecordMaterializer }
 import org.apache.parquet.schema.MessageType
+import zio.Tag
+import zio.prelude._
+import zio.schema.Schema
 
-import java.util
-
-class ReadSupport extends HadoopReadSupport[RecordValue] {
+class ReadSupport[A](
+  schema: Option[Schema[A]] = None,
+  schemaEncoder: Option[SchemaEncoder[A]] = None
+)(implicit tag: Tag[A])
+    extends HadoopReadSupport[RecordValue] {
 
   override def prepareForRead(
     configuration: Configuration,
-    keyValueMetaData: util.Map[String, String],
+    keyValueMetaData: java.util.Map[String, String],
     fileSchema: MessageType,
     readContext: HadoopReadSupport.ReadContext
   ): RecordMaterializer[RecordValue] = new RecordMaterializer[RecordValue] {
 
     private val converter =
-      GroupValueConverter.root(fileSchema)
+      GroupValueConverter.root(resolveSchema(fileSchema))
 
     override def getCurrentRecord: RecordValue = converter.get
 
@@ -29,6 +36,11 @@ class ReadSupport extends HadoopReadSupport[RecordValue] {
   }
 
   override def init(context: InitContext): HadoopReadSupport.ReadContext =
-    new HadoopReadSupport.ReadContext(context.getFileSchema)
+    new HadoopReadSupport.ReadContext(resolveSchema(context.getFileSchema))
+
+  private def resolveSchema(contextSchema: MessageType): MessageType =
+    (schema <*> schemaEncoder).fold(contextSchema) { case (schema0, schemaEncoder0) =>
+      Schemas.asMessageType(schemaEncoder0.encode(schema0, tag.tag.shortName, optional = false))
+    }
 
 }
diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
index b7c3311..11f4e73 100644
--- a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
+++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
@@ -28,6 +28,18 @@ object ParquetIOSpec extends ZIOSpecDefault {
       Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.summoned)
   }
 
+  case class ProjectedRecord(a: Int, c: Option[Long], d: List[Int], e: Map[String, Int])
+  object ProjectedRecord {
+    implicit val schema: Schema[ProjectedRecord]               =
+      DeriveSchema.gen[ProjectedRecord]
+    implicit val schemaEncoder: SchemaEncoder[ProjectedRecord] =
+      Derive.derive[SchemaEncoder, ProjectedRecord](SchemaEncoderDeriver.summoned)
+    implicit val valueEncoder: ValueEncoder[ProjectedRecord]   =
+      Derive.derive[ValueEncoder, ProjectedRecord](ValueEncoderDeriver.summoned)
+    implicit val valueDecoder: ValueDecoder[ProjectedRecord]   =
+      Derive.derive[ValueDecoder, ProjectedRecord](ValueDecoderDeriver.summoned)
+  }
+
   override def spec: Spec[TestEnvironment with Scope, Any] =
     suite("ParquetIOSpec")(
test("write and read - chunk") { @@ -55,10 +67,27 @@ object ParquetIOSpec extends ZIOSpecDefault { _ <- writer.writeStream(tmpPath, ZStream.fromChunk(payload)) resultStream <- ZIO.scoped[Any](reader.readStream(tmpPath).runCollect) } yield assertTrue(resultStream == payload) + } @@ after(cleanTmpFile(tmpDir)), + test("write full and read projected") { + val payload = Chunk( + Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), + Record(2, "bar", Some(3L), List.empty, Map("third" -> 3)) + ) + val projectedPayload = payload.map { r => + ProjectedRecord(r.a, r.c, r.d, r.e) + } + + for { + writer <- ZIO.service[ParquetWriter[Record]] + reader <- ZIO.service[ParquetReader[ProjectedRecord]] + _ <- writer.writeChunk(tmpPath, payload) + result <- reader.readChunk(tmpPath) + } yield assertTrue(result == projectedPayload) } @@ after(cleanTmpFile(tmpDir)) ).provide( ParquetWriter.configured[Record](), - ParquetReader.configured[Record]() + ParquetReader.configured[Record](), + ParquetReader.projected[ProjectedRecord]() ) @@ sequential private def cleanTmpFile(path: Path) =