Skip to content

Commit

Permalink
Merge pull request #10 from cultureamp/FLW-200_II
Browse files Browse the repository at this point in the history
FLW-200: Determine when an app is removed from a CA Admin;
  • Loading branch information
mavaji authored Aug 19, 2022
2 parents 14001d5 + 769cf2c commit 1462a47
Show file tree
Hide file tree
Showing 13 changed files with 660 additions and 440 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

# Ignore Gradle build output directory
build
.idea
111 changes: 10 additions & 101 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,116 +35,25 @@ Example
* Before: `Headers = { account_id: "04a96f30-3dfa-11ec-9bbc-0242ac130002" }, Partition = 0`
* After: `Headers = { account_id: "04a96f30-3dfa-11ec-9bbc-0242ac130002" }, Partition = 7`

## Unify Legacy SlackIntegration Payload
## SlackIntegration Payload Transformer

This is a custom transformer to unify `SlackIntegration` collections coming out of Murmur
This is a custom transformer to extract and reshape fields from `SlackIntegration` collections coming out of Murmur.

We have 2 document variants for the `SlackIntegration` (Thanks MongoDB!)
There are 2 document variants for the `SlackIntegration`
- OAuth V1: `resources/com/cultureamp/slack-integration-insert-v1.json`
- OAuth V2: `resources/com/cultureamp/slack-integration-insert-v2.json`

**OAuth V1**
```
{
"_id": {
"$oid": "5e703178fff837001ffaf052"
},
"account_aggregate_id": "d8fb1ad0-6308-47c8-84d6-1eb0f367ca0d",
"account_id": {
"$oid": "5e2a754ddd99fd0022333044"
},
"created_at": "2020-03-17T02:10:00.181Z",
"oauth_response_data": {
"ok": true,
"access_token": "xoxp-993008094547-1006686967558-1004086920580-f7723a0d45deb6a482350428918781e3",
"scope": "identify,bot",
"user_id": "U0106L6UFGE",
"team_id": "TV7082SG3",
"enterprise_id": null,
"team_name": "Slack Testing",
"bot": {
"bot_user_id": "UV8DT789F",
"bot_access_token": "xoxb-993008094547-994469246321-XgSvWxmFSfRHP8gf55Y7dS28"
}
},
"status": "active",
"updated_at": "2022-04-07T03:29:39.800Z",
"v": 0
}
```
**OAuth V2**
```
{
"_id": {
"$oid": "624fd8c271efe5001fa984d5"
},
"account_aggregate_id": "8c6110c8-a879-40e7-9d3a-0fd91ee999a2",
"account_id": {
"$oid": "5ff62e4ce1cda00025f80576"
},
"created_at": "2022-04-08T06:40:02.649Z",
"oauth_response_data": {
"ok": true,
"app_id": "A037SJ77B41",
"authed_user": {
"id": "U02L4T6TH42",
"scope": "identity.basic,identity.email",
"access_token": "xoxp-2681941652837-2684924935138-3363184295571-0912805e1a9fb49b9f36e070bfdea9cd",
"token_type": "user"
},
"scope": "chat:write,commands,im:history,im:read,im:write,users.profile:read,users:read,users:read.email",
"token_type": "bot",
"access_token": "xoxb-2681941652837-3360300471349-0yNRFusOQ6ciye0X5AyDAB6S",
"bot_user_id": "U03AL8UDVA9",
"team": {
"id": "T02L1TPK6QM",
"name": "Angel CA Test Workspace"
},
"enterprise": null,
"is_enterprise_install": false
},
"status": "active",
"updated_at": "2022-04-08T06:40:02.657Z",
"v": 0
}
```
What this transformer does is to make sure we can get a unified payload into the topic. Without a custom transformer we will have multiple attribute in the topic which will not make sense to anyone without context
What this transformer does is to make sure we can get a unified payload into the topic. Without a custom transformer we will have multiple attributes in the topic which will not make sense to anyone without context.

### Examples

Assume the following configuration:

```json
"transforms": "UnifyLegacySlackIntegrationPayload",
"transforms.UnifyLegacySlackIntegrationPayload.type":"com.cultureamp.kafka.connect.transforms.UnifyLegacySlackIntegrationPayload",
```

Avro Schema:

