Skip to content

Commit

Permalink
#6 Persist a summary of each reconciliation run which can link all th…
Browse files Browse the repository at this point in the history
…e rows together

Allows running reconciliation for the same dataset multiple times
  • Loading branch information
chadlwilson committed Oct 13, 2021
1 parent 8834a5e commit 8100e91
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,38 @@ private val logger = KotlinLogging.logger {}
/**
 * Service that runs a reconciliation for a configured dataset: it saves a [MigrationRun],
 * executes the dataset's source query, and saves one [MigrationRecord] per hashed row.
 *
 * NOTE(review): this listing is a rendered commit diff, so the pre-change and post-change
 * versions of several lines appear one after the other (e.g. both `start` signatures).
 */
@Singleton
open class DataSetService(
@Inject private val config: ReconciliationConfiguration,
private val runRepository: MigrationRunRepository,
private val recordRepository: MigrationRecordRepository
) {
/**
 * Starts a reconciliation run for the dataset registered under the given id.
 *
 * The dataset lookup and the save of the new [MigrationRun] both happen eagerly, while this
 * function executes; only the streaming of source rows is deferred until the returned Mono
 * is subscribed.
 *
 * @return a Mono that emits the saved run, with `results` set to the number of records
 * saved, once every row has been stored.
 * @throws IllegalArgumentException thrown directly (not through the Mono) when the lookup
 * in `config.datasets` yields no source.
 */
fun start(dataSetName: String): Mono<DataSetResults> {
fun start(dataSetId: String): Mono<MigrationRun> {

val source = config.datasets[dataSetName]?.source
?: throw IllegalArgumentException("[$dataSetName] not found!")
val source = config.datasets[dataSetId]?.source
?: throw IllegalArgumentException("[$dataSetId] not found!")

logger.info { "Streaming [$dataSetName] from [source]..." }
logger.info { "Starting reconciliation run for [$dataSetId]..." }

// TODO Make this properly async/reactive
// block() waits on the calling thread for the save to finish; `!!` throws if the save
// completes without emitting a value.
// NOTE(review): Reactor rejects block() on its non-blocking threads — confirm which
// threads can call start().
val migrationRun = runRepository.save(MigrationRun(dataSetId)).block()!!

logger.info { "Streaming $migrationRun from [source]..." }

// NOTE(review): no call that closes the created connection is visible in this pipeline —
// confirm it is released elsewhere.
return Mono.from(source.dbOperations.connectionFactory().create())
.flatMapMany { it.createStatement(source.query).execute() }
.flatMap { result -> result.map(::toHashedRow) }
.map { row ->
MigrationRecord(MigrationRecordKey(dataSetName, row.migrationKey)).apply {
// `id!!` assumes the saved run carries its generated id — presumably set by the
// repository on save; confirm.
MigrationRecord(MigrationRecordKey(migrationRun.id!!, row.migrationKey)).apply {
sourceData = row.hashedValue
}
}
.flatMap { record -> recordRepository.save(record) }
// count() emits a single value: how many saved records were emitted upstream.
.count()
.map { DataSetResults(it) }
.map { migrationRun.apply { results = DataSetResults(it) } }
}

/**
 * Event listener for [ServiceReadyEvent]: starts a run for the hard-coded dataset id
 * "test-dataset" and logs every signal of the resulting Mono.
 *
 * NOTE(review): nothing in this method subscribes to the returned Mono — presumably the
 * framework's event/async handling does; confirm.
 */
@EventListener
@Async
open fun doOnStart(event: ServiceReadyEvent): Mono<DataSetResults> {
open fun doOnStart(event: ServiceReadyEvent): Mono<MigrationRun> {
return start("test-dataset")
.doOnEach { logger.info { it.toString() } }
}
}

/** Outcome of streaming a dataset's source: the number of source rows that were saved. */
data class DataSetResults(val sourceRowsInserted: Long)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.thoughtworks.recce.server.dataset

import io.micronaut.data.annotation.DateCreated
import io.micronaut.data.annotation.DateUpdated
import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.r2dbc.annotation.R2dbcRepository
import io.micronaut.data.repository.reactive.ReactorCrudRepository
import java.io.Serializable
import java.time.LocalDateTime
import javax.persistence.*

/** Reactive CRUD repository for [MigrationRun] entities, keyed by their `Int` id (H2 dialect). */
@R2dbcRepository(dialect = Dialect.H2)
interface MigrationRunRepository : ReactorCrudRepository<MigrationRun, Int>

/** Reactive CRUD repository for [MigrationRecord] entities, keyed by the composite [MigrationRecordKey] (H2 dialect). */
@R2dbcRepository(dialect = Dialect.H2)
interface MigrationRecordRepository : ReactorCrudRepository<MigrationRecord, MigrationRecordKey>

/**
 * One reconciliation run of a dataset, mapped to the `data_set_migration_run` table.
 *
 * Only [id], [dataSetId] and [createdTime] are primary-constructor properties, so they are
 * the only ones that take part in the generated `equals`/`hashCode`/`toString`.
 *
 * @property id generated identifier; `null` for a run created via the secondary constructor.
 * @property dataSetId identifier of the dataset this run reconciles.
 * @property createdTime creation timestamp (`@DateCreated`); `null` for a run created via
 * the secondary constructor.
 */
@Entity
@Table(name = "data_set_migration_run")
data class MigrationRun(
    @Id
    @GeneratedValue
    val id: Int?,
    val dataSetId: String,
    @DateCreated
    val createdTime: LocalDateTime?,
) {
    /** Creates a run for [dataSetId] with no id and no creation time assigned. */
    constructor(dataSetId: String) : this(id = null, dataSetId = dataSetId, createdTime = null)

    /** Last-update timestamp (`@DateUpdated`). */
    @DateUpdated
    var updatedTime: LocalDateTime? = null

    /** Completion timestamp; stays `null` unless explicitly assigned. */
    var completedTime: LocalDateTime? = null

    /** Results attached to this run; annotated `@Transient`. */
    @Transient
    var results: DataSetResults? = null
}

/** Outcome of streaming a dataset's source: the number of source rows that were saved. */
data class DataSetResults(val sourceRowsInserted: Long)

/**
 * One row of a reconciliation run, mapped to the `data_set_migration_record` table.
 *
 * @property id composite key identifying the run and the row's migration key.
 * @property sourceData value recorded for the source side; `null` when absent.
 * @property targetData value recorded for the target side; `null` when absent.
 */
@Entity
@Table(name = "data_set_migration_record")
data class MigrationRecord(
    @EmbeddedId
    val id: MigrationRecordKey,
    var sourceData: String? = null,
    var targetData: String? = null,
)

/**
 * Composite key of a [MigrationRecord].
 *
 * @property migrationId value of the `migration_id` column.
 * @property migrationKey value of the `migration_key` column.
 */
@Embeddable
data class MigrationRecordKey(
    @Column(name = "migration_id")
    val migrationId: Int,
    @Column(name = "migration_key")
    val migrationKey: String,
) : Serializable
14 changes: 12 additions & 2 deletions src/main/resources/db/migration/V1__CREATE_MIGRATION_RECORD.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
-- One row per reconciliation run of a dataset.
-- completed_time is the only nullable column; every other column is required.
-- NOTE(review): this table is added to the existing V1 migration script rather than a new
-- version — presumably no database has applied V1 yet; confirm.
CREATE TABLE data_set_migration_run
(
id SERIAL PRIMARY KEY,
data_set_id VARCHAR(255) NOT NULL,
created_time TIMESTAMP NOT NULL,
updated_time TIMESTAMP NOT NULL,
completed_time TIMESTAMP
);

-- One row per migration key within a reconciliation run, holding the source and target data.
-- NOTE(review): this listing is a rendered diff, so the old data_set_id column / primary key
-- and the new migration_id column / primary key both appear below.
CREATE TABLE data_set_migration_record
(
data_set_id VARCHAR(255) NOT NULL,
migration_id INTEGER NOT NULL,
migration_key VARCHAR(255) NOT NULL,
source_data VARCHAR(1024),
target_data VARCHAR(1024),
PRIMARY KEY (data_set_id, migration_key)
PRIMARY KEY (migration_id, migration_key),
-- Each record must reference an existing row in data_set_migration_run.
FOREIGN KEY (migration_id) REFERENCES data_set_migration_run (id)
);
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import reactor.test.StepVerifier
internal class DataSetServiceTest {
// Checks that start() itself throws IllegalArgumentException (naming the dataset) when the
// service is built purely from mocks, i.e. the failure is raised by the call rather than
// delivered through the returned Mono.
@Test
fun `start should throw on missing dataset`() {
Assertions.assertThatThrownBy { DataSetService(mock(), mock()).start("test-dataset") }
Assertions.assertThatThrownBy { DataSetService(mock(), mock(), mock()).start("test-dataset") }
.isExactlyInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("test-dataset")
}
Expand All @@ -29,23 +29,36 @@ class DataSetServiceIntegrationTest : DataSourceTest() {
// Service under test, injected by the test's DI container.
@Inject
lateinit var service: DataSetService

// Repository used to look up the run that a saved record refers to.
@Inject
lateinit var runRepository: MigrationRunRepository

// Repository used to read back the records written by the service.
@Inject
lateinit var recordRepository: MigrationRecordRepository

// End-to-end check: starting "test-dataset" emits one run carrying results for a single row,
// and exactly one record is stored whose key points back at a run for the same dataset.
@Test
fun `start can stream a source dataset`() {
StepVerifier.create(service.start("test-dataset"))
.assertNext {
assertThat(it).isEqualTo(DataSetResults(1))
.assertNext { run ->
assertThat(run.id).isNotNull
assertThat(run.dataSetId).isEqualTo("test-dataset")
assertThat(run.createdTime).isNotNull
assertThat(run.updatedTime).isNotNull
// The service never assigns completedTime, so it is expected to stay null.
assertThat(run.completedTime).isNull()
assertThat(run.results).isEqualTo(DataSetResults(1))
}
.verifyComplete()

StepVerifier.create(recordRepository.findAll())
.assertNext {
MigrationRecord(
MigrationRecordKey("test-dataset", "sourcedatacount"),
sourceData = "b57448e19e0e383cdabaf669a4b85676bb7061e7f3720e57ea148a5735de957a"
)
.assertNext { record ->
// NOTE(review): migrationId is declared as a non-null Int, so this assertion cannot fail.
assertThat(record.id.migrationId).isNotNull
assertThat(record.id.migrationKey).isEqualTo("sourcedatacount")
assertThat(record.sourceData).isEqualTo("b57448e19e0e383cdabaf669a4b85676bb7061e7f3720e57ea148a5735de957a")

// Nested verification: the record's migrationId must resolve to a run of the same dataset.
StepVerifier.create(runRepository.findById(record.id.migrationId))
.assertNext {
assertThat(it.dataSetId).isEqualTo("test-dataset")
}
.verifyComplete()
}
.verifyComplete()
}
Expand Down

0 comments on commit 8100e91

Please sign in to comment.