Skip to content

Commit

Permalink
Manager - Kafka Consumer for Observations (#97)
Browse files Browse the repository at this point in the history
* Manager - Hilltop Crawler Changes

* Shuffle files around

* martin/aggregate water use (#95)

* add water use daily aggregation sql view

* optimise sql query performance

* Adding Error handling to Observations Consumer

---------

Co-authored-by: Martin Peak <[email protected]>
  • Loading branch information
smozely and MartyP233 committed Dec 7, 2023
1 parent 8a805b2 commit b279a34
Show file tree
Hide file tree
Showing 15 changed files with 613 additions and 283 deletions.
2 changes: 1 addition & 1 deletion packages/Manager/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ WORKDIR application
COPY --from=builder application/dependencies/ ./
COPY --from=builder application/spring-boot-loader/ ./
COPY --from=builder application/application/ ./
ENTRYPOINT ["java", "-XX:MaxRAMPercentage=85", "org.springframework.boot.loader.launch.JarLauncher"]
ENTRYPOINT ["java", "-XX:MaxRAMPercentage=80", "org.springframework.boot.loader.launch.JarLauncher"]

EXPOSE 8080
HEALTHCHECK --interval=5s --retries=5 --start-period=60s CMD curl --fail http://localhost:8080/actuator/health || exit 1
1 change: 1 addition & 0 deletions packages/Manager/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-jdbc")
implementation("org.springframework.boot:spring-boot-starter-jooq")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")
implementation("io.micrometer:micrometer-tracing-bridge-brave")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.springframework.web.client.RestTemplate
@EnableKafka
@EnableCaching
@SpringBootApplication
class Application() {
class Application {
@Bean fun restTemplate(): RestTemplate = RestTemplateBuilder().build()
}

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package nz.govt.eop.consumers.observations

import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import java.math.BigDecimal
import java.time.OffsetDateTime
import java.time.YearMonth

enum class ObservationMessageType {
SITE_DETAILS,
OBSERVATION_DATA,
}

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = SiteMessageKey::class, name = "SITE_DETAILS"),
JsonSubTypes.Type(value = ObservationDataMessageKey::class, name = "OBSERVATION_DATA"),
)
abstract class ObservationMessageKey(val type: ObservationMessageType) {
abstract val councilId: Int
abstract val siteName: String
}

data class SiteMessageKey(
override val councilId: Int,
override val siteName: String,
) : ObservationMessageKey(ObservationMessageType.SITE_DETAILS)

data class ObservationDataMessageKey(
override val councilId: Int,
override val siteName: String,
val measurementName: String,
val yearMonth: YearMonth
) : ObservationMessageKey(ObservationMessageType.OBSERVATION_DATA)

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = SiteDetailsMessage::class, name = "SITE_DETAILS"),
JsonSubTypes.Type(value = ObservationDataMessage::class, name = "OBSERVATION_DATA"),
)
abstract class ObservationMessage(val type: ObservationMessageType) {
abstract val councilId: Int
abstract val siteName: String

abstract fun toKey(): ObservationMessageKey
}

data class SiteDetailsMessage(
override val councilId: Int,
override val siteName: String,
val location: Location?
) : ObservationMessage(ObservationMessageType.SITE_DETAILS) {
override fun toKey() =
SiteMessageKey(
councilId,
siteName,
)
}

data class Location(val easting: Int, val northing: Int)

data class ObservationDataMessage(
override val councilId: Int,
override val siteName: String,
val measurementName: String,
val observations: List<Observation>
) : ObservationMessage(ObservationMessageType.OBSERVATION_DATA) {
override fun toKey() =
ObservationDataMessageKey(
councilId, siteName, measurementName, YearMonth.from(observations.first().observedAt))
}

data class Observation(val observedAt: OffsetDateTime, val value: BigDecimal)
Loading

0 comments on commit b279a34

Please sign in to comment.