From f2b0bde617d7c830a7ec18604b62e486265b7276 Mon Sep 17 00:00:00 2001 From: Steve Mosley Date: Wed, 27 Sep 2023 17:26:21 +1300 Subject: [PATCH] Manager - Kafka Consumer for Observations (#97) * Manager - Hilltop Crawler Changes * Shuffle files around * martin/aggregate water use (#95) * add water use daily aggregation sql view * optimise sql query performance * Adding Error handling to Observations Consumer --------- Co-authored-by: Martin Peak --- packages/Manager/Dockerfile | 2 +- packages/Manager/build.gradle.kts | 11 +- .../main/kotlin/nz/govt/eop/Application.kt | 2 +- .../hilltop_crawler/HilltopCrawlerConsumer.kt | 152 ---------- .../hilltop_crawler/HilltopSitesParser.kt | 38 --- .../hilltop_crawler/HilltopXmlParsers.kt | 79 ------ .../observations/ObservationMessages.kt | 75 +++++ .../observations/ObservationStore.kt | 157 +++++++++++ .../observations/ObservationsConsumer.kt | 47 ++++ .../kotlin/nz/govt/eop/messages/Messages.kt | 12 - .../src/main/resources/application-prod.yml | 10 + .../src/main/resources/application.yml | 18 ++ .../migration/V0029__observations_changes.sql | 15 + ...served_water_use_aggregated_daily_view.sql | 27 ++ .../observations/ObservationsConsumerTest.kt | 261 ++++++++++++++++++ 15 files changed, 618 insertions(+), 288 deletions(-) delete mode 100644 packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopCrawlerConsumer.kt delete mode 100644 packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopSitesParser.kt delete mode 100644 packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopXmlParsers.kt create mode 100644 packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationMessages.kt create mode 100644 packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationStore.kt create mode 100644 packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumer.kt create mode 100644 packages/Manager/src/main/resources/db/migration/V0029__observations_changes.sql create mode 100644 packages/Manager/src/main/resources/db/migration/V0030__observed_water_use_aggregated_daily_view.sql create mode 100644 packages/Manager/src/test/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumerTest.kt diff --git a/packages/Manager/Dockerfile b/packages/Manager/Dockerfile index a2d16c3c5..ae80371c0 100644 --- a/packages/Manager/Dockerfile +++ b/packages/Manager/Dockerfile @@ -9,7 +9,7 @@ WORKDIR application COPY --from=builder application/dependencies/ ./ COPY --from=builder application/spring-boot-loader/ ./ COPY --from=builder application/application/ ./ -ENTRYPOINT ["java", "-XX:MaxRAMPercentage=85", "org.springframework.boot.loader.JarLauncher"] +ENTRYPOINT ["java", "-XX:MaxRAMPercentage=80", "org.springframework.boot.loader.JarLauncher"] EXPOSE 8080 HEALTHCHECK --interval=5s --retries=5 --start-period=60s CMD curl --fail http://localhost:8080/actuator/health || exit 1 diff --git a/packages/Manager/build.gradle.kts b/packages/Manager/build.gradle.kts index 773c6aefb..37046c783 100644 --- a/packages/Manager/build.gradle.kts +++ b/packages/Manager/build.gradle.kts @@ -2,11 +2,11 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile import org.jooq.meta.jaxb.ForcedType plugins { - id("org.springframework.boot") version "3.0.5" - id("io.spring.dependency-management") version "1.1.0" - kotlin("jvm") version "1.7.22" - kotlin("plugin.spring") version "1.7.22" - id("com.diffplug.spotless") version "6.17.0" + id("org.springframework.boot") version "3.1.3" + id("io.spring.dependency-management") version "1.1.3" + kotlin("jvm") version "1.8.22" + kotlin("plugin.spring") version "1.8.22" + id("com.diffplug.spotless") version "6.18.0" id("org.flywaydb.flyway") version "9.1.6" id("nu.studer.jooq") version "8.0" id("com.adarshr.test-logger") version "3.2.0" @@ -35,6 +35,7 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-jdbc") implementation("org.springframework.boot:spring-boot-starter-jooq") implementation("org.springframework.kafka:spring-kafka") + implementation("org.apache.kafka:kafka-streams") implementation("io.micrometer:micrometer-tracing-bridge-brave") implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/Application.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/Application.kt index abb16ea07..7c27df4fc 100644 --- a/packages/Manager/src/main/kotlin/nz/govt/eop/Application.kt +++ b/packages/Manager/src/main/kotlin/nz/govt/eop/Application.kt @@ -15,7 +15,7 @@ import org.springframework.web.client.RestTemplate @EnableKafka @EnableCaching @SpringBootApplication -class Application() { +class Application { @Bean fun restTemplate(): RestTemplate = RestTemplateBuilder().build() } diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopCrawlerConsumer.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopCrawlerConsumer.kt deleted file mode 100644 index f49c4ae4e..000000000 --- a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopCrawlerConsumer.kt +++ /dev/null @@ -1,152 +0,0 @@ -package nz.govt.eop.consumers.hilltop_crawler - -import java.math.BigDecimal -import java.time.LocalDateTime -import java.time.OffsetDateTime -import java.time.ZoneOffset -import net.postgis.jdbc.geometry.Geometry -import nz.govt.eop.messages.HilltopDataMessage -import nz.govt.eop.si.jooq.tables.Councils.Companion.COUNCILS -import nz.govt.eop.si.jooq.tables.ObservationSites.Companion.OBSERVATION_SITES -import nz.govt.eop.si.jooq.tables.ObservationSitesMeasurements.Companion.OBSERVATION_SITES_MEASUREMENTS -import nz.govt.eop.si.jooq.tables.Observations.Companion.OBSERVATIONS -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.jooq.DSLContext -import org.jooq.impl.DSL.* -import org.springframework.context.annotation.Profile -import org.springframework.kafka.annotation.KafkaListener -import org.springframework.stereotype.Component - -const val HILLTOP_RAW_DATA_TOPIC_NAME = "hilltop.raw" - -fun nzstFromString(value: String): OffsetDateTime = - OffsetDateTime.of(LocalDateTime.parse(value), ZoneOffset.of("+12")) - -@Profile("hilltop-consumer") -@Component -class HilltopCrawlerConsumer(val context: DSLContext) { - - @KafkaListener( - id = "nz.govt.eop.consumers.hilltop-raw", - topics = [HILLTOP_RAW_DATA_TOPIC_NAME], - concurrency = "16") - fun processNewMessage(message: ConsumerRecord) { - - when (message.value().type) { - "SITES_LIST" -> processSites(message.value()) - "MEASUREMENT_DATA" -> processMeasurements(message.value()) - } - } - - fun processSites(message: HilltopDataMessage) { - val hilltopSitesXml = HilltopSitesParser.parseSites(message.xml) - - hilltopSitesXml.validSites().forEach { - context - .insertInto( - OBSERVATION_SITES, - OBSERVATION_SITES.COUNCIL_ID, - OBSERVATION_SITES.NAME, - OBSERVATION_SITES.LOCATION, - OBSERVATION_SITES.CREATED_AT, - OBSERVATION_SITES.UPDATED_AT) - .select( - select( - COUNCILS.ID, - value(it.name), - if ("NZMG".equals(hilltopSitesXml.projection)) { - field( - "ST_TRANSFORM(ST_GEOMFROMTEXT('POINT(${it.easting} ${it.northing})', 27200), 2193)", - Geometry::class.java) - } else { - field( - "ST_GEOMFROMTEXT('POINT(${it.easting} ${it.northing})', 2193)", - Geometry::class.java) - }, - value(message.at.atOffset(ZoneOffset.UTC)), - value(message.at.atOffset(ZoneOffset.UTC))) - .from(COUNCILS) - .where(COUNCILS.STATS_NZ_ID.eq(message.councilId))) - .onConflict(OBSERVATION_SITES.COUNCIL_ID, OBSERVATION_SITES.NAME) - .doUpdate() - .set(OBSERVATION_SITES.UPDATED_AT, message.at.atOffset(ZoneOffset.UTC)) - .execute() - } - } - - fun processMeasurements(message: HilltopDataMessage) { - - val measurementData = HilltopXmlParsers.parseSiteMeasurementData(message.xml.toByteArray()) - - val siteId = - context - .select(OBSERVATION_SITES.ID) - .from(OBSERVATION_SITES) - .where( - OBSERVATION_SITES.COUNCIL_ID.eq( - context - .select(COUNCILS.ID) - .from(COUNCILS) - .where(COUNCILS.STATS_NZ_ID.eq(message.councilId)))) - .and(OBSERVATION_SITES.NAME.eq(measurementData.siteId)) - .fetchOne(OBSERVATION_SITES.ID) - ?: throw Exception( - "Site ID not found for ${message.councilId} ${measurementData.siteId}") - - if (measurementData.measurements.isNotEmpty()) { - val siteMeasurementsId = - context - .insertInto( - OBSERVATION_SITES_MEASUREMENTS, - OBSERVATION_SITES_MEASUREMENTS.SITE_ID, - OBSERVATION_SITES_MEASUREMENTS.MEASUREMENT_NAME, - OBSERVATION_SITES_MEASUREMENTS.FIRST_OBSERVATION_AT, - OBSERVATION_SITES_MEASUREMENTS.LAST_OBSERVATION_AT, - OBSERVATION_SITES_MEASUREMENTS.OBSERVATION_COUNT, - OBSERVATION_SITES_MEASUREMENTS.CREATED_AT, - OBSERVATION_SITES_MEASUREMENTS.UPDATED_AT) - .values( - siteId, - measurementData.measurementName, - nzstFromString(measurementData.measurements.first().timestamp), - nzstFromString(measurementData.measurements.last().timestamp), - measurementData.measurements.size, - message.at.atOffset(ZoneOffset.UTC), - message.at.atOffset(ZoneOffset.UTC)) - .onConflict( - OBSERVATION_SITES_MEASUREMENTS.SITE_ID, - OBSERVATION_SITES_MEASUREMENTS.MEASUREMENT_NAME) - .doUpdate() - .set(OBSERVATION_SITES_MEASUREMENTS.UPDATED_AT, message.at.atOffset(ZoneOffset.UTC)) - .returningResult(OBSERVATION_SITES_MEASUREMENTS.ID) - .fetch() - - val lists = - measurementData.measurements - .map { - row( - siteMeasurementsId[0].value1(), - nzstFromString(it.timestamp), - BigDecimal(it.value), - message.at.atOffset(ZoneOffset.UTC), - message.at.atOffset(ZoneOffset.UTC)) - } - .chunked(1000) - - lists.forEach { it -> - context - .insertInto( - OBSERVATIONS, - OBSERVATIONS.OBSERVATION_MEASUREMENT_ID, - OBSERVATIONS.OBSERVED_AT, - OBSERVATIONS.AMOUNT, - OBSERVATIONS.CREATED_AT, - OBSERVATIONS.UPDATED_AT) - .valuesOfRows(it) - .onConflict(OBSERVATIONS.OBSERVATION_MEASUREMENT_ID, OBSERVATIONS.OBSERVED_AT) - .doNothing() - .execute() - } - } - } -} diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopSitesParser.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopSitesParser.kt deleted file mode 100644 index 66144626f..000000000 --- a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopSitesParser.kt +++ /dev/null @@ -1,38 +0,0 @@ -package nz.govt.eop.consumers.hilltop_crawler - -import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.dataformat.xml.XmlMapper -import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper -import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty -import com.fasterxml.jackson.module.kotlin.KotlinModule - -data class HilltopSitesXml( - @JacksonXmlProperty(localName = "Agency") val agency: String, - @JacksonXmlProperty(localName = "Projection") val projection: String?, - @JacksonXmlProperty(localName = "Site") - @JacksonXmlElementWrapper(useWrapping = false) - val sites: List = arrayListOf() -) { - fun validSites(): List = sites.filter { it.isValidSite() } -} - -data class HilltopSiteXml( - @JacksonXmlProperty(localName = "Name", isAttribute = true) val name: String, - @JacksonXmlProperty(localName = "Easting") val easting: Int?, - @JacksonXmlProperty(localName = "Northing") val northing: Int? -) { - fun isValidSite(): Boolean = easting != null && northing != null -} - -object HilltopSitesParser { - - val xmlMapper = - XmlMapper.builder() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - .addModule(KotlinModule.Builder().build()) - .build() - - fun parseSites(data: String): HilltopSitesXml { - return xmlMapper.readValue(data, HilltopSitesXml::class.java) - } -} diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopXmlParsers.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopXmlParsers.kt deleted file mode 100644 index 1b33d4664..000000000 --- a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/hilltop_crawler/HilltopXmlParsers.kt +++ /dev/null @@ -1,79 +0,0 @@ -package nz.govt.eop.consumers.hilltop_crawler - -import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.dataformat.xml.XmlMapper -import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper -import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty -import com.fasterxml.jackson.module.kotlin.KotlinModule - -data class HilltopMeasurementDataXml( - @JacksonXmlProperty(localName = "Measurement") - val measurement: HilltopMeasurementDataMeasurement -) - -data class HilltopMeasurementDataMeasurement( - @JacksonXmlProperty(localName = "SiteName", isAttribute = true) val siteId: String, - @JacksonXmlProperty(localName = "DataSource") val dataSource: DataSource, - @JacksonXmlProperty(localName = "Data") val data: Data -) - -data class DataSource(@JacksonXmlProperty(localName = "Name", isAttribute = true) val name: String) - -data class Data( - @JacksonXmlProperty(localName = "DateFormat", isAttribute = true) val dateFormat: String, - @JacksonXmlProperty(localName = "E") - @JacksonXmlElementWrapper(useWrapping = false) - val values: List -) - -data class Value( - @JacksonXmlProperty(localName = "T") val timestamp: String, - @JacksonXmlProperty(localName = "I1") val value1: String?, - @JacksonXmlProperty(localName = "Value") val value2: String?, - @JacksonXmlProperty(localName = "Parameter") - @JacksonXmlElementWrapper(useWrapping = false) - val parameters: List? -) - -data class Parameter( - @JacksonXmlProperty(localName = "Name") val name: String, - @JacksonXmlProperty(localName = "Value") val value: String -) - -data class Measurement( - val timestamp: String, - val value: String?, - val parameters: Map? -) - -data class MeasurementData( - val siteId: String, - val measurementName: String, - val measurements: List -) - -object HilltopXmlParsers { - - val xmlMapper = - XmlMapper.builder() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - .addModule(KotlinModule.Builder().build()) - .build() - - fun parseSiteMeasurementData(data: ByteArray): MeasurementData { - val hilltopXML = xmlMapper.readValue(data, HilltopMeasurementDataXml::class.java) - - return MeasurementData( - hilltopXML.measurement.siteId, - hilltopXML.measurement.dataSource.name, - hilltopXML.measurement.data.values.map { - val value: String? = it.value1 ?: it.value2 - Measurement( - it.timestamp, - value, - if (it.parameters != null) - it.parameters.map { param -> Pair(param.name, param.value) }.toMap() - else null) - }) - } -} diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationMessages.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationMessages.kt new file mode 100644 index 000000000..2a13e9a60 --- /dev/null +++ b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationMessages.kt @@ -0,0 +1,75 @@ +package nz.govt.eop.consumers.observations + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo +import java.math.BigDecimal +import java.time.OffsetDateTime +import java.time.YearMonth + +enum class ObservationMessageType { + SITE_DETAILS, + OBSERVATION_DATA, +} + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type") +@JsonSubTypes( + JsonSubTypes.Type(value = SiteMessageKey::class, name = "SITE_DETAILS"), + JsonSubTypes.Type(value = ObservationDataMessageKey::class, name = "OBSERVATION_DATA"), +) +abstract class ObservationMessageKey(val type: ObservationMessageType) { + abstract val councilId: Int + abstract val siteName: String +} + +data class SiteMessageKey( + override val councilId: Int, + override val siteName: String, +) : ObservationMessageKey(ObservationMessageType.SITE_DETAILS) + +data class ObservationDataMessageKey( + override val councilId: Int, + override val siteName: String, + val measurementName: String, + val yearMonth: YearMonth +) : ObservationMessageKey(ObservationMessageType.OBSERVATION_DATA) + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type") +@JsonSubTypes( + JsonSubTypes.Type(value = SiteDetailsMessage::class, name = "SITE_DETAILS"), + JsonSubTypes.Type(value = ObservationDataMessage::class, name = "OBSERVATION_DATA"), +) +abstract class ObservationMessage(val type: ObservationMessageType) { + abstract val councilId: Int + abstract val siteName: String + + abstract fun toKey(): ObservationMessageKey +} + +data class SiteDetailsMessage( + override val councilId: Int, + override val siteName: String, + val location: Location? +) : ObservationMessage(ObservationMessageType.SITE_DETAILS) { + override fun toKey() = + SiteMessageKey( + councilId, + siteName, + ) +} + +data class Location(val easting: Int, val northing: Int) + +data class ObservationDataMessage( + override val councilId: Int, + override val siteName: String, + val measurementName: String, + val observations: List +) : ObservationMessage(ObservationMessageType.OBSERVATION_DATA) { + override fun toKey() = + ObservationDataMessageKey( + councilId, siteName, measurementName, YearMonth.from(observations.first().observedAt)) +} + +data class Observation(val observedAt: OffsetDateTime, val value: BigDecimal) diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationStore.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationStore.kt new file mode 100644 index 000000000..ae77bb6a3 --- /dev/null +++ b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationStore.kt @@ -0,0 +1,157 @@ +package nz.govt.eop.consumers.observations + +import java.sql.PreparedStatement +import java.sql.Timestamp +import java.sql.Types +import net.postgis.jdbc.geometry.Geometry +import net.postgis.jdbc.geometry.Point +import org.springframework.dao.EmptyResultDataAccessException +import org.springframework.jdbc.core.BatchPreparedStatementSetter +import org.springframework.jdbc.core.JdbcTemplate +import org.springframework.jdbc.core.SingleColumnRowMapper +import org.springframework.jdbc.support.GeneratedKeyHolder +import org.springframework.jdbc.support.KeyHolder +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional + +@Component +class ObservationStore(private val jdbcTemplate: JdbcTemplate) { + + val SELECT_SITE_QUERY = + "SELECT id FROM observation_sites WHERE council_id = (SELECT id FROM councils WHERE stats_nz_id = ?) AND name = ? FOR UPDATE" + + val UPDATE_SITE_QUERY = + "UPDATE observation_sites SET location = ?, updated_at = NOW() WHERE id = ?" + + val INSERT_SITE_QUERY = + "INSERT INTO observation_sites (council_id, name, location) VALUES ((SELECT id FROM councils WHERE stats_nz_id = ?), ?, ?)" + + val SELECT_MEASUREMENT_ID_QUERY = + """ + SELECT id FROM observation_sites_measurements + WHERE site_id = (SELECT id FROM observation_sites WHERE council_id = ( + SELECT id FROM councils WHERE stats_nz_id = ?) AND name = ?) + AND measurement_name = ? + """ + + val INSERT_MEASUREMENT_QUERY = + """ + INSERT INTO observation_sites_measurements(site_id, measurement_name, first_observation_at, last_observation_at, observation_count) + VALUES ((SELECT id FROM observation_sites WHERE council_id = (SELECT id FROM councils WHERE stats_nz_id = ?) AND name = ?), ?, ?, ?, ?) + RETURNING id + """ + + val BATCH_OBSERVATION_QUERY = + """ + INSERT INTO observations (observation_measurement_id, observed_at, amount) + VALUES (?, ?, ?) + ON CONFLICT (observation_measurement_id, observed_at) + DO UPDATE SET amount = EXCLUDED.amount, updated_at = NOW() + """ + + @Transactional + fun storeSite(councilStatsId: Int, siteName: String, location: Location?) { + val point = location?.let { createPoint(it.easting, it.northing) } + + val existingSiteId = fetchExistingSiteId(councilStatsId, siteName) + existingSiteId?.let { updateSite(it, point) } ?: insertSite(councilStatsId, siteName, point) + } + + @Transactional + fun storeObservations( + councilStatsId: Int, + siteName: String, + measurementName: String, + observations: List + ) { + var measurementId = getMeasurementId(councilStatsId, siteName, measurementName) + + measurementId = + measurementId ?: insertMeasurement(councilStatsId, siteName, measurementName, observations) + + insertObservations(measurementId, observations) + } + + private fun getMeasurementId( + councilStatsId: Int, + siteName: String, + measurementName: String + ): Int? { + return try { + jdbcTemplate.queryForObject( + SELECT_MEASUREMENT_ID_QUERY, Int::class.java, councilStatsId, siteName, measurementName) + } catch (ex: EmptyResultDataAccessException) { + null + } + } + + private fun insertMeasurement( + councilStatsId: Int, + siteName: String, + measurementName: String, + observations: List + ): Int { + val keyHolder: KeyHolder = GeneratedKeyHolder() + + jdbcTemplate.update( + { connection -> + val ps = connection.prepareStatement(INSERT_MEASUREMENT_QUERY, arrayOf("id")) + ps.setInt(1, councilStatsId) + ps.setString(2, siteName) + ps.setString(3, measurementName) + ps.setTimestamp(4, Timestamp.from(observations.first().observedAt.toInstant())) + ps.setTimestamp(5, Timestamp.from(observations.last().observedAt.toInstant())) + ps.setInt(6, observations.size) + ps + }, + keyHolder) + + return keyHolder.keys?.get("id") as Int? + ?: throw IllegalStateException("No key found in KeyHolder after insert operation.") + } + + private fun insertObservations(measurementId: Int, observations: List) { + jdbcTemplate.batchUpdate( + BATCH_OBSERVATION_QUERY, + object : BatchPreparedStatementSetter { + override fun setValues(ps: PreparedStatement, i: Int) { + val observation = observations[i] + ps.setInt(1, measurementId) + ps.setTimestamp(2, Timestamp.from(observation.observedAt.toInstant())) + ps.setBigDecimal(3, observation.value) + } + + override fun getBatchSize() = observations.size + }) + } + + private fun fetchExistingSiteId(councilStatsId: Int, siteName: String): Int? { + return try { + jdbcTemplate.queryForObject( + SELECT_SITE_QUERY, SingleColumnRowMapper(), councilStatsId, siteName) + } catch (ex: EmptyResultDataAccessException) { + null + } + } + + private fun insertSite(councilStatsId: Int, siteName: String, point: Geometry?) { + jdbcTemplate.update(INSERT_SITE_QUERY) { ps: PreparedStatement -> + ps.setInt(1, councilStatsId) + ps.setString(2, siteName) + ps.setObject(3, point, Types.OTHER) + } + } + + private fun updateSite(existingSiteId: Int, point: Geometry?) { + jdbcTemplate.update(UPDATE_SITE_QUERY) { ps: PreparedStatement -> + ps.setObject(1, point, Types.OTHER) + ps.setInt(2, existingSiteId) + } + } +} + +fun createPoint(easting: Int, northing: Int): Point { + val point = Point(easting.toDouble(), northing.toDouble()) + point.setSrid(2193) + return point +} diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumer.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumer.kt new file mode 100644 index 000000000..89a9b1ced --- /dev/null +++ b/packages/Manager/src/main/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumer.kt @@ -0,0 +1,47 @@ +package nz.govt.eop.consumers.observations + +import mu.KotlinLogging +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.kstream.Consumed +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Profile +import org.springframework.dao.NonTransientDataAccessException +import org.springframework.kafka.annotation.EnableKafkaStreams +import org.springframework.kafka.support.serializer.JsonSerde +import org.springframework.stereotype.Component + +@Profile("hilltop-consumer") +@EnableKafkaStreams +@Component +class ObservationsConsumer(val store: ObservationStore) { + private val logger = KotlinLogging.logger {} + + @Autowired + fun buildPipeline(streamsBuilder: StreamsBuilder) { + + val messageStream = + streamsBuilder.stream( + "observations", + Consumed.with( + JsonSerde(ObservationMessageKey::class.java), + JsonSerde(ObservationMessage::class.java))) + + messageStream.foreach { _, value -> + try { + when (value) { + is SiteDetailsMessage -> { + store.storeSite(value.councilId, value.siteName, value.location) + } + is ObservationDataMessage -> { + store.storeObservations( + value.councilId, value.siteName, value.measurementName, value.observations) + } + } + } catch (e: NonTransientDataAccessException) { + logger.error(e) { + "Exception processing ${value.type} message ${value.councilId} / ${value.siteName}" + } + } + } + } +} diff --git a/packages/Manager/src/main/kotlin/nz/govt/eop/messages/Messages.kt b/packages/Manager/src/main/kotlin/nz/govt/eop/messages/Messages.kt index f43c41bb0..28ae0de74 100644 --- a/packages/Manager/src/main/kotlin/nz/govt/eop/messages/Messages.kt +++ b/packages/Manager/src/main/kotlin/nz/govt/eop/messages/Messages.kt @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.type.TypeFactory import java.math.BigDecimal import java.time.Instant import nz.govt.eop.consumers.WATER_ALLOCATION_TOPIC_NAME -import nz.govt.eop.consumers.hilltop_crawler.HILLTOP_RAW_DATA_TOPIC_NAME import org.apache.kafka.common.header.Headers enum class ConsentStatus { @@ -27,26 +26,15 @@ data class WaterAllocationMessage( val receivedAt: Instant ) -data class HilltopDataMessage( - val councilId: Int, - val hilltopUrl: String, - val type: String, - val xml: String, - val at: Instant -) - object KafkaMessageTypes { private var waterAllocationMessageType: JavaType = TypeFactory.defaultInstance().constructType(WaterAllocationMessage::class.java) - private var hilltopDataMessageType: JavaType = - TypeFactory.defaultInstance().constructType(HilltopDataMessage::class.java) @JvmStatic fun determineTypeFromTopicName(topic: String, data: ByteArray, headers: Headers): JavaType? { return when (topic) { WATER_ALLOCATION_TOPIC_NAME -> waterAllocationMessageType - HILLTOP_RAW_DATA_TOPIC_NAME -> hilltopDataMessageType else -> null } } diff --git a/packages/Manager/src/main/resources/application-prod.yml b/packages/Manager/src/main/resources/application-prod.yml index 2351bf29e..7effc59cb 100644 --- a/packages/Manager/src/main/resources/application-prod.yml +++ b/packages/Manager/src/main/resources/application-prod.yml @@ -11,6 +11,15 @@ spring: mechanism: SCRAM-SHA-512 jaas: config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_SASL_USERNAME}" password="${KAFKA_SASL_PASSWORD}"; +management: + endpoints: + enabled-by-default: false + web: + exposure: + include: "health" + endpoint: + health: + enabled: true logging: level: @@ -19,3 +28,4 @@ logging: org.jooq.Constants: OFF org.jooq: INFO org.springframework.web.servlet.resource.ResourceHttpRequestHandler: ERROR # this logs WARN's for paths that are probably being scanned by bots + org.apache.kafka: WARN diff --git a/packages/Manager/src/main/resources/application.yml b/packages/Manager/src/main/resources/application.yml index 9eb0b83bd..3c2c658a0 100644 --- a/packages/Manager/src/main/resources/application.yml +++ b/packages/Manager/src/main/resources/application.yml @@ -1,4 +1,6 @@ spring: + application: + name: manager main: banner-mode: off datasource: @@ -21,6 +23,21 @@ spring: spring.json.value.type.method: nz.govt.eop.messages.KafkaMessageTypes.determineTypeFromTopicName spring.json.trusted.packages: nz.govt.eop.* + streams: + properties: + num.stream.threads: 8 + max.poll.records: 100 + default: + key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde + value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde + application-id: nz.govt.eop.consumers.manager +management: + endpoints: + enabled-by-default: true + web: + exposure: + include: "*" + server: compression: enabled: true @@ -32,3 +49,4 @@ logging: nz.govt.eop: INFO org.jooq.Constants: OFF org.jooq: INFO + org.apache.kafka: WARN diff --git a/packages/Manager/src/main/resources/db/migration/V0029__observations_changes.sql b/packages/Manager/src/main/resources/db/migration/V0029__observations_changes.sql new file mode 100644 index 000000000..ecddff736 --- /dev/null +++ b/packages/Manager/src/main/resources/db/migration/V0029__observations_changes.sql @@ -0,0 +1,15 @@ +ALTER TABLE observation_sites + ALTER COLUMN location DROP NOT NULL; + +ALTER TABLE observations + DROP CONSTRAINT observations_pkey; + +ALTER TABLE observations + DROP CONSTRAINT observations_observation_measurement_id_observed_at_key; + +ALTER TABLE observations + DROP COLUMN id; + +ALTER TABLE observations + ADD PRIMARY KEY (observation_measurement_id, observed_at); + diff --git a/packages/Manager/src/main/resources/db/migration/V0030__observed_water_use_aggregated_daily_view.sql b/packages/Manager/src/main/resources/db/migration/V0030__observed_water_use_aggregated_daily_view.sql new file mode 100644 index 000000000..c51581430 --- /dev/null +++ b/packages/Manager/src/main/resources/db/migration/V0030__observed_water_use_aggregated_daily_view.sql @@ -0,0 +1,27 @@ + +CREATE OR REPLACE VIEW observed_water_use_aggregated_daily AS + +WITH filtered_obs AS ( + SELECT osm.site_id, + date(o.observed_at) AS day_observed_at, + osm.measurement_name, + o.amount + FROM observations o + LEFT JOIN observation_sites_measurements osm ON o.observation_measurement_id = osm.id + WHERE (osm.measurement_name::text = ANY (ARRAY['Water Meter Reading'::text, 'Water Meter Volume'::text])) + AND o.observed_at > '2023-01-01 00:00:00+13'::timestamp with time zone + ) + SELECT filtered_obs.site_id, + filtered_obs.day_observed_at, + filtered_obs.measurement_name, + CASE + WHEN filtered_obs.measurement_name::text = 'Water Meter Reading'::text THEN max(filtered_obs.amount) - min(filtered_obs.amount) + WHEN filtered_obs.measurement_name::text = 'Water Meter Volume'::text THEN avg(filtered_obs.amount) * 86.4 + ELSE 0::numeric + END AS daily_usage + FROM filtered_obs + GROUP BY + filtered_obs.site_id, + filtered_obs.day_observed_at, + filtered_obs.measurement_name; + diff --git a/packages/Manager/src/test/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumerTest.kt b/packages/Manager/src/test/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumerTest.kt new file mode 100644 index 000000000..b36b22177 --- /dev/null +++ b/packages/Manager/src/test/kotlin/nz/govt/eop/consumers/observations/ObservationsConsumerTest.kt @@ -0,0 +1,261 @@ +package nz.govt.eop.consumers.observations + +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import java.math.BigDecimal +import java.sql.Timestamp +import java.time.OffsetDateTime +import java.util.Properties +import net.postgis.jdbc.PGgeometry +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.TestInputTopic +import org.apache.kafka.streams.TopologyTestDriver +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase +import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest +import org.springframework.jdbc.core.JdbcTemplate +import org.springframework.kafka.support.serializer.JsonSerde +import org.springframework.test.context.ActiveProfiles +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional + +@ActiveProfiles("test") +@JdbcTest +@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) +class ObservationsConsumerTest(@Autowired private val jdbcTemplate: JdbcTemplate) { + + private final val inputTopic: TestInputTopic + + init { + val streamsBuilder = StreamsBuilder() + + // Class under test + ObservationsConsumer(ObservationStore(jdbcTemplate)).buildPipeline(streamsBuilder) + + inputTopic = + TopologyTestDriver(streamsBuilder.build(), Properties()) + .createInputTopic( + "observations", + JsonSerde(ObservationMessageKey::class.java).noTypeInfo().serializer(), + JsonSerde(ObservationMessage::class.java).noTypeInfo().serializer()) + } + + @BeforeEach + fun cleanDb() { + jdbcTemplate.execute("TRUNCATE TABLE observations CASCADE") + jdbcTemplate.execute("TRUNCATE TABLE observation_sites_measurements CASCADE") + jdbcTemplate.execute("TRUNCATE TABLE observation_sites CASCADE") + } + + @Nested + open inner class SiteDetailsMessageHandling { + @Test + fun `should create site when none exists`() { + + // GIVEN + val message = SiteDetailsMessage(1, "A", Location(1, 2)) + + // WHEN + inputTopic.pipeInput(message.toKey(), message) + + // THEN + val result = jdbcTemplate.queryForMap("SELECT * FROM observation_sites") + result["council_id"] shouldBe 1 + result["name"] shouldBe "A" + result["location"] shouldBe PGgeometry(createPoint(1, 2)) + } + + @Test + fun `should create site with no location when none exists`() { + + // GIVEN + val message = SiteDetailsMessage(1, "B", null) + + // WHEN + inputTopic.pipeInput(message.toKey(), message) + + // THEN + val result = jdbcTemplate.queryForMap("SELECT * FROM observation_sites") + result["council_id"] shouldBe 1 + result["name"] shouldBe "B" + result["location"] shouldBe null + } + + @Transactional(propagation = Propagation.NOT_SUPPORTED) + @Test + open fun `should update site when one exists`() { + + // GIVEN + val firstMessage = SiteDetailsMessage(1, "C", null) + val secondMessage = SiteDetailsMessage(1, "C", Location(1, 2)) + + // WHEN + inputTopic.pipeInput(firstMessage.toKey(), firstMessage) + inputTopic.pipeInput(secondMessage.toKey(), secondMessage) + + // THEN + val result = jdbcTemplate.queryForMap("SELECT * FROM observation_sites") + result["council_id"] shouldBe 1 + result["name"] shouldBe "C" + result["location"] shouldBe PGgeometry(createPoint(1, 2)) + result["created_at"] shouldNotBe result["updated_at"] + } + + @Test + fun `should store correct council link for site`() { + // Know that id 18 is Marlborough District Council, it maps to 16 on the councils table + // and its a good example because 18 isn't an id on the councils table. + + // GIVEN + val message = SiteDetailsMessage(18, "SITE", null) + + // WHEN + inputTopic.pipeInput(message.toKey(), message) + + // THEN + val result = jdbcTemplate.queryForMap("SELECT * FROM observation_sites") + result["council_id"] shouldBe 16 + } + + @Test + @Transactional(propagation = Propagation.NOT_SUPPORTED) + open fun `should process messages after an error`() { + // GIVEN + val messageThatWillFail = SiteDetailsMessage(365, "SITE", null) + val message2 = SiteDetailsMessage(1, "SITE", null) + + // WHEN + inputTopic.pipeInput(messageThatWillFail.toKey(), messageThatWillFail) + inputTopic.pipeInput(message2.toKey(), message2) + + // THEN + val result = jdbcTemplate.queryForMap("SELECT * FROM observation_sites") + result["council_id"] shouldBe 1 + } + } + + @Nested + inner class ObservationDataMessageHandling { + + @Test + fun `should not throw error when site doesn't exist`() { + // GIVEN + val observationMessage = + ObservationDataMessage( + 1, + "SITE", + "RAIN", + listOf( + Observation( + OffsetDateTime.parse("2020-01-01T00:00:00Z"), BigDecimal.valueOf(1.0)))) + + // WHEN / THEN + inputTopic.pipeInput(observationMessage.toKey(), observationMessage) + } + + @Test + fun `should store observation data when measurement record does not exist`() { + // GIVEN + val siteMessage = SiteDetailsMessage(1, "SITE", null) + inputTopic.pipeInput(siteMessage.toKey(), siteMessage) + + val observationMessage = + ObservationDataMessage( + 1, + "SITE", + "RAIN", + listOf( + Observation( + OffsetDateTime.parse("2020-01-01T00:00:00Z"), BigDecimal.valueOf(6.6)))) + + // WHEN + inputTopic.pipeInput(observationMessage.toKey(), observationMessage) + + // THEN + val measurementRecord = + jdbcTemplate.queryForMap("SELECT * FROM observation_sites_measurements") + measurementRecord["measurement_name"] shouldBe "RAIN" + + val observationsResult = jdbcTemplate.queryForMap("SELECT * FROM observations") + observationsResult["observation_measurement_id"] shouldBe measurementRecord["id"] + observationsResult["observed_at"] shouldBe + Timestamp.from(OffsetDateTime.parse("2020-01-01T00:00:00Z").toInstant()) + observationsResult["amount"] shouldBe BigDecimal.valueOf(6.6) + } + + @Test + fun `should store observation data when measurement record already exists`() { + // GIVEN + val siteMessage = SiteDetailsMessage(1, "SITE", null) + inputTopic.pipeInput(siteMessage.toKey(), siteMessage) + + val firstObservationMessage = + ObservationDataMessage( + 1, + "SITE", + "RAIN", + listOf( + Observation( + OffsetDateTime.parse("2020-01-01T00:00:00Z"), BigDecimal.valueOf(1.0)))) + inputTopic.pipeInput(firstObservationMessage.toKey(), firstObservationMessage) + + val secondObservationMessage = + ObservationDataMessage( + 1, + "SITE", + "RAIN", + listOf( + Observation( + OffsetDateTime.parse("2021-01-01T00:00:00Z"), BigDecimal.valueOf(2.0)))) + + // WHEN + inputTopic.pipeInput(secondObservationMessage.toKey(), secondObservationMessage) + + // THEN + val observationsResult = + jdbcTemplate.queryForMap( + "SELECT * FROM observations WHERE observed_at > '2020-01-01T00:00:00Z'") + observationsResult["observed_at"] shouldBe + Timestamp.from(OffsetDateTime.parse("2021-01-01T00:00:00Z").toInstant()) + observationsResult["amount"] shouldBe BigDecimal.valueOf(2.0) + } + + @Test + fun `should update observation data when data already exists`() { + // GIVEN + val siteMessage = SiteDetailsMessage(1, "SITE", null) + inputTopic.pipeInput(siteMessage.toKey(), siteMessage) + + val firstObservationMessage = + ObservationDataMessage( + 1, + "SITE", + "RAIN", + listOf( + Observation( + OffsetDateTime.parse("2020-01-01T00:00:00Z"), BigDecimal.valueOf(1.0)))) + inputTopic.pipeInput(firstObservationMessage.toKey(), firstObservationMessage) + + val secondObservationMessage = + ObservationDataMessage( + 1, + "SITE", + "RAIN", + listOf( + Observation( + OffsetDateTime.parse("2020-01-01T00:00:00Z"), BigDecimal.valueOf(2.0)))) + + // WHEN + inputTopic.pipeInput(secondObservationMessage.toKey(), secondObservationMessage) + + // THEN + val observationsResult = jdbcTemplate.queryForMap("SELECT * FROM observations") + observationsResult["observed_at"] shouldBe + Timestamp.from(OffsetDateTime.parse("2020-01-01T00:00:00Z").toInstant()) + observationsResult["amount"] shouldBe BigDecimal.valueOf(2.0) + } + } +}