Skip to content

Commit

Permalink
Merge pull request #4 from cultureamp/FW-283/unify-legacy-slack-data
Browse files Browse the repository at this point in the history
feat: Custom transformer to unify slack data
  • Loading branch information
niciqy authored Apr 13, 2022
2 parents 90aa6b6 + a3fe46b commit 231ab8d
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 1 deletion.
110 changes: 110 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,116 @@ Example
* Before: `Headers = { account_id: "04a96f30-3dfa-11ec-9bbc-0242ac130002" }, Partition = 0`
* After: `Headers = { account_id: "04a96f30-3dfa-11ec-9bbc-0242ac130002" }, Partition = 7`

## Unify Legacy SlackIntegration Payload

This is a custom transformer to unify `SlackIntegration` collections coming out of Murmur

We have 2 document variants for the `SlackIntegration` (Thanks MongoDB!)

**OAuth V1**
```
{
"_id": {
"$oid": "5e703178fff837001ffaf052"
},
"account_aggregate_id": "d8fb1ad0-6308-47c8-84d6-1eb0f367ca0d",
"account_id": {
"$oid": "5e2a754ddd99fd0022333044"
},
"created_at": "2020-03-17T02:10:00.181Z",
"oauth_response_data": {
"ok": true,
"access_token": "xoxp-993008094547-1006686967558-1004086920580-f7723a0d45deb6a482350428918781e3",
"scope": "identify,bot",
"user_id": "U0106L6UFGE",
"team_id": "TV7082SG3",
"enterprise_id": null,
"team_name": "Slack Testing",
"bot": {
"bot_user_id": "UV8DT789F",
"bot_access_token": "xoxb-993008094547-994469246321-XgSvWxmFSfRHP8gf55Y7dS28"
}
},
"status": "active",
"updated_at": "2022-04-07T03:29:39.800Z",
"v": 0
}
```
**OAuth V2**
```
{
"_id": {
"$oid": "624fd8c271efe5001fa984d5"
},
"account_aggregate_id": "8c6110c8-a879-40e7-9d3a-0fd91ee999a2",
"account_id": {
"$oid": "5ff62e4ce1cda00025f80576"
},
"created_at": "2022-04-08T06:40:02.649Z",
"oauth_response_data": {
"ok": true,
"app_id": "A037SJ77B41",
"authed_user": {
"id": "U02L4T6TH42",
"scope": "identity.basic,identity.email",
"access_token": "xoxp-2681941652837-2684924935138-3363184295571-0912805e1a9fb49b9f36e070bfdea9cd",
"token_type": "user"
},
"scope": "chat:write,commands,im:history,im:read,im:write,users.profile:read,users:read,users:read.email",
"token_type": "bot",
"access_token": "xoxb-2681941652837-3360300471349-0yNRFusOQ6ciye0X5AyDAB6S",
"bot_user_id": "U03AL8UDVA9",
"team": {
"id": "T02L1TPK6QM",
"name": "Angel CA Test Workspace"
},
"enterprise": null,
"is_enterprise_install": false
},
"status": "active",
"updated_at": "2022-04-08T06:40:02.657Z",
"v": 0
}
```
What this transformer does is to make sure we can get a unified payload into the topic. Without a custom transformer we will have multiple attribute in the topic which will not make sense to anyone without context

### Examples

Assume the following configuration:

```json
"transforms": "UnifyLegacySlackIntegrationPayload",
"transforms.UnifyLegacySlackIntegrationPayload.type":"com.cultureamp.kafka.connect.transforms.UnifyLegacySlackIntegrationPayload",
```

Avro Schema:

```
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "account_aggregate_id",
"type": "string"
},
{
"name": "access_token",
"type": "string"
},
{
"name": "team_id",
"type": "string"
},
{
"name": "team_name",
"type": "string"
}
]
}
```


## Installation
This library is built as a single `.jar` and published as a Github release. To install in your Connect cluster, add the JAR file to a directory that is on the clusters `plugin.path`.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
}

// Package version
version = "0.1.0"
version = "0.2.0"

