Skip to content

Commit

Permalink
Bulk Load CDK: S3V2: Spec Aligns Perfectly with V1 (#48584)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Nov 21, 2024
1 parent e5e9b60 commit f4ad588
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import java.nio.file.Path
*
* - Add any required custom fields to the spec w/ jackson annotations
*
* - Add annotation overrides (note that this will replace the original annotation, so to extend an
* existing annotation, you must copy the original annotation and add the new fields).
*
* - Create a class `{MyDestination}Configuration` extending [DestinationConfiguration]
*
* - Add the corresponding mixin `...ConfigurationProvider`s for any added spec mixins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ interface AWSAccessKeySpecification {
"The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>."
)
@get:JsonProperty("access_key_id")
@get:JsonSchemaInject(json = """{"examples":["A012345678910EXAMPLE"]}""")
@get:JsonSchemaInject(
json =
"""{"examples":["A012345678910EXAMPLE"],"airbyte_secret": true,"always_show": true}"""
)
val accessKeyId: String?

@get:JsonSchemaTitle("S3 Access Key")
@get:JsonPropertyDescription(
"The corresponding secret to the access key ID. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>"
)
@get:JsonProperty("secret_access_key")
@get:JsonSchemaInject(json = """{"examples":["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"]}""")
@get:JsonSchemaInject(
json =
"""{"examples":["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"],"airbyte_secret": true,"always_show": true}"""
)
val secretAccessKey: String?

fun toAWSAccessKeyConfiguration(): AWSAccessKeyConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ package io.airbyte.cdk.load.command.aws

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle

interface AWSArnRoleSpecification {
@get:JsonSchemaTitle("Role ARN")
@get:JsonPropertyDescription("The Role ARN.")
@get:JsonProperty("role_arn")
@get:JsonSchemaInject(
json = """{"examples":["arn:aws:iam::123456789:role/ExternalIdIsYourWorkspaceId"]}"""
)
val roleArn: String?

fun toAWSArnRoleConfiguration(): AWSArnRoleConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ interface ObjectStorageCompressionSpecificationProvider {
"Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").",
)
@get:JsonProperty("compression")
val compression: ObjectStorageCompressionSpecification
val compression: ObjectStorageCompressionSpecification?

fun toCompressionConfiguration(): ObjectStorageCompressionConfiguration<*> {
return when (compression) {
return when (compression ?: NoCompressionSpecification()) {
is NoCompressionSpecification -> ObjectStorageCompressionConfiguration(NoopProcessor)
is GZIPCompressionSpecification -> ObjectStorageCompressionConfiguration(GZIPProcessor)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.command.avro.AvroCompressionConfiguration
import io.airbyte.cdk.load.command.avro.AvroCompressionConfigurationProvider
Expand Down Expand Up @@ -89,17 +90,17 @@ interface ObjectStorageFormatSpecificationProvider {
property = "format_type"
)
@JsonSubTypes(
JsonSubTypes.Type(value = JsonFormatSpecification::class, name = "JSONL"),
JsonSubTypes.Type(value = CSVFormatSpecification::class, name = "CSV"),
JsonSubTypes.Type(value = JsonFormatSpecification::class, name = "JSONL"),
JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "Avro"),
JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "Parquet")
)
sealed class ObjectStorageFormatSpecification(
@JsonSchemaTitle("Format Type") open val formatType: Type
) {
enum class Type(@get:JsonValue val typeName: String) {
JSONL("JSONL"),
CSV("CSV"),
JSONL("JSONL"),
AVRO("Avro"),
PARQUET("Parquet")
}
Expand All @@ -109,46 +110,47 @@ interface FlatteningSpecificationProvider {
@get:JsonSchemaTitle("Flattening") @get:JsonProperty("flattening") val flattening: Flattening?

enum class Flattening(@get:JsonValue val flatteningName: String) {
NO_FLATTENING("No Flattening"),
NO_FLATTENING("No flattening"),
ROOT_LEVEL_FLATTENING("Root level flattening")
}
}

/** JSONL */
@JsonSchemaTitle("JSON Lines: Newline-delimited JSON")
class JsonFormatSpecification(
/** CSV */
@JsonSchemaTitle("CSV: Comma-Separated Values")
class CSVFormatSpecification(
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
override val formatType: Type = Type.JSONL
override val formatType: Type = Type.CSV
) :
ObjectStorageFormatSpecification(formatType),
ObjectStorageCompressionSpecificationProvider,
FlatteningSpecificationProvider {
override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification()
override val flattening: FlatteningSpecificationProvider.Flattening? =
FlatteningSpecificationProvider,
ObjectStorageCompressionSpecificationProvider {
override val flattening: FlatteningSpecificationProvider.Flattening =
FlatteningSpecificationProvider.Flattening.NO_FLATTENING
override val compression: ObjectStorageCompressionSpecification? = NoCompressionSpecification()
}

/** CSV */
@JsonSchemaTitle("CSV: Comma-Separated Values")
class CSVFormatSpecification(
/** JSONL */
@JsonSchemaTitle("JSON Lines: Newline-delimited JSON")
class JsonFormatSpecification(
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
override val formatType: Type = Type.CSV
override val formatType: Type = Type.JSONL
) :
ObjectStorageFormatSpecification(formatType),
ObjectStorageCompressionSpecificationProvider,
FlatteningSpecificationProvider {
override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification()
FlatteningSpecificationProvider,
ObjectStorageCompressionSpecificationProvider {
override val flattening: FlatteningSpecificationProvider.Flattening? =
FlatteningSpecificationProvider.Flattening.NO_FLATTENING
override val compression: ObjectStorageCompressionSpecification? = NoCompressionSpecification()
}

/** AVRO */
@JsonSchemaTitle("Avro: Apache Avro")
class AvroFormatSpecification(
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
@JsonSchemaInject(json = """{"order":0}""")
override val formatType: Type = Type.AVRO
) : ObjectStorageFormatSpecification(formatType) {

Expand All @@ -157,6 +159,7 @@ class AvroFormatSpecification(
"The compression algorithm used to compress data. Default to no compression."
)
@JsonProperty("compression_codec")
@JsonSchemaInject(json = """{"order":1}""")
val compressionCodec: AvroFormatCompressionCodecSpecification =
AvroFormatNoCompressionCodecSpecification()
}
Expand All @@ -180,36 +183,36 @@ class ParquetFormatSpecification(

@JsonSchemaTitle("Compression Codec")
@JsonPropertyDescription("The compression algorithm used to compress data pages.")
@JsonProperty("compression_codec")
@JsonProperty("compression_codec", defaultValue = "UNCOMPRESSED")
val compressionCodec: ParquetFormatCompressionCodec? =
ParquetFormatCompressionCodec.UNCOMPRESSED

@JsonSchemaTitle("Block Size (Row Group Size) (MB)")
@JsonPropertyDescription(
"This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. Default: 128 MB."
)
@JsonProperty("block_size_mb")
@JsonProperty("block_size_mb", defaultValue = "128")
val blockSizeMb: Int? = 128

@JsonSchemaTitle("Max Padding Size (MB)")
@JsonPropertyDescription(
"Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB."
)
@JsonProperty("max_padding_size_mb")
@JsonProperty("max_padding_size_mb", defaultValue = "8")
val maxPaddingSizeMb: Int? = 8

@JsonSchemaTitle("Page Size (KB)")
@JsonPropertyDescription(
"The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB."
)
@JsonProperty("page_size_kb")
@JsonProperty("page_size_kb", defaultValue = "1024")
val pageSizeKb: Int? = 1024

@JsonSchemaTitle("Dictionary Page Size (KB)")
@JsonPropertyDescription(
"There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionary. Default: 1024 KB."
)
@JsonProperty("dictionary_page_size_kb")
@JsonProperty("dictionary_page_size_kb", defaultValue = "1024")
val dictionaryPageSizeKb: Int? = 1024

@JsonSchemaTitle("Dictionary Encoding")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,46 @@ package io.airbyte.cdk.load.command.s3

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle

enum class S3BucketRegion {
`af-south-1`,
`ap-east-1`,
`ap-northeast-1`,
`ap-northeast-2`,
`ap-northeast-3`,
`ap-south-1`,
`ap-south-2`,
`ap-southeast-1`,
`ap-southeast-2`,
`ap-southeast-3`,
`ap-southeast-4`,
`ca-central-1`,
`ca-west-1`,
`cn-north-1`,
`cn-northwest-1`,
`eu-central-1`,
`eu-central-2`,
`eu-north-1`,
`eu-south-1`,
`eu-south-2`,
`eu-west-1`,
`eu-west-2`,
`eu-west-3`,
`il-central-1`,
`me-central-1`,
`me-south-1`,
`sa-east-1`,
`us-east-1`,
`us-east-2`,
`us-gov-east-1`,
`us-gov-west-1`,
`us-west-1`,
`us-west-2`
// The first entry, NO_REGION, represents the empty-string region (an empty name is not a legal Kotlin identifier).
enum class S3BucketRegion(@get:JsonValue val region: String) {
NO_REGION(""),
`af-south-1`("af-south-1"),
`ap-east-1`("ap-east-1"),
`ap-northeast-1`("ap-northeast-1"),
`ap-northeast-2`("ap-northeast-2"),
`ap-northeast-3`("ap-northeast-3"),
`ap-south-1`("ap-south-1"),
`ap-south-2`("ap-south-2"),
`ap-southeast-1`("ap-southeast-1"),
`ap-southeast-2`("ap-southeast-2"),
`ap-southeast-3`("ap-southeast-3"),
`ap-southeast-4`("ap-southeast-4"),
`ca-central-1`("ca-central-1"),
`ca-west-1`("ca-west-1"),
`cn-north-1`("cn-north-1"),
`cn-northwest-1`("cn-northwest-1"),
`eu-central-1`("eu-central-1"),
`eu-central-2`("eu-central-2"),
`eu-north-1`("eu-north-1"),
`eu-south-1`("eu-south-1"),
`eu-south-2`("eu-south-2"),
`eu-west-1`("eu-west-1"),
`eu-west-2`("eu-west-2"),
`eu-west-3`("eu-west-3"),
`il-central-1`("il-central-1"),
`me-central-1`("me-central-1"),
`me-south-1`("me-south-1"),
`sa-east-1`("sa-east-1"),
`us-east-1`("us-east-1"),
`us-east-2`("us-east-2"),
`us-gov-east-1`("us-gov-east-1"),
`us-gov-west-1`("us-gov-west-1"),
`us-west-1`("us-west-1"),
`us-west-2`("us-west-2")
}

/**
Expand All @@ -66,7 +69,7 @@ interface S3BucketSpecification {
)
@get:JsonProperty("s3_bucket_region", defaultValue = "")
@get:JsonSchemaInject(json = """{"examples":["us-east-1"]}""")
val s3BucketRegion: S3BucketRegion
val s3BucketRegion: S3BucketRegion?

@get:JsonSchemaTitle("S3 Endpoint")
@get:JsonPropertyDescription(
Expand All @@ -77,7 +80,11 @@ interface S3BucketSpecification {
val s3Endpoint: String?

fun toS3BucketConfiguration(): S3BucketConfiguration {
return S3BucketConfiguration(s3BucketName, s3BucketRegion, s3Endpoint)
return S3BucketConfiguration(
s3BucketName,
s3BucketRegion ?: S3BucketRegion.NO_REGION,
s3Endpoint
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ interface S3PathSpecification {
@get:JsonPropertyDescription(
"Path to use when staging data in the bucket directory. Airbyte will stage data here during sync and/or write small manifest/recovery files."
)
@get:JsonProperty("s3_staging_prefix", defaultValue = "{s3_bucket_path}/__airbyte_tmp")
@get:JsonProperty("s3_staging_prefix")
@get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"]}""")
val s3StagingPrefix: String?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ dependencies {
integrationTestLegacyImplementation testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-s3-destinations"))
// integrationTestLegacyImplementation testFixtures("io.airbyte.cdk:airbyte-cdk-db-destinations:0.47.0")
}

// Exclude the conflicting log4j-over-slf4j bridge from every configuration.
// The artifact's coordinates are org.slf4j:log4j-over-slf4j, so the rule must name
// the 'org.slf4j' group; a rule whose group does not match excludes nothing.
configurations {
    all {
        exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
Expand Down
Loading

0 comments on commit f4ad588

Please sign in to comment.