Skip to content

Commit

Permalink
Merge pull request #8 from cultureamp/add-logging
Browse files Browse the repository at this point in the history
Do not throw exception for invalid data type
  • Loading branch information
niciqy authored May 6, 2022
2 parents a73f493 + c729ada commit 14001d5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
5 changes: 4 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
}

// Package version
version = "0.4.0"
version = "0.4.1"

repositories {
// Use Maven Central for resolving dependencies.
Expand All @@ -38,6 +38,9 @@ dependencies {

// Use the Kotlin JUnit integration.
testImplementation("org.jetbrains.kotlin:kotlin-test-junit")

implementation("ch.qos.logback:logback-classic:1.2.11")
implementation("ch.qos.logback:logback-core:1.2.11")
}

// A full list of config options can be found here:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,20 @@ import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.DataException
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.slf4j.LoggerFactory

data class Quintuple<T1, T2, T3, T4, T5>(val t1: T1, val t2: T2, val t3: T3, val t4: T4, val t5: T5)

class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<R> {
private val ignoredAttributes = arrayListOf(
"account_aggregate_id",
"oauth_response_data.access_token",
"oauth_response_data.team_id",
"oauth_response_data.team_name",
"oauth_response_data.enterprise_id",
"oauth_response_data.scope",
"oauth_response_data.bot.bot_access_token",
"oauth_response_data.team.id",
"oauth_response_data.team.name",
"oauth_response_data.enterprise.id",
)
val logger = LoggerFactory.getLogger(this::class.java.canonicalName)
private val PURPOSE = "unify legacy slack integration data"
override fun configure(configs: MutableMap<String, *>?) {}

override fun close() {}

private fun populateValue(originalValues: Struct, updatedValues: Struct): Struct {
val newFields = updatedValues.schema().fields()

for (field in newFields) {
try {
if (field.schema().type().getName() == "struct") {
Expand Down Expand Up @@ -83,30 +74,35 @@ class UnifyLegacySlackIntegrationPayload<R : ConnectRecord<R>> : Transformation<
}

override fun apply(record: R): R {
val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE)
val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE)
val(teamId, teamName, accessToken, scope, enterpriseId) = extractUnifiedValues(oauthResponseData)
var accountAggregateId = valueStruct.get("account_aggregate_id") as String

val modifiedPayloadSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.field("access_token_scopes", Schema.STRING_SCHEMA)
.field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA)
.build()
try {
val valueStruct: Struct = Requirements.requireStruct(record.value(), PURPOSE)
val oauthResponseData: Struct = Requirements.requireStruct(valueStruct.get("oauth_response_data"), PURPOSE)
var accountAggregateId = valueStruct.get("account_aggregate_id") as String
val(teamId, teamName, accessToken, scope, enterpriseId) = extractUnifiedValues(oauthResponseData)
val modifiedPayloadSchema = SchemaBuilder.struct()
.name("com.cultureamp.murmur.slack_integrations")
.field("account_aggregate_id", Schema.STRING_SCHEMA)
.field("access_token", Schema.STRING_SCHEMA)
.field("team_id", Schema.STRING_SCHEMA)
.field("team_name", Schema.STRING_SCHEMA)
.field("access_token_scopes", Schema.STRING_SCHEMA)
.field("enterprise_id", Schema.OPTIONAL_STRING_SCHEMA)
.build()

val modifiedPayloadStruct = Struct(modifiedPayloadSchema)
.put("account_aggregate_id", accountAggregateId)
.put("access_token", accessToken)
.put("team_id", teamId)
.put("team_name", teamName)
.put("access_token_scopes", scope)
.put("enterprise_id", enterpriseId)
val modifiedPayloadStruct = Struct(modifiedPayloadSchema)
.put("account_aggregate_id", accountAggregateId)
.put("access_token", accessToken)
.put("team_id", teamId)
.put("team_name", teamName)
.put("access_token_scopes", scope)
.put("enterprise_id", enterpriseId)

return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), modifiedPayloadSchema, modifiedPayloadStruct, record.timestamp())
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), modifiedPayloadSchema, modifiedPayloadStruct, record.timestamp())
} catch (e: DataException) {
logger.error("DataException: ", e)
logger.error("Record Received: " + record.value())
throw e
}
}

override fun config(): ConfigDef {
Expand Down

0 comments on commit 14001d5

Please sign in to comment.