repositories {
// Use Maven Central for resolving dependencies.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.cultureamp.kafka.connect.plugins.transforms

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.DataException
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements

class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<R> {
private val ignoredAttributes = arrayListOf("account_aggregate_id", "oauth_response_data.access_token", "oauth_response_data.team_id", "oauth_response_data.team_name", "oauth_response_data.bot.bot_access_token", "oauth_response_data.team.id", "oauth_response_data.team.name")
private val PURPOSE = "unify legacy slack integration data"
override fun configure(configs: MutableMap<String, *>?) {}

override fun close() {}

private fun removeIgnoredAttributes(fields: List<Field>, builder: SchemaBuilder, hierarchy: String = ""): SchemaBuilder {
for (field in fields) {
if ("$hierarchy${field.name()}" !in ignoredAttributes) {
if (field.schema().type().getName() == "struct") {
val childSchema = removeIgnoredAttributes(field.schema().fields(), SchemaBuilder.struct(), "$hierarchy${field.name()}.")
// Only add child schema if is not empty
if (childSchema.fields().isNotEmpty()) {
builder.field(field.name(), childSchema.build())
}
} else {
builder
.field(field.name(), field.schema())
}
}
}
return builder
}

private fun populateValue(originalValues: Struct, updatedValues: Struct): Struct {
val newFields = updatedValues.schema().fields()
for (field in newFields) {
try {
if (field.schema().type().getName() == "struct") {
val childValue = populateValue(Requirements.requireStruct(originalValues.get(field.name()), PURPOSE), Struct(field.schema()))
updatedValues.put(field.name(), childValue)
} else {
updatedValues
.put(field.name(), originalValues.get(field.name()))
}
} catch (e: DataException) {
// This is catch exception thrown when field.name() in .get(field.name())) does not exists
}
}
return updatedValues
}

private fun extractUnifiedValues(oauthResponseData: Struct): Triple<String, String, String> {
var teamId: String
var teamName: String
var accessToken: String

try {
// Only Slack Integration OAuth V1 has "bot" child element
val dot: Struct = Requirements.requireStruct(oauthResponseData.get("bot"), PURPOSE)
teamId = oauthResponseData.get("team_id") as String
teamName = oauthResponseData.get("team_name") as String
accessToken = dot.get("bot_access_token") as String
} catch (e: DataException) {
// Slack Integration OAuth V2 Payload
val team: Struct = Requirements.requireStruct(oauthResponseData.get("team"), PURPOSE)
teamId = team.get("id") as String
teamName = team.get("name") as String
accessToken = oauthResponseData.get("access_token") as String
}
return Triple(teamId, teamName, accessToken)
}

override fun apply(record: R): R {
val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE)
val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE)
val updatedSchemaBuilder: SchemaBuilder = removeIgnoredAttributes(valueStruct.schema().fields(), SchemaBuilder.struct())
val(teamId, teamName, accessToken) = extractUnifiedValues(oauthResponseData)

// Add back the unified fields
val modifiedPayloadSchema = updatedSchemaBuilder
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.build()

val updatedValuesStruct: Struct = populateValue(valueStruct, Struct(modifiedPayloadSchema))

val modifiedPayloadStruct = updatedValuesStruct
.put("access_token", accessToken)
.put("team_id", teamId)
.put("team_name", teamName)

return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), modifiedPayloadSchema, modifiedPayloadStruct, record.timestamp())
}

