From 69e5ab2fc2bc50eb720fe73437b058e9f698097c Mon Sep 17 00:00:00 2001 From: Corine Liang <> Date: Tue, 12 Apr 2022 12:59:19 +1000 Subject: [PATCH 1/4] doc: Update Readme file --- README.md | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/README.md b/README.md index 71750f2..5b5253b 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,116 @@ Example * Before: `Headers = { account_id: "04a96f30-3dfa-11ec-9bbc-0242ac130002" }, Partition = 0` * After: `Headers = { account_id: "04a96f30-3dfa-11ec-9bbc-0242ac130002" }, Partition = 7` +## Unify Legacy SlackIntegration Payload + +This is a custom transformer to unify `SlackIntegration` collections coming out of Murmur + +We have 2 document variants for the `SlackIntegration` (Thanks MongoDB!) + +**OAuth V1** +``` +{ + "_id": { + "$oid": "5e703178fff837001ffaf052" + }, + "account_aggregate_id": "d8fb1ad0-6308-47c8-84d6-1eb0f367ca0d", + "account_id": { + "$oid": "5e2a754ddd99fd0022333044" + }, + "created_at": "2020-03-17T02:10:00.181Z", + "oauth_response_data": { + "ok": true, + "access_token": "xoxp-993008094547-1006686967558-1004086920580-f7723a0d45deb6a482350428918781e3", + "scope": "identify,bot", + "user_id": "U0106L6UFGE", + "team_id": "TV7082SG3", + "enterprise_id": null, + "team_name": "Slack Testing", + "bot": { + "bot_user_id": "UV8DT789F", + "bot_access_token": "xoxb-993008094547-994469246321-XgSvWxmFSfRHP8gf55Y7dS28" + } + }, + "status": "active", + "updated_at": "2022-04-07T03:29:39.800Z", + "v": 0 +} +``` +**OAuth V2** +``` +{ + "_id": { + "$oid": "624fd8c271efe5001fa984d5" + }, + "account_aggregate_id": "8c6110c8-a879-40e7-9d3a-0fd91ee999a2", + "account_id": { + "$oid": "5ff62e4ce1cda00025f80576" + }, + "created_at": "2022-04-08T06:40:02.649Z", + "oauth_response_data": { + "ok": true, + "app_id": "A037SJ77B41", + "authed_user": { + "id": "U02L4T6TH42", + "scope": "identity.basic,identity.email", + "access_token": "xoxp-2681941652837-2684924935138-3363184295571-0912805e1a9fb49b9f36e070bfdea9cd", + "token_type": "user" + }, + "scope": "chat:write,commands,im:history,im:read,im:write,users.profile:read,users:read,users:read.email", + "token_type": "bot", + "access_token": "xoxb-2681941652837-3360300471349-0yNRFusOQ6ciye0X5AyDAB6S", + "bot_user_id": "U03AL8UDVA9", + "team": { + "id": "T02L1TPK6QM", + "name": "Angel CA Test Workspace" + }, + "enterprise": null, + "is_enterprise_install": false + }, + "status": "active", + "updated_at": "2022-04-08T06:40:02.657Z", + "v": 0 +} +``` +What this transformer does is to make sure we can get a unified payload into the topic. Without a custom transformer we will have multiple attribute in the topic which will not make sense to anyone without context + +### Examples + +Assume the following configuration: + +```json +"transforms": "UnifyLegacySlackIntegrationPayload", +"transforms.UnifyLegacySlackIntegrationPayload.type":"com.cultureamp.kafka.connect.transforms.UnifyLegacySlackIntegrationPayload", +``` + +Avro Schema: + +``` +{ + "type": "record", + "name": "ConnectDefault", + "namespace": "io.confluent.connect.avro", + "fields": [ + { + "name": "account_aggregate_id", + "type": "string" + }, + { + "name": "access_token", + "type": "string" + }, + { + "name": "team_id", + "type": "string" + }, + { + "name": "team_name", + "type": "string" + } + ] +} +``` + ## Installation This library is built as a single `.jar` and published as a Github release. To install in your Connect cluster, add the JAR file to a directory that is on the clusters `plugin.path`. From bbe73cfd0fc63194389f5034f32263f6eaa6ddf9 Mon Sep 17 00:00:00 2001 From: Corine Liang <> Date: Tue, 12 Apr 2022 14:52:07 +1000 Subject: [PATCH 2/4] feat: Custom transformer to unify slack data --- build.gradle.kts | 2 +- .../UnifyLegacySlackIntegrationPayload.kt | 103 ++++++++++++++++++ .../UnifyLegacySlackIntegrationPayloadTest.kt | 98 +++++++++++++++++ 3 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt create mode 100644 src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index 6835875..176afc1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -15,7 +15,7 @@ plugins { } // Package version -version = "0.1.0" +version = "0.2.0" repositories { // Use Maven Central for resolving dependencies. diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt new file mode 100644 index 0000000..9c91231 --- /dev/null +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt @@ -0,0 +1,103 @@ +package com.cultureamp.kafka.connect.plugins.transforms + +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.connect.connector.ConnectRecord +import org.apache.kafka.connect.data.Field +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.errors.DataException +import org.apache.kafka.connect.transforms.Transformation +import org.apache.kafka.connect.transforms.util.Requirements + +class UnifyLegacySlackIntegrationPayload> : Transformation { + val attributesToIgnoreList = arrayListOf("account_aggregate_id", "oauth_response_data.access_token", "oauth_response_data.team_id", "oauth_response_data.team_name", "oauth_response_data.bot.bot_access_token", "oauth_response_data.team.id", "oauth_response_data.team.name") + val PURPOSE = "unify legacy slack integration data" + override fun configure(configs: MutableMap?) {} + + override fun close() {} + + fun removeIgnoredAttributes(fields: List, builder: SchemaBuilder, hierarchy: String = ""): SchemaBuilder { + for (field in fields) { + if ("$hierarchy${field.name()}" !in attributesToIgnoreList) { + if (field.schema().type().getName() == "struct") { + val childSchema = removeIgnoredAttributes(field.schema().fields(), SchemaBuilder.struct(), "$hierarchy${field.name()}.") + // Only add child schema if is not empty + if (childSchema.fields().count() > 0) { + builder.field(field.name(), childSchema.build()) + } + } else { + builder + .field(field.name(), field.schema()) + } + } + } + return builder + } + + fun populateValue(originalValues: Struct, updatedValues: Struct): Struct { + var newFields = updatedValues.schema().fields() + for (field in newFields) { + try { + if (field.schema().type().getName() == "struct") { + val childValue = populateValue(Requirements.requireStruct(originalValues.get(field.name()), PURPOSE), Struct(field.schema())) + updatedValues.put(field.name(), childValue) + } else { + updatedValues + .put(field.name(), originalValues.get(field.name())) + } + } catch (e: DataException) { + } + } + return updatedValues + } + + fun extractUnifiedValue(oauthResponseData: Struct): Triple { + var teamId: String + var teamName: String + var accessToken: String + + try { + // Only Slack Integration OAuth V1 has "bot" child element + val dot: Struct = Requirements.requireStruct(oauthResponseData.get("bot"), PURPOSE) + teamId = oauthResponseData.get("team_id") as String + teamName = oauthResponseData.get("team_name") as String + accessToken = dot.get("bot_access_token") as String + } catch (e: DataException) { + // Slack Integration OAuth V2 Payload + val team: Struct = Requirements.requireStruct(oauthResponseData.get("team"), PURPOSE) + teamId = team.get("id") as String + teamName = team.get("name") as String + accessToken = oauthResponseData.get("access_token") as String + } + return Triple(teamId, teamName, accessToken) + } + + override fun apply(record: R): R { + val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE) + val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE) + val updatedSchemaBuilder: SchemaBuilder = removeIgnoredAttributes(valueStruct.schema().fields(), SchemaBuilder.struct()) + val(teamId, teamName, accessToken) = extractUnifiedValue(oauthResponseData) + + // Add back the unified fields + val modifiedPayloadSchema = updatedSchemaBuilder + .field("account_aggregate_id", Schema.STRING_SCHEMA) + .field("access_token", Schema.STRING_SCHEMA) + .field("team_id", Schema.STRING_SCHEMA) + .field("team_name", Schema.STRING_SCHEMA) + .build() + + val updatedValuesStruct: Struct = populateValue(valueStruct, Struct(modifiedPayloadSchema)) + + val modifiedPayloadStruct = updatedValuesStruct + .put("access_token", accessToken) + .put("team_id", teamId) + .put("team_name", teamName) + + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), modifiedPayloadSchema, modifiedPayloadStruct, record.timestamp()) + } + + override fun config(): ConfigDef { + return ConfigDef() + } +} diff --git a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt new file mode 100644 index 0000000..87439fa --- /dev/null +++ b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt @@ -0,0 +1,98 @@ +package com.cultureamp.kafka.connect.plugins.transforms + +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.source.SourceRecord +import kotlin.test.Test +import kotlin.test.assertEquals + +class UnifyLegacySlackIntegrationPayloadTest { + + fun createSlackIntegrationPayload(): Pair { + val botSchema = SchemaBuilder.struct() + .field("bot_access_token", Schema.STRING_SCHEMA) + .field("bot_user_id", Schema.STRING_SCHEMA) + .build() + + val botStruct = Struct(botSchema) + .put("bot_access_token", "xoxb-12345") + .put("bot_user_id", "UV8DT789F") + + val oauthResponseDataSchema = SchemaBuilder.struct() + .field("access_token", Schema.STRING_SCHEMA) + .field("team_id", Schema.STRING_SCHEMA) + .field("team_name", Schema.STRING_SCHEMA) + .field("bot", botSchema) + .build() + + val oauthResponseDataStruct = Struct(oauthResponseDataSchema) + .put("access_token", "a-b-c") + .put("team_id", "a-b-c") + .put("team_name", "a-b-c") + .put("bot", botStruct) + + val payloacSchema = SchemaBuilder.struct() + .field("account_aggregate_id", Schema.STRING_SCHEMA) + .field("created_at", Schema.STRING_SCHEMA) + .field("oauth_response_data", oauthResponseDataSchema) + .build() + + val payloadStruct: Struct = Struct(payloacSchema) + .put("oauth_response_data", oauthResponseDataStruct) + .put("account_aggregate_id", "1-2-3") + .put("created_at", "2022-04-08T06:40:02.649Z") + + return payloacSchema to payloadStruct + } + + fun createLagacySlackIntegrationPayload(): Pair { + val teamSchema = SchemaBuilder.struct() + .field("id", Schema.STRING_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .build() + + val teamStruct = Struct(teamSchema) + .put("id", "a-b-c") + .put("name", "a-b-c") + + val oauthResponseDataSchema = SchemaBuilder.struct() + .field("access_token", Schema.STRING_SCHEMA) + .field("team", teamSchema) + .build() + + val oauthResponseDataStruct = Struct(oauthResponseDataSchema) + .put("access_token", "xoxb-12345") + .put("team", teamStruct) + + val payloacSchema = SchemaBuilder.struct() + .field("account_aggregate_id", Schema.STRING_SCHEMA) + .field("oauth_response_data", oauthResponseDataSchema) + .build() + + val payloadStruct: Struct = Struct(payloacSchema) + .put("oauth_response_data", oauthResponseDataStruct) + .put("account_aggregate_id", "1-2-3") + + return payloacSchema to payloadStruct + } + + @Test + fun `With slack Integration Data`() { + val partitionSmt: UnifyLegacySlackIntegrationPayload = UnifyLegacySlackIntegrationPayload() + val (payloacSchema, payloadStruct) = createSlackIntegrationPayload() + + val transformedRecord: SourceRecord = partitionSmt.apply(SourceRecord(null, null, "test", payloacSchema, payloadStruct)) + assertEquals(transformedRecord.value().toString(), "Struct{created_at=2022-04-08T06:40:02.649Z,oauth_response_data=Struct{bot=Struct{bot_user_id=UV8DT789F}},account_aggregate_id=1-2-3,access_token=xoxb-12345,team_id=a-b-c,team_name=a-b-c}") + } + + @Test + fun `With legacy Slack Integration Data`() { + val partitionSmt: UnifyLegacySlackIntegrationPayload = UnifyLegacySlackIntegrationPayload() + + val (payloacSchema, payloadStruct) = createLagacySlackIntegrationPayload() + + val transformedRecord: SourceRecord = partitionSmt.apply(SourceRecord(null, null, "test", payloacSchema, payloadStruct)) + assertEquals(transformedRecord.value().toString(), "Struct{account_aggregate_id=1-2-3,access_token=xoxb-12345,team_id=a-b-c,team_name=a-b-c}") + } +} From ca53aebd293014590dc03d5f66baaaacc7b9ccc8 Mon Sep 17 00:00:00 2001 From: Corine Liang <> Date: Tue, 12 Apr 2022 15:33:01 +1000 Subject: [PATCH 3/4] fix: Add a namespace --- .../plugins/transforms/UnifyLegacySlackIntegrationPayload.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt index 9c91231..6014e01 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt @@ -81,6 +81,7 @@ class UnifyLegacySlackIntegrationPayload> : Transformation< // Add back the unified fields val modifiedPayloadSchema = updatedSchemaBuilder + .name("com.cultureamp.murmur.slack_integrations") .field("account_aggregate_id", Schema.STRING_SCHEMA) .field("access_token", Schema.STRING_SCHEMA) .field("team_id", Schema.STRING_SCHEMA) From a3fe46bfd9ea1a389185be1ceffd36af60e015e6 Mon Sep 17 00:00:00 2001 From: Corine Liang <> Date: Tue, 12 Apr 2022 21:55:39 +1000 Subject: [PATCH 4/4] fix: address PR comments --- .../UnifyLegacySlackIntegrationPayload.kt | 19 +-- .../UnifyLegacySlackIntegrationPayloadTest.kt | 114 +++++++++++++----- 2 files changed, 97 insertions(+), 36 deletions(-) diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt index 6014e01..4c3289b 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt @@ -11,19 +11,19 @@ import org.apache.kafka.connect.transforms.Transformation import org.apache.kafka.connect.transforms.util.Requirements class UnifyLegacySlackIntegrationPayload> : Transformation { - val attributesToIgnoreList = arrayListOf("account_aggregate_id", "oauth_response_data.access_token", "oauth_response_data.team_id", "oauth_response_data.team_name", "oauth_response_data.bot.bot_access_token", "oauth_response_data.team.id", "oauth_response_data.team.name") - val PURPOSE = "unify legacy slack integration data" + private val ignoredAttributes = arrayListOf("account_aggregate_id", "oauth_response_data.access_token", "oauth_response_data.team_id", "oauth_response_data.team_name", "oauth_response_data.bot.bot_access_token", "oauth_response_data.team.id", "oauth_response_data.team.name") + private val PURPOSE = "unify legacy slack integration data" override fun configure(configs: MutableMap?) {} override fun close() {} - fun removeIgnoredAttributes(fields: List, builder: SchemaBuilder, hierarchy: String = ""): SchemaBuilder { + private fun removeIgnoredAttributes(fields: List, builder: SchemaBuilder, hierarchy: String = ""): SchemaBuilder { for (field in fields) { - if ("$hierarchy${field.name()}" !in attributesToIgnoreList) { + if ("$hierarchy${field.name()}" !in ignoredAttributes) { if (field.schema().type().getName() == "struct") { val childSchema = removeIgnoredAttributes(field.schema().fields(), SchemaBuilder.struct(), "$hierarchy${field.name()}.") // Only add child schema if is not empty - if (childSchema.fields().count() > 0) { + if (childSchema.fields().isNotEmpty()) { builder.field(field.name(), childSchema.build()) } } else { @@ -35,8 +35,8 @@ class UnifyLegacySlackIntegrationPayload> : Transformation< return builder } - fun populateValue(originalValues: Struct, updatedValues: Struct): Struct { - var newFields = updatedValues.schema().fields() + private fun populateValue(originalValues: Struct, updatedValues: Struct): Struct { + val newFields = updatedValues.schema().fields() for (field in newFields) { try { if (field.schema().type().getName() == "struct") { @@ -47,12 +47,13 @@ class UnifyLegacySlackIntegrationPayload> : Transformation< .put(field.name(), originalValues.get(field.name())) } } catch (e: DataException) { + // This is catch exception thrown when field.name() in .get(field.name())) does not exists } } return updatedValues } - fun extractUnifiedValue(oauthResponseData: Struct): Triple { + private fun extractUnifiedValues(oauthResponseData: Struct): Triple { var teamId: String var teamName: String var accessToken: String @@ -77,7 +78,7 @@ class UnifyLegacySlackIntegrationPayload> : Transformation< val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE) val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE) val updatedSchemaBuilder: SchemaBuilder = removeIgnoredAttributes(valueStruct.schema().fields(), SchemaBuilder.struct()) - val(teamId, teamName, accessToken) = extractUnifiedValue(oauthResponseData) + val(teamId, teamName, accessToken) = extractUnifiedValues(oauthResponseData) // Add back the unified fields val modifiedPayloadSchema = updatedSchemaBuilder diff --git a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt index 87439fa..19481f3 100644 --- a/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt +++ b/src/test/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayloadTest.kt @@ -8,15 +8,20 @@ import kotlin.test.Test import kotlin.test.assertEquals class UnifyLegacySlackIntegrationPayloadTest { + private val CREATED_AT = "2022-04-08T06:40:02.649Z" + private val ACCESS_TOKEN = "xoxb-12345" + private val ACCOUNT_ID = "account-id" + private val TEAM_ID = "team-id" + private val TEAM_NAME = "team-name" - fun createSlackIntegrationPayload(): Pair { + private fun createOAuthV1Payload(): Pair { val botSchema = SchemaBuilder.struct() .field("bot_access_token", Schema.STRING_SCHEMA) .field("bot_user_id", Schema.STRING_SCHEMA) .build() val botStruct = Struct(botSchema) - .put("bot_access_token", "xoxb-12345") + .put("bot_access_token", ACCESS_TOKEN) .put("bot_user_id", "UV8DT789F") val oauthResponseDataSchema = SchemaBuilder.struct() @@ -28,33 +33,33 @@ class UnifyLegacySlackIntegrationPayloadTest { val oauthResponseDataStruct = Struct(oauthResponseDataSchema) .put("access_token", "a-b-c") - .put("team_id", "a-b-c") - .put("team_name", "a-b-c") + .put("team_id", TEAM_ID) + .put("team_name", TEAM_NAME) .put("bot", botStruct) - val payloacSchema = SchemaBuilder.struct() + val payloadSchema = SchemaBuilder.struct() .field("account_aggregate_id", Schema.STRING_SCHEMA) .field("created_at", Schema.STRING_SCHEMA) .field("oauth_response_data", oauthResponseDataSchema) .build() - val payloadStruct: Struct = Struct(payloacSchema) + val payloadStruct: Struct = Struct(payloadSchema) .put("oauth_response_data", oauthResponseDataStruct) - .put("account_aggregate_id", "1-2-3") - .put("created_at", "2022-04-08T06:40:02.649Z") + .put("account_aggregate_id", ACCOUNT_ID) + .put("created_at", CREATED_AT) - return payloacSchema to payloadStruct + return payloadSchema to payloadStruct } - fun createLagacySlackIntegrationPayload(): Pair { + private fun createOAuthV2Payload(): Pair { val teamSchema = SchemaBuilder.struct() .field("id", Schema.STRING_SCHEMA) .field("name", Schema.STRING_SCHEMA) .build() val teamStruct = Struct(teamSchema) - .put("id", "a-b-c") - .put("name", "a-b-c") + .put("id", TEAM_ID) + .put("name", TEAM_NAME) val oauthResponseDataSchema = SchemaBuilder.struct() .field("access_token", Schema.STRING_SCHEMA) @@ -62,37 +67,92 @@ class UnifyLegacySlackIntegrationPayloadTest { .build() val oauthResponseDataStruct = Struct(oauthResponseDataSchema) - .put("access_token", "xoxb-12345") + .put("access_token", ACCESS_TOKEN) .put("team", teamStruct) - val payloacSchema = SchemaBuilder.struct() + val payloadSchema = SchemaBuilder.struct() .field("account_aggregate_id", Schema.STRING_SCHEMA) .field("oauth_response_data", oauthResponseDataSchema) .build() - val payloadStruct: Struct = Struct(payloacSchema) + val payloadStruct: Struct = Struct(payloadSchema) .put("oauth_response_data", oauthResponseDataStruct) - .put("account_aggregate_id", "1-2-3") + .put("account_aggregate_id", ACCOUNT_ID) - return payloacSchema to payloadStruct + return payloadSchema to payloadStruct + } + + private fun oAuthV1ExpectedValue(): Pair { + val botSchema = SchemaBuilder.struct() + .field("bot_user_id", Schema.STRING_SCHEMA) + .build() + + val botStruct = Struct(botSchema) + .put("bot_user_id", "UV8DT789F") + + val oauthResponseDataSchema = SchemaBuilder.struct() + .field("bot", botSchema) + .build() + + val oauthResponseDataStruct = Struct(oauthResponseDataSchema) + .put("bot", botStruct) + + val expectedSchema = SchemaBuilder.struct() + .name("com.cultureamp.murmur.slack_integrations") + .field("created_at", Schema.STRING_SCHEMA) + .field("oauth_response_data", oauthResponseDataSchema) + .field("account_aggregate_id", Schema.STRING_SCHEMA) + .field("access_token", Schema.STRING_SCHEMA) + .field("team_id", Schema.STRING_SCHEMA) + .field("team_name", Schema.STRING_SCHEMA) + .build() + + val expectedValue = Struct(expectedSchema) + .put("created_at", CREATED_AT) + .put("oauth_response_data", oauthResponseDataStruct) + .put("account_aggregate_id", ACCOUNT_ID) + .put("access_token", ACCESS_TOKEN) + .put("team_id", TEAM_ID) + .put("team_name", TEAM_NAME) + + return expectedSchema to expectedValue } @Test - fun `With slack Integration Data`() { - val partitionSmt: UnifyLegacySlackIntegrationPayload = UnifyLegacySlackIntegrationPayload() - val (payloacSchema, payloadStruct) = createSlackIntegrationPayload() + fun `With Legacy Slack Integration Data`() { + val transformer: UnifyLegacySlackIntegrationPayload = UnifyLegacySlackIntegrationPayload() + val (payloadSchema, payloadStruct) = createOAuthV1Payload() - val transformedRecord: SourceRecord = partitionSmt.apply(SourceRecord(null, null, "test", payloacSchema, payloadStruct)) - assertEquals(transformedRecord.value().toString(), "Struct{created_at=2022-04-08T06:40:02.649Z,oauth_response_data=Struct{bot=Struct{bot_user_id=UV8DT789F}},account_aggregate_id=1-2-3,access_token=xoxb-12345,team_id=a-b-c,team_name=a-b-c}") + val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct)) + + val (expectedSchema, expectedValue) = oAuthV1ExpectedValue() + + assertEquals(expectedValue, transformedRecord.value()) + assertEquals(expectedSchema, transformedRecord.valueSchema()) } @Test - fun `With legacy Slack Integration Data`() { - val partitionSmt: UnifyLegacySlackIntegrationPayload = UnifyLegacySlackIntegrationPayload() + fun `With Slack Integration Data`() { + val transformer: UnifyLegacySlackIntegrationPayload = UnifyLegacySlackIntegrationPayload() - val (payloacSchema, payloadStruct) = createLagacySlackIntegrationPayload() + val (payloadSchema, payloadStruct) = createOAuthV2Payload() - val transformedRecord: SourceRecord = partitionSmt.apply(SourceRecord(null, null, "test", payloacSchema, payloadStruct)) - assertEquals(transformedRecord.value().toString(), "Struct{account_aggregate_id=1-2-3,access_token=xoxb-12345,team_id=a-b-c,team_name=a-b-c}") + val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct)) + + val expectedSchema = SchemaBuilder.struct() + .name("com.cultureamp.murmur.slack_integrations") + .field("account_aggregate_id", Schema.STRING_SCHEMA) + .field("access_token", Schema.STRING_SCHEMA) + .field("team_id", Schema.STRING_SCHEMA) + .field("team_name", Schema.STRING_SCHEMA) + .build() + val expectedValue = Struct(expectedSchema) + .put("account_aggregate_id", ACCOUNT_ID) + .put("access_token", ACCESS_TOKEN) + .put("team_id", TEAM_ID) + .put("team_name", TEAM_NAME) + + assertEquals(expectedValue, transformedRecord.value()) + assertEquals(expectedSchema, transformedRecord.valueSchema()) } }