Skip to content

Commit

Permalink
Hilltop Crawler (#98)
Browse files Browse the repository at this point in the history
* Manager - Hilltop Crawler Changes

* Shuffle files around

* Crawler - Updates

* Intergration testing

* Improve DB naming

* Big refactor of test setup

* Correct time to wait between quiet polls

* Add global error handling around task

* Add check to make sure Virtual Measurement matches Measurement Name

* Add a bunch to the readme

* Add views to aggregate daily allocations and usage by area (#100)

* Add views to aggregate consent allocations and usage by day

* Start writing tests

* Allow easier setup and assertion against test data, start adding more tests

* Add tests for various allocation scenarios

* Add failing test for aggregating data across different areas

* Simplify views

* Splitting out the calculation of daily useage by area

* Add more assertions for aggregation of different areas

* water_allocation.meter should correlate with observation_site.name

* Format code

* Feedback from PR review

---------

Co-authored-by: Steve Mosley <[email protected]>

* Remove unnesscary `yield`

* Adding index on next_fetch_at column

* Add some more readme details about the task queue

* Adding notes about partition sizes

* Improve note about how schedulign tasks works

---------

Co-authored-by: Vimal Jobanputra <[email protected]>
  • Loading branch information
smozely and vimto committed Dec 7, 2023
1 parent b279a34 commit f94fc47
Show file tree
Hide file tree
Showing 53 changed files with 3,759 additions and 547 deletions.
108 changes: 108 additions & 0 deletions packages/HilltopCrawler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Hilltop Crawler

## Description

This app extracts observation data from Hilltop servers and publishes them as to a Kafka `observations` topic for other
applications to consume. Which Hilltop servers and which types to extract are configured in a database table which can
be added to while the system is running.

The app intends to keep all versions of data pulled from the Hilltop. Allowing downstream systems to keep track of when
data has been updated in Hilltop. Noting that because Hilltop does not expose when changes were made we can only capture
when the crawler first saw data change. This is also intended to allow us to update some parts of the system without
having to re-read all of the data from hilltop.

## Process

Overall, this is built around the Hilltop API which exposes three levels of GET API request

* SITES_LIST — List of Sites, includes names and locations
* MEASUREMENTS_LIST — Per site, list of measurements available for that site, including details about the first and last
observation date
* MEASUREMENT_DATA — Per site, per measurement, the timeseries observed data

The app crawls these by reading each level to decide what to read from the next level. i.e., The SITES_LIST tells the
app which sites call MEASUREMENTS_LIST for which in turn tells which measurements to call MEASUREMENT_DATA for. Requests
for MEASUREMENT_DATA are also split into monthly chunks to avoid issues with too much data being returned in one
request.

The app keeps a list of API requests that it will keep up to date by calling that API on a schedule. This list is stored
in the `hilltop_fetch_tasks` table and works like a task queue. Each time a request is made, the result is used to try
and determine when next to schedule the task. The simple example is for MEASUREMENT_DATA if the last observation was
recent, then a refresh should be attempted soon, if it was a long way in the past, it should be refreshed less often.

The next schedule time has been implemented as a random time in an interval, to provide some jitter when between task
requeue times to hopefully spread them out, and the load on the servers we are calling.

The task queue also keeps meta-data about the previous history of tasks that are not used by the app this is to allow
engineers to monitor how the system is working.

This process is built around three main components:

* Every hour, monitor the configuration table
* Read the configuration table
* From the configuration, add any new tasks to the task queue

* Continuously monitor the task queue
* Read the next task to work on
* Fetch data from hilltop for that task
* If a valid result which is not the same as the previous version
* Queue any new tasks from the result
* Send the result to Kafka `hilltop_raw` topic
* Requeue the task for sometime in the future, based on the type of request

* Kafka streams component
* Monitor the stream
* For each message map it to either a `site_details` or and `observations` message

Currently, the Manager component listens to the `observations` topic and stores the data from that in a DB table.

## Task Queue

The task queue is currently a custom implementation on top of a single table `hilltop_fetch_tasks`.

This is a slightly specialized queue where

* Each task has a scheduled run time backed by the `next_fecth_at` column
* Each time a task runs it will be re-queued for some time in the future
* The same task can be added multiple times and rely on Postgres “ON CONFLICT DO NOTHING” to avoid the task being added
multiple times

The implementation relies on the postgres `SKIP LOCKED` feature to allow multiple worker threads to pull from the queue
at the same time without getting the same task.

See this [reference](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/) for
discussion about the `SKIP LOCKED` query.

The queue implementation is fairly simple for this specific use. If it becomes more of a generic work queue then a
standard implemention such as Quartz might be worthwhile moving to.

## Example Configuration

These are a couple of insert statements that are not stored in migration scripts, so developer machines don't index
them by default.

GW Water Use

```sql
INSERT INTO hilltop_sources (council_id, hts_url, configuration)
VALUES (9, 'https://hilltop.gw.govt.nz/WaterUse.hts',
'{ "measurementNames": ["Water Meter Volume", "Water Meter Reading"] }');
```

GW Rainfall

```sql
INSERT INTO hilltop_sources (council_id, hts_url, configuration)
VALUES (9, 'https://hilltop.gw.govt.nz/merged.hts', '{ "measurementNames": ["Rainfall"] }');
```

### TODO / Improvement

* The `previous_history` on tasks will grow unbounded. This needs to be capped
* The algorithm for determining the next time to schedule an API refresh could be improved, something could be built
from the previous history based on how often data is unchanged.
* To avoid hammering Hilltop there is rate limiting using a "token bucket" library, currently this uses one bucket for
all requests. It could be split to use one bucket per server
* Because of the chunking by month, every time new data arrives we end up storing the whole month up to that new data
again in the `hilltop_raw` topic. This seems wasteful and there are options for cleaning up when a record just
supersedes the previous. But cleaning up would mean losing some knowledge about when a observation was first seen.
21 changes: 5 additions & 16 deletions packages/HilltopCrawler/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ dependencies {
implementation("org.flywaydb:flyway-core:10.1.0")
implementation("org.flywaydb:flyway-database-postgresql:10.1.0")
implementation("io.github.microutils:kotlin-logging-jvm:3.0.5")
implementation("org.apache.kafka:kafka-streams")
implementation("com.bucket4j:bucket4j-core:8.3.0")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("io.kotest:kotest-assertions-core:5.4.2")
testImplementation("io.kotest:kotest-assertions-core:5.7.2")
testImplementation("io.kotest:kotest-assertions-json:5.7.2")
testImplementation("org.mockito.kotlin:mockito-kotlin:5.1.0")
}

// Don't repackage build in a "-plain" Jar
Expand All @@ -70,21 +74,6 @@ configure<com.diffplug.gradle.spotless.SpotlessExtension> {
kotlinGradle { ktfmt() }
}

val dbConfig =
mapOf(
"url" to
"jdbc:postgresql://${System.getenv("CONFIG_DATABASE_HOST") ?: "localhost"}:5432/eop_test",
"user" to "postgres",
"password" to "password")

flyway {
url = dbConfig["url"]
user = dbConfig["user"]
password = dbConfig["password"]
schemas = arrayOf("hilltop_crawler")
locations = arrayOf("filesystem:./src/main/resources/db/migration")
}

testlogger {
showStandardStreams = true
showPassedStandardStreams = false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,16 @@
package nz.govt.eop.hilltop_crawler

import java.security.MessageDigest
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.scheduling.annotation.EnableScheduling

@SpringBootApplication
@EnableScheduling
@EnableKafka
@EnableConfigurationProperties(ApplicationConfiguration::class)
class Application {

@Bean
fun jsonCustomizer(): Jackson2ObjectMapperBuilderCustomizer {
return Jackson2ObjectMapperBuilderCustomizer { _: Jackson2ObjectMapperBuilder -> }
}
}
class Application {}

fun main(args: Array<String>) {
System.setProperty("com.sun.security.enableAIAcaIssuers", "true")
runApplication<Application>(*args)
}

fun hashMessage(message: String) =
MessageDigest.getInstance("SHA-256").digest(message.toByteArray()).joinToString("") {
"%02x".format(it)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package nz.govt.eop.hilltop_crawler

import java.util.concurrent.TimeUnit
import nz.govt.eop.hilltop_crawler.api.requests.buildSiteListUrl
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.SITES_LIST
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

/**
* This task is responsible for triggering the first fetch task for each source stored in the DB.
*
* It makes sure any new rows added to the DB will start to be pulled from within an hour.
*
* Each time it runs, it will create the initial fetch task for each source found in the DB. This
* relies on the task queue (via "ON CONFLICT DO NOTHING") making sure that duplicate tasks will not
* be created.
*/
@Profile("!test")
@Component
class CheckForNewSourcesTask(val db: DB) {

@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.HOURS)
fun triggerSourcesTasks() {
val sources = db.listSources()

sources.forEach {
db.createFetchTask(DB.HilltopFetchTaskCreate(it.id, SITES_LIST, buildSiteListUrl(it.htsUrl)))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
package nz.govt.eop.hilltop_crawler

const val HILLTOP_RAW_DATA_TOPIC_NAME = "hilltop.raw"
const val OUTPUT_DATA_TOPIC_NAME = "observations"

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package nz.govt.eop.hilltop_crawler.api

import io.github.bucket4j.Bandwidth
import io.github.bucket4j.BlockingBucket
import io.github.bucket4j.Bucket
import java.net.URI
import java.time.Duration
import mu.KotlinLogging
import org.springframework.stereotype.Component
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate

/**
* Simple wrapper around RestTemplate to limit the number of requests per second This is to avoid
* overloading the Hilltop servers
*
* Could be extended to have a bucket per host to increase throughput. But currently the bottleneck
* is when initially loading data from Hilltop, which ends up effectively processing one host at a
* time.
*/
@Component
class HilltopFetcher(val restTemplate: RestTemplate) {

private final val logger = KotlinLogging.logger {}

private final val bucketBandwidthLimit: Bandwidth = Bandwidth.simple(20, Duration.ofSeconds(1))
private final val bucket: BlockingBucket =
Bucket.builder().addLimit(bucketBandwidthLimit).build().asBlocking()

fun fetch(fetchRequest: URI): String? {
bucket.consume(1)
logger.trace { "Downloading $fetchRequest" }
return try {
restTemplate.getForObject(fetchRequest, String::class.java)
} catch (e: RestClientException) {
logger.info(e) { "Failed to download $fetchRequest" }
null
}
}
}
Loading

0 comments on commit f94fc47

Please sign in to comment.