override fun config(): ConfigDef {
return ConfigDef()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package com.cultureamp.kafka.connect.plugins.transforms

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import kotlin.test.Test
import kotlin.test.assertEquals

class UnifyLegacySlackIntegrationPayloadTest {
private val CREATED_AT = "2022-04-08T06:40:02.649Z"
private val ACCESS_TOKEN = "xoxb-12345"
private val ACCOUNT_ID = "account-id"
private val TEAM_ID = "team-id"
private val TEAM_NAME = "team-name"

private fun createOAuthV1Payload(): Pair<Schema, Struct> {
val botSchema = SchemaBuilder.struct()
.field("bot_access_token", Schema.STRING_SCHEMA)
.field("bot_user_id", Schema.STRING_SCHEMA)
.build()

val botStruct = Struct(botSchema)
.put("bot_access_token", ACCESS_TOKEN)
.put("bot_user_id", "UV8DT789F")

val oauthResponseDataSchema = SchemaBuilder.struct()
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.field("bot", botSchema)
.build()

val oauthResponseDataStruct = Struct(oauthResponseDataSchema)
.put("access_token", "a-b-c")
.put("team_id", TEAM_ID)
.put("team_name", TEAM_NAME)
.put("bot", botStruct)

val payloadSchema = SchemaBuilder.struct()
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("created_at", Schema.STRING_SCHEMA)
.field("oauth_response_data", oauthResponseDataSchema)
.build()

val payloadStruct: Struct = Struct(payloadSchema)
.put("oauth_response_data", oauthResponseDataStruct)
.put("account_aggregate_id", ACCOUNT_ID)
.put("created_at", CREATED_AT)

return payloadSchema to payloadStruct
}

private fun createOAuthV2Payload(): Pair<Schema, Struct> {
val teamSchema = SchemaBuilder.struct()
.field("id", Schema.STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build()

val teamStruct = Struct(teamSchema)
.put("id", TEAM_ID)
.put("name", TEAM_NAME)

val oauthResponseDataSchema = SchemaBuilder.struct()
.field("access_token", Schema.STRING_SCHEMA)
.field("team", teamSchema)
.build()

val oauthResponseDataStruct = Struct(oauthResponseDataSchema)
.put("access_token", ACCESS_TOKEN)
.put("team", teamStruct)

val payloadSchema = SchemaBuilder.struct()
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("oauth_response_data", oauthResponseDataSchema)
.build()

val payloadStruct: Struct = Struct(payloadSchema)
.put("oauth_response_data", oauthResponseDataStruct)
.put("account_aggregate_id", ACCOUNT_ID)

return payloadSchema to payloadStruct
}

private fun oAuthV1ExpectedValue(): Pair<Schema, Struct> {
val botSchema = SchemaBuilder.struct()
.field("bot_user_id", Schema.STRING_SCHEMA)
.build()

val botStruct = Struct(botSchema)
.put("bot_user_id", "UV8DT789F")

val oauthResponseDataSchema = SchemaBuilder.struct()
.field("bot", botSchema)
.build()

val oauthResponseDataStruct = Struct(oauthResponseDataSchema)
.put("bot", botStruct)

val expectedSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("created_at", Schema.STRING_SCHEMA)
.field("oauth_response_data", oauthResponseDataSchema)
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.build()

val expectedValue = Struct(expectedSchema)
.put("created_at", CREATED_AT)
.put("oauth_response_data", oauthResponseDataStruct)
.put("account_aggregate_id", ACCOUNT_ID)
.put("access_token", ACCESS_TOKEN)
.put("team_id", TEAM_ID)
.put("team_name", TEAM_NAME)

return expectedSchema to expectedValue
}

@Test
fun `With Legacy Slack Integration Data`() {
val transformer: UnifyLegacySlackIntegrationPayload<SourceRecord> = UnifyLegacySlackIntegrationPayload()
val (payloadSchema, payloadStruct) = createOAuthV1Payload()

val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct))

val (expectedSchema, expectedValue) = oAuthV1ExpectedValue()

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
}

@Test
fun `With Slack Integration Data`() {
val transformer: UnifyLegacySlackIntegrationPayload<SourceRecord> = UnifyLegacySlackIntegrationPayload()

val (payloadSchema, payloadStruct) = createOAuthV2Payload()

val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct))

val expectedSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.build()
val expectedValue = Struct(expectedSchema)
.put("account_aggregate_id", ACCOUNT_ID)
.put("access_token", ACCESS_TOKEN)
.put("team_id", TEAM_ID)
.put("team_name", TEAM_NAME)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
}
}

0 comments on commit 231ab8d

Please sign in to comment.