Skip to content

Commit

Permalink
#6 Allow ability to start streaming a dataset off the source DB by it…
Browse files Browse the repository at this point in the history
…s name

- Currently automatically starts a hard-coded stream of a dataset at start-up
- Each row is just toStringed right now; obviously not a real implementation
- Doesn't go anywhere; just logged
  • Loading branch information
chadlwilson committed Oct 7, 2021
1 parent 94d2f7e commit a9e91a5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion")
implementation("javax.annotation:javax.annotation-api")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")

implementation("io.micronaut.flyway:micronaut-flyway")

Expand All @@ -62,6 +63,7 @@ dependencies {
testImplementation("org.testcontainers:postgresql")
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.mockito:mockito-core")
testImplementation("org.mockito:mockito-inline:3.9.0")
testImplementation("org.mockito:mockito-junit-jupiter")
testImplementation("org.mockito.kotlin:mockito-kotlin:3.2.0")
testImplementation("io.projectreactor:reactor-test:3.4.10")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,35 @@
package com.thoughtworks.recce.server.dataset

class DataSetService
import com.thoughtworks.recce.server.config.ReconciliationConfiguration
import io.micronaut.discovery.event.ServiceReadyEvent
import io.micronaut.runtime.event.annotation.EventListener
import io.micronaut.scheduling.annotation.Async
import jakarta.inject.Inject
import jakarta.inject.Singleton
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

private val logger = KotlinLogging.logger {}

@Singleton
open class DataSetService(@Inject val config: ReconciliationConfiguration) {
fun start(dataSetName: String): Flux<String> {

val source = config.datasets[dataSetName]?.source
?: throw IllegalArgumentException("[$dataSetName] not found!")

logger.info { "Streaming [$dataSetName] from [source]..." }

return Mono.from(source.dbOperations.connectionFactory().create())
.flatMapMany { it.createStatement(source.query).execute() }
.flatMap { result -> result.map { row, _ -> row.toString() } }
}

@EventListener
@Async
open fun doOnStart(event: ServiceReadyEvent): Flux<String> {
return start("test-dataset")
.doOnEach { logger.info { it } }
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,47 @@
package com.thoughtworks.recce.server.dataset

import com.thoughtworks.recce.server.config.DataSourceTest
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import io.r2dbc.spi.*
import jakarta.inject.Inject
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.transaction
import org.junit.jupiter.api.Test
import org.mockito.kotlin.mock
import reactor.test.StepVerifier

internal class DataSetServiceTest {
@Test
fun `should load configurations`() {
fun `start should throw on missing dataset`() {
Assertions.assertThatThrownBy { DataSetService(mock()).start("test-dataset") }
.isExactlyInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("test-dataset")
}
}

@MicronautTest(
environments = arrayOf("test-integration"),
propertySources = arrayOf("classpath:config/application-test-dataset.yml")
)
class DataSetServiceIntegrationTest : DataSourceTest() {
@Inject
lateinit var service: DataSetService

@Test
fun `start can stream a source dataset`() {
StepVerifier.create(service.start("test-dataset"))
.assertNext { assertThat(it).contains("name='COUNT(*)', value=3") }
.verifyComplete()
}

@Test
fun `should emit error on bad query`() {
transaction(sourceDb) { SchemaUtils.drop(TestData) }

StepVerifier.create(service.start("test-dataset"))
.expectError(R2dbcBadGrammarException::class.java)
.verify()
}
}

0 comments on commit a9e91a5

Please sign in to comment.