Skip to content

Commit

Permalink
copy over legacy DATs + enable remaining tests
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Nov 14, 2024
1 parent 7253ebd commit 880286a
Show file tree
Hide file tree
Showing 12 changed files with 485 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ application {
dependencies {
// Temporary dependencies that keep the legacy integration test suite runnable.
// Eventually those tests should be removed so that we rely solely on the bulk CDK tests.
// integrationTestLegacyImplementation testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations"))
// integrationTestLegacyImplementation testFixtures("io.airbyte.cdk:airbyte-cdk-db-destinations:0.47.0")
integrationTestLegacyImplementation testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-s3-destinations"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseAvroDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator

/**
 * Avro acceptance test for the S3 V2 destination.
 *
 * Reuses [S3BaseAvroDestinationAcceptanceTest] and only supplies the protocol version, the
 * record comparator and the base connector config.
 */
class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest() {
    /** Base connector config, taken from [S3V2DestinationTestUtils.baseConfigJsonFilePath]. */
    override val baseConfigJson: JsonNode
        get() {
            return S3V2DestinationTestUtils.baseConfigJsonFilePath
        }

    /** This suite runs against [ProtocolVersion.V1]. */
    override fun getProtocolVersion(): ProtocolVersion = ProtocolVersion.V1

    /** Records are compared with a fresh [S3V2AvroParquetTestDataComparator]. */
    override fun getTestDataComparator(): TestDataComparator = S3V2AvroParquetTestDataComparator()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator
import java.nio.charset.StandardCharsets
import java.time.*
import java.time.format.DateTimeFormatter
import java.util.*

/**
 * Test data comparator for values read back from Avro/Parquet output.
 *
 * Overrides the date/time and string comparisons of [AdvancedTestDataComparator] so that the
 * numeric epoch-based values coming from the destination can be compared with the ISO-formatted
 * strings of the original Airbyte messages.
 */
class S3V2AvroParquetTestDataComparator : AdvancedTestDataComparator() {
    /**
     * Compares a date.
     *
     * @param airbyteMessageValue expected date in `ISO_LOCAL_DATE` format.
     * @param destinationValue actual value, parsed as a number of days since the epoch.
     * @return `true` when both denote the same [LocalDate].
     */
    override fun compareDateValues(airbyteMessageValue: String, destinationValue: String): Boolean {
        val destinationDate = LocalDate.ofEpochDay(destinationValue.toLong())
        val expectedDate = LocalDate.parse(airbyteMessageValue, DateTimeFormatter.ISO_LOCAL_DATE)
        return expectedDate == destinationDate
    }

    /**
     * Converts a numeric destination value into an [Instant].
     *
     * The value is integer-divided by 1000 and the quotient is interpreted as epoch
     * milliseconds, so any remainder is dropped.
     * NOTE(review): presumably the destination stores microseconds — confirm.
     */
    private fun getInstantFromEpoch(epochValue: String): Instant =
        Instant.ofEpochMilli(epochValue.toLong() / 1000)

    /** Interprets the destination value as an instant (see [getInstantFromEpoch]) in UTC. */
    override fun parseDestinationDateWithTz(destinationValue: String): ZonedDateTime =
        ZonedDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC)

    /**
     * Compares a date-time: the destination value is converted to a UTC [LocalDateTime] and its
     * string form is handed to the parent implementation for the actual comparison.
     */
    override fun compareDateTimeValues(
        airbyteMessageValue: String,
        destinationValue: String
    ): Boolean {
        val destinationDate =
            LocalDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC)
        return super.compareDateTimeValues(airbyteMessageValue, destinationDate.toString())
    }

    /**
     * Compares a time without time zone.
     *
     * @param airbyteMessageValue expected time, parsed with `ISO_TIME`.
     * @param destinationValue actual value, converted through [getInstantFromEpoch] to a UTC
     * [LocalTime].
     */
    override fun compareTimeWithoutTimeZone(
        airbyteMessageValue: String,
        destinationValue: String
    ): Boolean {
        val destinationTime =
            LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC)
        val expectedTime = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME)
        return expectedTime == destinationTime
    }

    /**
     * Compares two string nodes, also accepting the case where the expected text is the Base64
     * encoding of the actual text.
     *
     * An expected value that is not valid Base64 is simply treated as "not equal" instead of
     * letting the decoder's [IllegalArgumentException] escape from the comparison.
     */
    override fun compareString(expectedValue: JsonNode, actualValue: JsonNode): Boolean {
        val expectedText = expectedValue.asText()
        val actualText = actualValue.asText()
        // to handle base64 encoded strings
        return expectedText == actualText || decodeBase64(expectedText) == actualText
    }

    /**
     * Decodes [string] from Base64 into UTF-8 text.
     *
     * @return the decoded text, or `null` when [string] is not valid Base64.
     */
    private fun decodeBase64(string: String): String? =
        try {
            String(Base64.getDecoder().decode(string), StandardCharsets.UTF_8)
        } catch (e: IllegalArgumentException) {
            // Base64.Decoder.decode rejects input that is not in a valid Base64 scheme.
            null
        }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest

