Skip to content

Commit

Permalink
#6 - Factor out management of MigrationRun lifecycle into a separat…
Browse files Browse the repository at this point in the history
…e service and set `completedTime` on runs

Not sure if have this right, but trying to turn the `ReconciliationService` into having more responsibility for setting up the pipeline rather than tracking/managing lifecycle
  • Loading branch information
chadlwilson committed Oct 15, 2021
1 parent bd4f3f2 commit 9674f10
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,64 +1,20 @@
package com.thoughtworks.recce.server.dataset

import com.thoughtworks.recce.server.config.DataLoadDefinition
import com.thoughtworks.recce.server.config.ReconciliationConfiguration
import io.micronaut.discovery.event.ServiceReadyEvent
import io.micronaut.runtime.event.annotation.EventListener
import io.micronaut.scheduling.annotation.Async
import jakarta.inject.Inject
import jakarta.inject.Singleton
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2
import java.time.LocalDateTime

private val logger = KotlinLogging.logger {}

@Singleton
open class MigrationRunService(
@Inject private val config: ReconciliationConfiguration,
private val runRepository: MigrationRunRepository,
private val recordRepository: MigrationRecordRepository
) {
fun start(dataSetId: String): Mono<MigrationRun> {

val source = config.datasets[dataSetId]?.source
?: throw IllegalArgumentException("[$dataSetId] not found!")

logger.info { "Starting reconciliation run for [$dataSetId]..." }

val migrationRun = runRepository
.save(MigrationRun(dataSetId))
.doOnNext { logger.info { "Starting reconciliation run for $it}..." } }
.cache()

return streamFromSource(source, migrationRun)
.count()
.zipWith(migrationRun)
.map { (count, run) -> run.apply { results = DataSetResults(count) } }
}

private fun streamFromSource(source: DataLoadDefinition, run: Mono<MigrationRun>): Flux<MigrationRecord> {
return Flux.usingWhen(
source.dbOperations.connectionFactory().create(),
{ it.createStatement(source.query).execute() },
{ it.close() }
)
.flatMap { result -> result.map(HashedRow::fromRow) }
.zipWith(run.repeat())
.map { (row, run) ->
MigrationRecord(MigrationRecordKey(run.id!!, row.migrationKey)).apply {
sourceData = row.hashedValue
}
}
.flatMap { record -> recordRepository.save(record) }
}

@EventListener
@Async
open fun doOnStart(event: ServiceReadyEvent): Mono<MigrationRun> {
return start("test-dataset")
.doOnEach { logger.info { it.toString() } }
}
open class MigrationRunService(private val runRepository: MigrationRunRepository) {
fun start(dataSetId: String): Mono<MigrationRun> = runRepository
.save(MigrationRun(dataSetId))
.doOnNext { logger.info { "Starting reconciliation run for $it}..." } }
.cache()

fun complete(run: MigrationRun): Mono<MigrationRun> =
runRepository.update(run.apply { completedTime = LocalDateTime.now() })
.doOnNext { logger.info { "Run completed for $it" } }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.thoughtworks.recce.server.dataset

import com.thoughtworks.recce.server.config.DataLoadDefinition
import com.thoughtworks.recce.server.config.ReconciliationConfiguration
import io.micronaut.discovery.event.ServiceReadyEvent
import io.micronaut.runtime.event.annotation.EventListener
import io.micronaut.scheduling.annotation.Async
import jakarta.inject.Inject
import jakarta.inject.Singleton
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2

private val logger = KotlinLogging.logger {}

@Singleton
open class ReconciliationService(
@Inject private val config: ReconciliationConfiguration,
@Inject private val runService: MigrationRunService,
private val recordRepository: MigrationRecordRepository
) {
fun runFor(dataSetId: String): Mono<MigrationRun> {

val source = config.datasets[dataSetId]?.source
?: throw IllegalArgumentException("[$dataSetId] not found!")

logger.info { "Starting reconciliation run for [$dataSetId]..." }

val migrationRun = runService.start(dataSetId)

return streamFromSource(source, migrationRun)
.count()
.flatMap { count -> migrationRun.map { it.apply { results = DataSetResults(count) } } }
.flatMap { runService.complete(it) }
}

private fun streamFromSource(source: DataLoadDefinition, run: Mono<MigrationRun>): Flux<MigrationRecord> {
return Flux.usingWhen(
source.dbOperations.connectionFactory().create(),
{ it.createStatement(source.query).execute() },
{ it.close() }
)
.flatMap { result -> result.map(HashedRow::fromRow) }
.zipWith(run.repeat())
.map { (row, run) ->
MigrationRecord(MigrationRecordKey(run.id!!, row.migrationKey)).apply {
sourceData = row.hashedValue
}
}
.flatMap { record -> recordRepository.save(record) }
}

@EventListener
@Async
open fun doOnStart(event: ServiceReadyEvent): Mono<MigrationRun> {
return runFor("test-dataset")
.doOnEach { logger.info { it.toString() } }
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,46 @@
package com.thoughtworks.recce.server.dataset

import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.time.LocalDateTime

internal class MigrationRunServiceTest {

private val startedRun = MigrationRun(1, dataSetName, LocalDateTime.now())

@Test
fun `start should throw on missing dataset`() {
assertThatThrownBy { MigrationRunService(mock(), mock(), mock()).start("test-dataset") }
.isExactlyInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("test-dataset")
fun `should return start results`() {

val runRepository = mock<MigrationRunRepository> {
on { save(any()) } doReturn Mono.just(startedRun)
}

val eventualRun = MigrationRunService(runRepository).start(dataSetName)

StepVerifier.create(eventualRun)
.expectNext(startedRun)
.verifyComplete()
}

@Test
fun `complete should set completed time`() {
val runRepository = mock<MigrationRunRepository> {
on { update(any()) } doReturn Mono.just(startedRun)
}

StepVerifier.create(MigrationRunService(runRepository).complete(startedRun))
.assertNext {
assertThat(it.completedTime).isAfterOrEqualTo(it.createdTime)
}
.verifyComplete()
}

companion object {
private const val dataSetName = "my-dataset"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import reactor.util.function.Tuples
environments = arrayOf("test-integration"),
propertySources = arrayOf("classpath:config/application-test-dataset.yml")
)
class MigrationRunServiceIntegrationTest : DataSourceTest() {
class ReconciliationServiceIntegrationTest : DataSourceTest() {
@Inject
lateinit var service: MigrationRunService
lateinit var service: ReconciliationService

@Inject
lateinit var runRepository: MigrationRunRepository
Expand All @@ -28,13 +28,13 @@ class MigrationRunServiceIntegrationTest : DataSourceTest() {

@Test
fun `start can stream a source dataset`() {
StepVerifier.create(service.start("test-dataset"))
StepVerifier.create(service.runFor("test-dataset"))
.assertNext { run ->
assertThat(run.id).isNotNull
assertThat(run.dataSetId).isEqualTo("test-dataset")
assertThat(run.createdTime).isNotNull
assertThat(run.updatedTime).isNotNull
assertThat(run.completedTime).isNull()
assertThat(run.updatedTime).isAfterOrEqualTo(run.createdTime)
assertThat(run.completedTime).isAfterOrEqualTo(run.createdTime)
assertThat(run.results).isEqualTo(DataSetResults(3))
}
.verifyComplete()
Expand Down Expand Up @@ -70,7 +70,7 @@ class MigrationRunServiceIntegrationTest : DataSourceTest() {
fun `should emit error on bad query`() {
transaction(sourceDb) { SchemaUtils.drop(TestData) }

StepVerifier.create(service.start("test-dataset"))
StepVerifier.create(service.runFor("test-dataset"))
.expectError(R2dbcBadGrammarException::class.java)
.verify()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.thoughtworks.recce.server.dataset

import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
import org.mockito.kotlin.mock

internal class ReconciliationServiceTest {
@Test
fun `start should throw on missing dataset`() {
assertThatThrownBy { ReconciliationService(mock(), mock(), mock()).runFor("test-dataset") }
.isExactlyInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("test-dataset")
}
}

0 comments on commit 9674f10

Please sign in to comment.