diff --git a/build.gradle.kts b/build.gradle.kts index 05b38bc..7aa8e09 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -15,7 +15,7 @@ plugins { } // Package version -version = "0.4.0" +version = "0.4.1" repositories { // Use Maven Central for resolving dependencies. @@ -38,6 +38,9 @@ dependencies { // Use the Kotlin JUnit integration. testImplementation("org.jetbrains.kotlin:kotlin-test-junit") + + implementation("ch.qos.logback:logback-classic:1.2.11") + implementation("ch.qos.logback:logback-core:1.2.11") } // A full list of config options can be found here: diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt index d575dff..453c8e5 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/UnifyLegacySlackIntegrationPayload.kt @@ -8,22 +8,12 @@ import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.errors.DataException import org.apache.kafka.connect.transforms.Transformation import org.apache.kafka.connect.transforms.util.Requirements +import org.slf4j.LoggerFactory data class Quintuple(val t1: T1, val t2: T2, val t3: T3, val t4: T4, val t5: T5) class UnifyLegacySlackIntegrationPayload> : Transformation { - private val ignoredAttributes = arrayListOf( - "account_aggregate_id", - "oauth_response_data.access_token", - "oauth_response_data.team_id", - "oauth_response_data.team_name", - "oauth_response_data.enterprise_id", - "oauth_response_data.scope", - "oauth_response_data.bot.bot_access_token", - "oauth_response_data.team.id", - "oauth_response_data.team.name", - "oauth_response_data.enterprise.id", - ) + val logger = LoggerFactory.getLogger(this::class.java.canonicalName) private val PURPOSE = "unify legacy slack integration data" override fun configure(configs: MutableMap?) {} @@ -31,6 +21,7 @@ class UnifyLegacySlackIntegrationPayload> : Transformation< private fun populateValue(originalValues: Struct, updatedValues: Struct): Struct { val newFields = updatedValues.schema().fields() + for (field in newFields) { try { if (field.schema().type().getName() == "struct") { @@ -83,30 +74,35 @@ class UnifyLegacySlackIntegrationPayload> : Transformation< } override fun apply(record: R): R { - val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE) - val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE) - val(teamId, teamName, accessToken, scope, enterpriseId) = extractUnifiedValues(oauthResponseData) - var accountAggregateId = valueStruct.get("account_aggregate_id") as String - - val modifiedPayloadSchema = SchemaBuilder.struct() - .name("com.cultureamp.murmur.slack_integrations") - .field("account_aggregate_id", Schema.STRING_SCHEMA) - .field("access_token", Schema.STRING_SCHEMA) - .field("team_id", Schema.STRING_SCHEMA) - .field("team_name", Schema.STRING_SCHEMA) - .field("access_token_scopes", Schema.STRING_SCHEMA) - .field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA) - .build() + try { + val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE) + val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE) + var accountAggregateId = valueStruct.get("account_aggregate_id") as String + val(teamId, teamName, accessToken, scope, enterpriseId) = extractUnifiedValues(oauthResponseData) + val modifiedPayloadSchema = SchemaBuilder.struct() + .name("com.cultureamp.murmur.slack_integrations") + .field("account_aggregate_id", Schema.STRING_SCHEMA) + .field("access_token", Schema.STRING_SCHEMA) + .field("team_id", Schema.STRING_SCHEMA) + .field("team_name", Schema.STRING_SCHEMA) + .field("access_token_scopes", Schema.STRING_SCHEMA) + .field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA) + .build() - val modifiedPayloadStruct = Struct(modifiedPayloadSchema) - .put("account_aggregate_id", accountAggregateId) - .put("access_token", accessToken) - .put("team_id", teamId) - .put("team_name", teamName) - .put("access_token_scopes", scope) - .put("enterprise_id", enterpriseId) + val modifiedPayloadStruct = Struct(modifiedPayloadSchema) + .put("account_aggregate_id", accountAggregateId) + .put("access_token", accessToken) + .put("team_id", teamId) + .put("team_name", teamName) + .put("access_token_scopes", scope) + .put("enterprise_id", enterpriseId) - return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), modifiedPayloadSchema, modifiedPayloadStruct, record.timestamp()) + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), modifiedPayloadSchema, modifiedPayloadStruct, record.timestamp()) + } catch (e: DataException) { + logger.error("DataException: ", e) + logger.error("Record Received: " + record.value()) + throw e + } } override fun config(): ConfigDef {