/**
 * CSV acceptance test for the S3 V2 destination using the assume-role configuration.
 *
 * Both the connector config and the connector environment come from
 * [S3V2DestinationTestUtils].
 */
class S3V2CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
    /** Environment passed to the connector: the internal credentials for assume-role auth. */
    override fun getConnectorEnv(): Map<String, String> =
        S3V2DestinationTestUtils.assumeRoleInternalCredentials

    /** Base connector config, taken from [S3V2DestinationTestUtils.assumeRoleConfig]. */
    override val baseConfigJson: JsonNode
        get() {
            return S3V2DestinationTestUtils.assumeRoleConfig
        }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion

/**
 * CSV acceptance test for the S3 V2 destination.
 *
 * Reuses [S3BaseCsvDestinationAcceptanceTest], pinning the protocol version and the base config.
 */
class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
    /** Base connector config, taken from [S3V2DestinationTestUtils.baseConfigJsonFilePath]. */
    override val baseConfigJson: JsonNode
        get() {
            return S3V2DestinationTestUtils.baseConfigJsonFilePath
        }

    /** This suite runs against [ProtocolVersion.V1]. */
    override fun getProtocolVersion(): ProtocolVersion = ProtocolVersion.V1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvGzipDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion

/**
 * Gzipped-CSV acceptance test for the S3 V2 destination.
 *
 * Reuses [S3BaseCsvGzipDestinationAcceptanceTest], pinning the protocol version and the base
 * config.
 */
class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceTest() {
    /** Base connector config, taken from [S3V2DestinationTestUtils.baseConfigJsonFilePath]. */
    override val baseConfigJson: JsonNode
        get() {
            return S3V2DestinationTestUtils.baseConfigJsonFilePath
        }

    /** This suite runs against [ProtocolVersion.V1]. */
    override fun getProtocolVersion(): ProtocolVersion = ProtocolVersion.V1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.io.IOs.readFile
import io.airbyte.commons.json.Jsons.deserialize
import java.nio.file.Path

/**
 * Loads the secret files used by the S3 V2 destination tests.
 *
 * Every property re-reads and re-parses its file on each access; nothing is cached.
 */
object S3V2DestinationTestUtils {
    private const val ACCESS_KEY_CONFIG_SECRET_PATH =
        "secrets/s3_dest_min_required_permissions_creds.json"
    private const val ASSUME_ROLE_CONFIG_SECRET_PATH = "secrets/s3_dest_assume_role_config.json"
    private const val ASSUME_ROLE_INTERNAL_CREDENTIALS_SECRET_PATH =
        "secrets/s3_dest_iam_role_credentials_for_assume_role_auth.json"

    private const val POLICY_MANAGER_CREDENTIALS_SECRET_PATH =
        "secrets/s3_dest_policy_manager_credentials.json"

    /** Reads the file at [secretPath] and parses its content as JSON. */
    private fun loadSecret(secretPath: String): JsonNode {
        val location = Path.of(secretPath)
        return deserialize(readFile(location))
    }

    /** Connector config parsed from the access-key secret file. */
    val baseConfigJsonFilePath: JsonNode
        get() = loadSecret(ACCESS_KEY_CONFIG_SECRET_PATH)

    /** Connector config parsed from the assume-role secret file. */
    val assumeRoleConfig: JsonNode
        get() = loadSecret(ASSUME_ROLE_CONFIG_SECRET_PATH)

    /**
     * Parses the JSON object at [secretPath] into a map from each property name to the text
     * value of that property.
     */
    private fun getCredentials(secretPath: String): Map<String, String> {
        val credentials = HashMap<String, String>()
        loadSecret(secretPath).properties().forEach { property ->
            credentials[property.key] = property.value.textValue()
        }
        return credentials
    }

    /** Credentials read from the assume-role internal credentials secret file. */
    val assumeRoleInternalCredentials: Map<String, String>
        get() = getCredentials(ASSUME_ROLE_INTERNAL_CREDENTIALS_SECRET_PATH)