```
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "account_aggregate_id",
"type": "string"
},
{
"name": "access_token",
"type": "string"
},
{
"name": "team_id",
"type": "string"
},
{
"name": "team_name",
"type": "string"
}
]
}
```yaml
"transforms": "SlackIntegrationPayloadTransformer"
"transforms.SlackIntegrationPayloadTransformer.type":"com.cultureamp.kafka.connect.transforms.SlackIntegrationPayloadTransformer"
```

Target Avro Schema: `resources/com/cultureamp/slack-integration-target-schema.avsc`

## Installation
This library is built as a single `.jar` and published as a Github release. To install in your Connect cluster, add the JAR file to a directory that is on the clusters `plugin.path`.
Expand Down
6 changes: 5 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
}

// Package version
version = "0.4.1"
version = "0.5.0"

repositories {
// Use Maven Central for resolving dependencies.
Expand All @@ -32,6 +32,7 @@ dependencies {
// Kafka dependencies
implementation("org.apache.kafka:connect-api:$kafkaVersion")
implementation("org.apache.kafka:connect-transforms:$kafkaVersion")
implementation("org.apache.avro:avro:1.11.1")

// Use the Kotlin test library.
testImplementation("org.jetbrains.kotlin:kotlin-test")
Expand All @@ -41,6 +42,9 @@ dependencies {

implementation("ch.qos.logback:logback-classic:1.2.11")
implementation("ch.qos.logback:logback-core:1.2.11")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.13.3")
implementation("org.mongodb.kafka:mongo-kafka-connect:1.7.0")
implementation("org.mongodb:bson:4.5.1")
}

// A full list of config options can be found here:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.cultureamp.kafka.connect.plugins.transforms

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.slf4j.LoggerFactory

class SlackIntegrationPayloadTransformer<R : ConnectRecord<R>> : Transformation<R> {
private val logger = LoggerFactory.getLogger(this::class.java.canonicalName)
private val purpose = "slack integration payload transformer"

override fun configure(configs: MutableMap<String, *>?) {}

override fun config(): ConfigDef {
return ConfigDef()
}

override fun close() {}

override fun apply(record: R): R {
try {
val sourceValue = Requirements.requireStruct(record.value(), purpose)
val targetSchema = targetSchema()
val targetPayload = targetPayload(sourceValue, targetSchema)

return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
targetSchema,
targetPayload,
record.timestamp()
)
} catch (e: Exception) {
logger.error("Exception: ", e)
logger.error("Record Received: " + record.value())
throw e
}
}

private fun targetPayload(sourceValue: Struct, targetSchema: Schema): Struct {
val targetPayload = Struct(targetSchema)

targetPayload
.put("account_aggregate_id", accountAggregateId(sourceValue))
.put("is_deleted", isDeleted(sourceValue))
.put("status", status(sourceValue))

with(oauthPayload(oauthResponseData(sourceValue))) {
targetPayload
.put("access_token", accessToken)
.put("team_id", teamId)
.put("team_name", teamName)
.put("access_token_scopes", scope)
.put("enterprise_id", enterpriseId)
}

return targetPayload
}

private fun oauthPayload(oauthResponseData: Struct): OauthPayload {
return try {
// Only Slack Integration OAuth V1 has "bot" child element
val bot = Requirements.requireStruct(oauthResponseData["bot"], purpose)

OauthPayload(
teamId = oauthResponseData["team_id"] as String,
teamName = oauthResponseData["team_name"] as String,
accessToken = bot["bot_access_token"] as String,
scope = oauthResponseData["scope"] as String,
enterpriseId = oauthResponseData["enterprise_id"] as String?
)
} catch (e: Exception) {
// Slack Integration OAuth V2 Payload
val team = Requirements.requireStruct(oauthResponseData["team"], purpose)

OauthPayload(
teamId = team["id"] as String,
teamName = team["name"] as String,
accessToken = oauthResponseData["access_token"] as String,
scope = oauthResponseData["scope"] as String,
enterpriseId = enterpriseId(oauthResponseData)
)
}
}

private fun enterpriseId(oauthResponseData: Struct): String? {
return try {
Requirements.requireStruct(oauthResponseData["enterprise"], purpose)["id"] as String
} catch (e: Exception) {
null
}
}

private fun status(fullDocument: Struct): String =
objectMapper.readValue(fullDocument["status"] as String, Map::class.java)["\$symbol"] as String

private fun oauthResponseData(fullDocument: Struct): Struct =
Requirements.requireStruct(fullDocument["oauth_response_data"], purpose)

private fun accountAggregateId(fullDocument: Struct): String = fullDocument["account_aggregate_id"] as String

private fun isDeleted(fullDocument: Struct): Boolean = fullDocument["deleted_at"] != null

private fun targetSchema(): Schema {
return SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.field("access_token_scopes", Schema.STRING_SCHEMA)
.field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA)
.field("is_deleted", Schema.BOOLEAN_SCHEMA)
.field("status", Schema.STRING_SCHEMA)
.build()
}

private val objectMapper = ObjectMapper()
}

data class OauthPayload(
val teamId: String,
val teamName: String,
val accessToken: String,
val scope: String,
val enterpriseId: String?,
)
Loading

0 comments on commit 1462a47

Please sign in to comment.