Read Projections (#6)
* Add SchemaEncoder.contramap

* Add vscode related file into gitignore

* Add .sbtopts with increased stack size to avoid compilation error during tests

* Support reading projected schema from file

* scalafix

* Update README
grouzen authored Dec 19, 2023
1 parent 0dfbed1 commit 2b5f12e
Showing 9 changed files with 104 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -5,3 +5,8 @@
target/
*/target/
private/

.bloop/
.metals/
.vscode/
metals.sbt
1 change: 1 addition & 0 deletions .sbtopts
@@ -0,0 +1 @@
-J-Xss4M
18 changes: 11 additions & 7 deletions README.md
@@ -26,7 +26,7 @@ To be able to write/read data to/from parquet files you need to define the follo
You can get Java SDK's `Type` by using `SchemaEncoder` generated by `SchemaEncoderDeriver.default` ZIO Schema deriver:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -57,7 +57,7 @@ Alternatively, you can override the schemas of some fields in your record by def
and using `SchemaEncoderDeriver.summoned` deriver.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema._
@@ -96,7 +96,7 @@ For converting Scala values into `Value` and back we need to define instances of
type classes. This could be done by using `ValueDecoderDeriver.default` ZIO Schema deriver.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -129,7 +129,7 @@ Same as for `SchemaEncoder`, you can override the schemas of some fields in your
`ValueEncoder`/`ValueDecoder` and using `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema._
@@ -177,10 +177,10 @@ println(record)
## Reading/Writing files

Finally, to perform some IO operations we need to initialize `ParquetWriter` and `ParquetReader` and use either
`writeChunk`/`readChunk` or `writeStream`/`readStream` methods
`writeChunk`/`readChunk` or `writeStream`/`readStream` methods.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -227,4 +227,8 @@ Unsafe.unsafe { implicit unsafe =>
}
// Outputs:
// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
```
```

In the previous code snippet we used `ParquetReader.configured[A]()` to initialize a reader that takes its Parquet schema from the file being read. Such a reader always reads all columns of the file.

If you need to read only a subset of the columns, use `ParquetReader.projected[A]()`, which always uses the schema of the provided type, as sketched below.
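
The following is a minimal, hypothetical sketch of wiring up a projected reader. The `MyProjection` case class, the `readProjection` helper, and the exact `readChunk` signature are illustrative assumptions; `ParquetReader.projected` and the derivers are the library's API.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio._
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, Path }

// Hypothetical projection of a wider record: it declares only the columns we want to read back.
case class MyProjection(a: Int, c: Option[Int])

object MyProjection {
  implicit val schema: Schema[MyProjection]               =
    DeriveSchema.gen[MyProjection]
  implicit val schemaEncoder: SchemaEncoder[MyProjection] =
    Derive.derive[SchemaEncoder, MyProjection](SchemaEncoderDeriver.default)
  implicit val valueDecoder: ValueDecoder[MyProjection]   =
    Derive.derive[ValueDecoder, MyProjection](ValueDecoderDeriver.default)
}

// Reads only MyProjection's columns, even if the file was written with a wider schema.
def readProjection(path: Path): Task[Chunk[MyProjection]] =
  ZIO
    .serviceWithZIO[ParquetReader[MyProjection]](_.readChunk(path))
    .provide(ParquetReader.projected[MyProjection]())
```

Under the hood the projected schema is passed to Parquet's read support as the requested schema, so only the projected columns are assembled.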
@@ -103,6 +103,14 @@ object Schemas {
def repetition(optional: Boolean): Repetition =
if (optional) Repetition.OPTIONAL else Repetition.REQUIRED

def asMessageType(schema: Type): MessageType = {
val groupSchema = schema.asGroupType()
val name = groupSchema.getName
val fields = groupSchema.getFields

new MessageType(name, fields)
}

import PrimitiveTypeName._
import LogicalTypeAnnotation._

@@ -4,11 +4,17 @@ import org.apache.parquet.schema.Type
import zio._
import zio.schema._

trait SchemaEncoder[A] {
trait SchemaEncoder[A] { self =>

def encode(schema: Schema[A], name: String, optional: Boolean): Type

def encodeZIO(schema: Schema[A], name: String, optional: Boolean): Task[Type] =
ZIO.attempt(encode(schema, name, optional))

def contramap[B](f: Schema[B] => Schema[A]): SchemaEncoder[B] =
new SchemaEncoder[B] {
override def encode(schema: Schema[B], name: String, optional: Boolean): Type =
self.encode(f(schema), name, optional)
}

}
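
As a side note on the new `contramap` above, here is a hypothetical sketch of how an existing encoder can be reused for another type by mapping its schema first; the `Login` wrapper and the `loginEncoder` helper are illustrative only and not part of this commit.

```scala
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec.SchemaEncoder

// Hypothetical wrapper whose Parquet schema should be identical to String's.
case class Login(value: String)

object Login {
  implicit val schema: Schema[Login] =
    Schema[String].transform(Login(_), _.value)
}

// Given an already-available encoder for String, contramap yields an encoder for Login
// by converting Schema[Login] back to the underlying Schema[String] before encoding.
def loginEncoder(stringEncoder: SchemaEncoder[String]): SchemaEncoder[Login] =
  stringEncoder.contramap[Login](_ => Schema[String])
```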
@@ -1,16 +1,16 @@
package me.mnedokushev.zio.apache.parquet.core.hadoop

import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
import me.mnedokushev.zio.apache.parquet.core.codec.ValueDecoder
import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueDecoder }
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{ ReadSupport => HadoopReadSupport }
import org.apache.parquet.hadoop.{ ParquetReader => HadoopParquetReader }
import org.apache.parquet.io.InputFile
import zio._
import zio.schema.Schema
import zio.stream._

import java.io.IOException
import scala.annotation.nowarn

trait ParquetReader[+A <: Product] {

@@ -20,7 +20,11 @@ trait ParquetReader[+A <: Product] {

}

final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit decoder: ValueDecoder[A])
final class ParquetReaderLive[A <: Product: Tag](
hadoopConf: Configuration,
schema: Option[Schema[A]] = None,
schemaEncoder: Option[SchemaEncoder[A]] = None
)(implicit decoder: ValueDecoder[A])
extends ParquetReader[A] {

override def readStream(path: Path): ZStream[Scope, Throwable, A] =
@@ -64,7 +68,7 @@ final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit
inputFile <- path.toInputFileZIO(hadoopConf)
reader <- ZIO.fromAutoCloseable(
ZIO.attemptBlockingIO(
new ParquetReader.Builder(inputFile).withConf(hadoopConf).build()
new ParquetReader.Builder(inputFile, schema, schemaEncoder).withConf(hadoopConf).build()
)
)
} yield reader
@@ -73,16 +77,25 @@

object ParquetReader {

final class Builder(file: InputFile) extends HadoopParquetReader.Builder[RecordValue](file) {
final class Builder[A: Tag](
file: InputFile,
schema: Option[Schema[A]] = None,
schemaEncoder: Option[SchemaEncoder[A]] = None
) extends HadoopParquetReader.Builder[RecordValue](file) {

override protected def getReadSupport: HadoopReadSupport[RecordValue] =
new ReadSupport
new ReadSupport(schema, schemaEncoder)

}

def configured[A <: Product: ValueDecoder](
hadoopConf: Configuration = new Configuration()
)(implicit @nowarn tag: Tag[A]): ULayer[ParquetReader[A]] =
)(implicit tag: Tag[A]): ULayer[ParquetReader[A]] =
ZLayer.succeed(new ParquetReaderLive[A](hadoopConf))

def projected[A <: Product: ValueDecoder](
hadoopConf: Configuration = new Configuration()
)(implicit schema: Schema[A], schemaEncoder: SchemaEncoder[A], tag: Tag[A]): ULayer[ParquetReader[A]] =
ZLayer.succeed(new ParquetReaderLive[A](hadoopConf, Some(schema), Some(schemaEncoder)))

}
@@ -1,13 +1,14 @@
package me.mnedokushev.zio.apache.parquet.core.hadoop

import me.mnedokushev.zio.apache.parquet.core.Schemas
import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueEncoder }
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{ WriteSupport => HadoopWriteSupport }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter => HadoopParquetWriter }
import org.apache.parquet.io.OutputFile
import org.apache.parquet.schema.{ MessageType, Type }
import org.apache.parquet.schema.MessageType
import zio._
import zio.schema.Schema
import zio.stream._
@@ -55,20 +56,10 @@ final class ParquetWriterLive[A <: Product](
_ <- ZIO.attemptBlockingIO(writer.write(record.asInstanceOf[RecordValue]))
} yield ()

private def build(path: Path): RIO[Scope, HadoopParquetWriter[RecordValue]] = {

def castToMessageSchema(schema: Type) =
ZIO.attempt {
val groupSchema = schema.asGroupType()
val name = groupSchema.getName
val fields = groupSchema.getFields

new MessageType(name, fields)
}

private def build(path: Path): RIO[Scope, HadoopParquetWriter[RecordValue]] =
for {
schema <- schemaEncoder.encodeZIO(schema, tag.tag.shortName, optional = false)
messageSchema <- castToMessageSchema(schema)
messageSchema <- ZIO.attempt(Schemas.asMessageType(schema))
outputFile <- path.toOutputFileZIO(hadoopConf)
builder = new ParquetWriter.Builder(outputFile, messageSchema)
.withWriteMode(writeMode)
@@ -82,7 +73,6 @@
.withConf(hadoopConf)
writer <- ZIO.fromAutoCloseable(ZIO.attemptBlockingIO(builder.build()))
} yield writer
}

}

@@ -1,24 +1,31 @@
package me.mnedokushev.zio.apache.parquet.core.hadoop

import me.mnedokushev.zio.apache.parquet.core.Schemas
import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
import me.mnedokushev.zio.apache.parquet.core.codec.SchemaEncoder
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{ InitContext, ReadSupport => HadoopReadSupport }
import org.apache.parquet.io.api.{ GroupConverter, RecordMaterializer }
import org.apache.parquet.schema.MessageType
import zio.Tag
import zio.prelude._
import zio.schema.Schema

import java.util

class ReadSupport extends HadoopReadSupport[RecordValue] {
class ReadSupport[A](
schema: Option[Schema[A]] = None,
schemaEncoder: Option[SchemaEncoder[A]] = None
)(implicit tag: Tag[A])
extends HadoopReadSupport[RecordValue] {

override def prepareForRead(
configuration: Configuration,
keyValueMetaData: util.Map[String, String],
keyValueMetaData: java.util.Map[String, String],
fileSchema: MessageType,
readContext: HadoopReadSupport.ReadContext
): RecordMaterializer[RecordValue] = new RecordMaterializer[RecordValue] {

private val converter =
GroupValueConverter.root(fileSchema)
GroupValueConverter.root(resolveSchema(fileSchema))

override def getCurrentRecord: RecordValue =
converter.get
@@ -29,6 +36,11 @@ class ReadSupport extends HadoopReadSupport[RecordValue] {
}

override def init(context: InitContext): HadoopReadSupport.ReadContext =
new HadoopReadSupport.ReadContext(context.getFileSchema)
new HadoopReadSupport.ReadContext(resolveSchema(context.getFileSchema))

private def resolveSchema(contextSchema: MessageType): MessageType =
(schema <*> schemaEncoder).fold(contextSchema) { case (schema0, schemaEncoder0) =>
Schemas.asMessageType(schemaEncoder0.encode(schema0, tag.tag.shortName, optional = false))
}

}
@@ -28,6 +28,18 @@ object ParquetIOSpec extends ZIOSpecDefault {
Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.summoned)
}

case class ProjectedRecord(a: Int, c: Option[Long], d: List[Int], e: Map[String, Int])
object ProjectedRecord {
implicit val schema: Schema[ProjectedRecord] =
DeriveSchema.gen[ProjectedRecord]
implicit val schemaEncoder: SchemaEncoder[ProjectedRecord] =
Derive.derive[SchemaEncoder, ProjectedRecord](SchemaEncoderDeriver.summoned)
implicit val valueEncoder: ValueEncoder[ProjectedRecord] =
Derive.derive[ValueEncoder, ProjectedRecord](ValueEncoderDeriver.summoned)
implicit val valueDecoder: ValueDecoder[ProjectedRecord] =
Derive.derive[ValueDecoder, ProjectedRecord](ValueDecoderDeriver.summoned)
}

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("ParquetIOSpec")(
test("write and read - chunk") {
@@ -55,10 +67,27 @@ object ParquetIOSpec extends ZIOSpecDefault {
_ <- writer.writeStream(tmpPath, ZStream.fromChunk(payload))
resultStream <- ZIO.scoped[Any](reader.readStream(tmpPath).runCollect)
} yield assertTrue(resultStream == payload)
} @@ after(cleanTmpFile(tmpDir)),
test("write full and read projected") {
val payload = Chunk(
Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)),
Record(2, "bar", Some(3L), List.empty, Map("third" -> 3))
)
val projectedPayload = payload.map { r =>
ProjectedRecord(r.a, r.c, r.d, r.e)
}

for {
writer <- ZIO.service[ParquetWriter[Record]]
reader <- ZIO.service[ParquetReader[ProjectedRecord]]
_ <- writer.writeChunk(tmpPath, payload)
result <- reader.readChunk(tmpPath)
} yield assertTrue(result == projectedPayload)
} @@ after(cleanTmpFile(tmpDir))
).provide(
ParquetWriter.configured[Record](),
ParquetReader.configured[Record]()
ParquetReader.configured[Record](),
ParquetReader.projected[ProjectedRecord]()
) @@ sequential

private def cleanTmpFile(path: Path) =