    /** Credentials read from the policy manager credentials secret file. */
    val policyManagerCredentials: Map<String, String>
        get() = getCredentials(POLICY_MANAGER_CREDENTIALS_SECRET_PATH)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3BaseDestinationAcceptanceTest
import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.destination.s3.constant.S3Constants
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.*
import io.airbyte.workers.exception.TestHarnessException
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.time.Instant
import kotlin.io.path.createDirectories
import kotlin.io.path.createFile
import kotlin.io.path.writeText
import kotlin.random.Random
import kotlin.test.*
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Test

// File-private logger for this test file.
private val LOGGER = KotlinLogging.logger {}

/**
 * File-transfer tests for the S3 V2 destination.
 *
 * Enables file transfer on [S3BaseDestinationAcceptanceTest], configures CSV output without
 * compression, and checks that
 * - a plain record (without a `file` property) makes the sync fail, and
 * - a record that references a file results in that file being uploaded and removed locally.
 */
class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() {
    override val supportsFileTransfer = true

    /** Format section of the connector config: root-level flattened CSV, no compression. */
    override val formatConfig: JsonNode
        get() =
            Jsons.jsonNode(
                java.util.Map.of(
                    "format_type",
                    FileUploadFormat.CSV,
                    "flattening",
                    Flattening.ROOT_LEVEL.value,
                    "compression",
                    Jsons.jsonNode(java.util.Map.of("compression_type", "No Compression")),
                )
            )

    /** Parent's base config with the S3 path format overridden to namespace/stream name. */
    override val baseConfigJson: JsonNode
        get() =
            (super.baseConfigJson as ObjectNode).put(
                S3Constants.S_3_PATH_FORMAT,
                "\${NAMESPACE}/\${STREAM_NAME}/"
            )

    /** Builds a TRACE message carrying a COMPLETE stream status for [streamName]. */
    private fun getStreamCompleteMessage(streamName: String): AirbyteMessage {
        return AirbyteMessage()
            .withType(AirbyteMessage.Type.TRACE)
            .withTrace(
                AirbyteTraceMessage()
                    .withStreamStatus(
                        AirbyteStreamStatusTraceMessage()
                            .withStatus(
                                AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
                            )
                            .withStreamDescriptor(StreamDescriptor().withName(streamName))
                    )
            )
    }

    /**
     * Creates a 1 MiB file of random alphanumeric characters under [fileTransferMountSource],
     * nested in 1 to 10 randomly named directories.
     *
     * @return the path of the new file, relative to [fileTransferMountSource].
     */
    private fun createFakeFile(): Path {
        val mountSource =
            checkNotNull(fileTransferMountSource) { "fileTransferMountSource is not set" }
        val depth = Random.nextInt(10)
        // 0..depth is inclusive, so at least one directory level is always generated.
        val dirPath =
            (0..depth).joinToString("/") {
                "dir" + RandomStringUtils.insecure().nextAlphanumeric(5)
            }
        val fileName = "fakeFile" + RandomStringUtils.insecure().nextAlphanumeric(5)
        val filePath = "$dirPath/$fileName"
        val fileSize = 1_024 * 1_024

        mountSource.resolve(dirPath).createDirectories()
        mountSource
            .resolve(filePath)
            .createFile()
            .writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize))
        return Path.of(filePath)
    }

    /**
     * Builds a catalog with a single incremental / append-dedup stream named [streamName] whose
     * generation id and minimum generation id are both [generationId].
     */
    private fun configureCatalog(streamName: String, generationId: Long): ConfiguredAirbyteCatalog {
        val streamSchema = JsonNodeFactory.instance.objectNode()
        streamSchema.set<JsonNode>("properties", JsonNodeFactory.instance.objectNode())
        return ConfiguredAirbyteCatalog()
            .withStreams(
                java.util.List.of(
                    ConfiguredAirbyteStream()
                        .withSyncMode(SyncMode.INCREMENTAL)
                        .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
                        .withGenerationId(generationId)
                        .withMinimumGenerationId(generationId)
                        .withSyncId(0)
                        .withStream(
                            AirbyteStream().withName(streamName).withJsonSchema(streamSchema)
                        ),
                ),
            )
    }

