Skip to content

Commit

Permalink
Merge pull request #7 from cultureamp/fix-avro-schema
Browse files Browse the repository at this point in the history
Fix avro schema
  • Loading branch information
niciqy authored May 5, 2022
2 parents dcd9878 + 8032e97 commit a73f493
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 59 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
}

// Package version
version = "0.3.0"
version = "0.4.0"

repositories {
// Use Maven Central for resolving dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.cultureamp.kafka.connect.plugins.transforms

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Field
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
Expand Down Expand Up @@ -30,24 +29,6 @@ class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<

override fun close() {}

private fun removeIgnoredAttributes(fields: List<Field>, builder: SchemaBuilder, hierarchy: String = ""): SchemaBuilder {
for (field in fields) {
if ("$hierarchy${field.name()}" !in ignoredAttributes) {
if (field.schema().type().getName() == "struct") {
val childSchema = removeIgnoredAttributes(field.schema().fields(), SchemaBuilder.struct(), "$hierarchy${field.name()}.")
// Only add child schema if is not empty
if (childSchema.fields().isNotEmpty()) {
builder.field(field.name(), childSchema.build())
}
} else {
builder
.field(field.name(), field.schema())
}
}
}
return builder
}

private fun populateValue(originalValues: Struct, updatedValues: Struct): Struct {
val newFields = updatedValues.schema().fields()
for (field in newFields) {
Expand All @@ -71,7 +52,7 @@ class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<
var teamName: String
var accessToken: String
var scope: String
var enterpriseId: String? = null
var enterpriseId: String?

try {
// Only Slack Integration OAuth V1 has "bot" child element
Expand Down Expand Up @@ -104,11 +85,10 @@ class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<
override fun apply(record: R): R {
val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE)
val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE)
val updatedSchemaBuilder: SchemaBuilder = removeIgnoredAttributes(valueStruct.schema().fields(), SchemaBuilder.struct())
val(teamId, teamName, accessToken, scope, enterpriseId) = extractUnifiedValues(oauthResponseData)
var accountAggregateId = valueStruct.get("account_aggregate_id") as String

// Add back the unified fields
val modifiedPayloadSchema = updatedSchemaBuilder
val modifiedPayloadSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
Expand All @@ -118,9 +98,8 @@ class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<
.field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA)
.build()

val updatedValuesStruct: Struct = populateValue(valueStruct, Struct(modifiedPayloadSchema))

val modifiedPayloadStruct = updatedValuesStruct
val modifiedPayloadStruct = Struct(modifiedPayloadSchema)
.put("account_aggregate_id", accountAggregateId)
.put("access_token", accessToken)
.put("team_id", teamId)
.put("team_name", teamName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,54 +111,29 @@ class UnifyLegacySlackIntegrationPayloadTest {
return payloadSchema to payloadStruct
}

private fun oAuthV1ExpectedValue(enterpriseId: String?): Pair<Schema, Struct> {
val botSchema = SchemaBuilder.struct()
.field("bot_user_id", Schema.STRING_SCHEMA)
.build()

val botStruct = Struct(botSchema)
.put("bot_user_id", "UV8DT789F")

val oauthResponseDataSchema = SchemaBuilder.struct()
.field("bot", botSchema)
.build()
@Test
fun `With Legacy Slack Integration Data without an enterprise id`() {
val transformer: UnifyLegacySlackIntegrationPayload<SourceRecord> = UnifyLegacySlackIntegrationPayload()
val (payloadSchema, payloadStruct) = createOAuthV1Payload(null)

val oauthResponseDataStruct = Struct(oauthResponseDataSchema)
.put("bot", botStruct)
val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct))

val expectedSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("created_at", Schema.STRING_SCHEMA)
.field("oauth_response_data", oauthResponseDataSchema)
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.field("access_token_scopes", Schema.STRING_SCHEMA)
.field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA)
.build()

val expectedValue = Struct(expectedSchema)
.put("created_at", CREATED_AT)
.put("oauth_response_data", oauthResponseDataStruct)
.put("account_aggregate_id", ACCOUNT_ID)
.put("access_token", ACCESS_TOKEN)
.put("team_id", TEAM_ID)
.put("team_name", TEAM_NAME)
.put("access_token_scopes", SCOPE)
.put("enterprise_id", enterpriseId)

return expectedSchema to expectedValue
}

@Test
fun `With Legacy Slack Integration Data without an enterprise id`() {
val transformer: UnifyLegacySlackIntegrationPayload<SourceRecord> = UnifyLegacySlackIntegrationPayload()
val (payloadSchema, payloadStruct) = createOAuthV1Payload(null)

val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct))

val (expectedSchema, expectedValue) = oAuthV1ExpectedValue(null)
.put("enterprise_id", null)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
Expand All @@ -171,7 +146,22 @@ class UnifyLegacySlackIntegrationPayloadTest {

val transformedRecord: SourceRecord = transformer.apply(SourceRecord(null, null, "test", payloadSchema, payloadStruct))

val (expectedSchema, expectedValue) = oAuthV1ExpectedValue(ENTERPRISE_ID)
val expectedSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.field("access_token_scopes", Schema.STRING_SCHEMA)
.field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA)
.build()
val expectedValue = Struct(expectedSchema)
.put("account_aggregate_id", ACCOUNT_ID)
.put("access_token", ACCESS_TOKEN)
.put("team_id", TEAM_ID)
.put("team_name", TEAM_NAME)
.put("access_token_scopes", SCOPE)
.put("enterprise_id", ENTERPRISE_ID)

assertEquals(expectedValue, transformedRecord.value())
assertEquals(expectedSchema, transformedRecord.valueSchema())
Expand Down

0 comments on commit a73f493

Please sign in to comment.