diff --git a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala
index 1f0d76bb7..771a505a3 100644
--- a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala
+++ b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala
@@ -12,10 +12,13 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kinesis
 
 import cats.Parallel
 import cats.effect._
+import org.apache.hadoop.conf.Configuration
+import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
 import com.snowplowanalytics.snowplow.rdbloader.aws.KinesisProducer.{BackoffPolicy, RequestLimits}
 import com.snowplowanalytics.snowplow.rdbloader.aws._
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Config, Run}
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.parquet.ParquetOps
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kinesis.generated.BuildInfo
 import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking
 
@@ -38,7 +41,8 @@ object Main extends IOApp {
       c => mkSink(c),
       c => mkBadQueue(c),
       mkQueue,
-      KinesisCheckpointer.checkpointer
+      KinesisCheckpointer.checkpointer,
+      parquetOps // new trailing argument: the ParquetOps built by the private parquetOps method of this object
     )
 
   private def mkSource[F[_]: Async](
@@ -106,4 +110,17 @@ object Main extends IOApp {
       case _ =>
         Resource.eval(Async[F].raiseError(new IllegalArgumentException(s"Message queue is not SQS or SNS")))
     }
+
+  private def parquetOps: ParquetOps = new ParquetOps { // anonymous ParquetOps; a def, so each call creates a new instance
+
+    override def transformPath(p: String): String =
+      ParquetOps.noop.transformPath(p) // path handling is delegated to ParquetOps.noop
+
+    override def hadoopConf: Configuration = { // a def: builds a new Hadoop Configuration on every access
+      val s3Conf = new Configuration()
+      s3Conf.set("fs.s3a.endpoint.region", (new DefaultAwsRegionProviderChain).getRegion.id) // region id comes from the AWS SDK default region provider chain; NOTE(review): getRegion presumably throws when no region can be resolved - confirm
+      s3Conf
+    }
+
+  }
 }