    /**
     * Builds a RECORD message with empty data and a `file` additional property that points at
     * [relativeFilePath] resolved against the default Airbyte staging directory.
     */
    private fun createMessageForFile(streamName: String, relativeFilePath: Path): AirbyteMessage {
        val absoluteFilePath =
            EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY.resolve(relativeFilePath)
        return AirbyteMessage()
            .withType(AirbyteMessage.Type.RECORD)
            .withRecord(
                AirbyteRecordMessage()
                    .withStream(streamName)
                    .withEmittedAt(Instant.now().toEpochMilli())
                    .withData(ObjectMapper().readTree("{}"))
                    .withAdditionalProperty(
                        "file",
                        AirbyteRecordMessageFile(
                            fileUrl = absoluteFilePath.toString(),
                            // NOTE(review): the length is read from the staging-directory path
                            // as seen by the test process, not from fileTransferMountSource;
                            // File.length() yields 0 if that path does not exist here — confirm
                            // this is intended.
                            bytes = absoluteFilePath.toFile().length(),
                            fileRelativePath = "$relativeFilePath",
                            modified = 123456L,
                            sourceFileUrl = "//sftp-testing-for-file-transfer/$relativeFilePath",
                        )
                    )
            )
    }

    /**
     * A record without the `file` property must make the sync fail with a
     * [TestHarnessException] whose first output message reports the missing-file-field error.
     */
    @Test
    fun checkRecordSyncFails() {
        val streamName = "str" + RandomStringUtils.insecure().nextAlphanumeric(5)
        val catalog = configureCatalog(streamName, 0)
        val recordBasedMessage =
            AirbyteMessage()
                .withType(AirbyteMessage.Type.RECORD)
                .withRecord(
                    AirbyteRecordMessage()
                        .withStream(streamName)
                        .withEmittedAt(Instant.now().toEpochMilli())
                        .withData(
                            Jsons.jsonNode(
                                ImmutableMap.builder<Any, Any>()
                                    .put("id", 1)
                                    .put("currency", "USD")
                                    .put("date", "2020-03-31T00:00:00Z")
                                    .put("HKD", 10.1)
                                    .put("NZD", 700.1)
                                    .build(),
                            )
                        )
                )
        try {
            runSyncAndVerifyStateOutput(
                getConfig(),
                listOf(recordBasedMessage, getStreamCompleteMessage(streamName)),
                catalog,
                false
            )
            // fail() raises an assertion error, which the catch clause below does not intercept.
            fail("should have failed!")
        } catch (e: TestHarnessException) {
            assertContains(
                e.outputMessages!![0].trace.error.internalMessage,
                S3ConsumerFactory.MISSING_FILE_FIELD_IN_FILE_TRANSFER_ERROR_MESSAGE
            )
        }
    }

    /**
     * Syncs a record that references a freshly created fake file and verifies the uploaded
     * object: generation id metadata, size, key and content. Also verifies that the local file
     * no longer exists after the sync.
     */
    @Test
    fun testFakeFileTransfer() {
        LOGGER.info {
            "${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY} is mounted from $fileTransferMountSource"
        }
        val streamName = "str" + RandomStringUtils.insecure().nextAlphanumeric(5)
        val filePath = createFakeFile()
        val mountSource =
            checkNotNull(fileTransferMountSource) { "fileTransferMountSource is not set" }
        val file = mountSource.resolve(filePath).toFile()
        // Capture size and content up front: the file is expected to be gone after the sync.
        val fileLength = file.length()
        val fileContent = file.readBytes()
        val catalog = configureCatalog(streamName, 32)
        val recordMessage = createMessageForFile(streamName, filePath)

        runSyncAndVerifyStateOutput(
            getConfig(),
            listOf(recordMessage, getStreamCompleteMessage(streamName)),
            catalog,
            false
        )
        val allObjectsInStore = getAllSyncedObjects(streamName)
        val objectInStore = allObjectsInStore[0]
        val s3 = checkNotNull(s3Client) { "s3Client is not initialized" }
        val objectMetadata = s3.getObjectMetadata(objectInStore.bucketName, objectInStore.key)
        val generationId =
            objectMetadata
                .getUserMetaDataOf(S3StorageOperations.GENERATION_ID_USER_META_KEY)
                .toLong()
        assertEquals(32L, generationId)
        assertFalse(file.exists(), "file should have been deleted by the connector")
        assertEquals(fileLength, objectInStore.size)
        assertEquals("$testBucketPath/$streamName/$filePath", objectInStore.key)
        // Close the object's content stream once it has been read so it is not leaked.
        val uploadedContent =
            s3.getObject(objectInStore.bucketName, objectInStore.key).objectContent.use {
                it.readBytes()
            }
        assertContentEquals(fileContent, uploadedContent)
    }
}
Loading

0 comments on commit 880286a

Please sign in to comment.