From 79e027b726dbbf77c0d443d4711e0e7feb86683a Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Mon, 8 Apr 2024 08:36:55 +0100
Subject: [PATCH] AWS streaming transformer should use default chain to set S3 region

In most places, Snowplow apps let the AWS SDK figure out the region using
the default provider chain. But the AWS streaming transformer uses Hadoop
to write to S3 (only for the parquet output format), and Hadoop infers the
region slightly differently.

With this change, we use the AWS SDK to infer the region and then
explicitly pass it to Hadoop. This means the transformer should also work
in other AWS partitions, e.g. aws-china.
---
 .../transformer/stream/kinesis/Main.scala | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala
index 1f0d76bb7..771a505a3 100644
--- a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala
+++ b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/Main.scala
@@ -12,10 +12,13 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kinesis
 
 import cats.Parallel
 import cats.effect._
+import org.apache.hadoop.conf.Configuration
+import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
 import com.snowplowanalytics.snowplow.rdbloader.aws.KinesisProducer.{BackoffPolicy, RequestLimits}
 import com.snowplowanalytics.snowplow.rdbloader.aws._
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Config, Run}
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.parquet.ParquetOps
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kinesis.generated.BuildInfo
 import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking
 
@@ -38,7 +41,8 @@ object Main extends IOApp {
       c => mkSink(c),
       c => mkBadQueue(c),
       mkQueue,
-      KinesisCheckpointer.checkpointer
+      KinesisCheckpointer.checkpointer,
+      parquetOps
     )
 
   private def mkSource[F[_]: Async](
@@ -106,4 +110,17 @@ object Main extends IOApp {
       case _ =>
         Resource.eval(Async[F].raiseError(new IllegalArgumentException(s"Message queue is not SQS or SNS")))
     }
+
+  private def parquetOps: ParquetOps = new ParquetOps {
+
+    override def transformPath(p: String): String =
+      ParquetOps.noop.transformPath(p)
+
+    override def hadoopConf: Configuration = {
+      val s3Conf = new Configuration()
+      s3Conf.set("fs.s3a.endpoint.region", (new DefaultAwsRegionProviderChain).getRegion.id)
+      s3Conf
+    }
+
+  }
 }
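
Reviewer note: below is a minimal standalone sketch of the region lookup this
patch relies on, handy for checking the resolved value outside the transformer.
It assumes AWS SDK v2 (software.amazon.awssdk:regions) and hadoop-common on the
classpath; the object name RegionCheck is hypothetical and not part of the
codebase.

import org.apache.hadoop.conf.Configuration
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain

object RegionCheck extends App {

  // Resolve the region the same way other AWS SDK v2 clients do: environment
  // variables, system properties, profile files, then EC2/ECS metadata.
  val region = (new DefaultAwsRegionProviderChain).getRegion.id

  // Pass the resolved region explicitly to the S3A connector, instead of
  // letting Hadoop run its own, slightly different, inference.
  val conf = new Configuration()
  conf.set("fs.s3a.endpoint.region", region)

  println(s"S3A will use region: ${conf.get("fs.s3a.endpoint.region")}")
}

Because the region is resolved once, up front, by the same default chain the
app's other AWS clients use, the S3A parquet writer should agree with the rest
of the app in any partition (e.g. aws, aws-cn) rather than depending on
Hadoop's own lookup.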