Crawler fix start of month (#149)
* Tweak to limit amount of history kept.

* Extend measurement list so it will be one month ahead if there are recent
measurements

* Add new fetch task that is constantly refreshing the latest data

* Refactor to make comparison more explicit

* Update README for recent changes
smozely authored Nov 7, 2023
1 parent 78eb555 commit f4db299
Showing 8 changed files with 404 additions and 75 deletions.
20 changes: 10 additions & 10 deletions packages/HilltopCrawler/README.md
@@ -18,16 +18,16 @@ Overall, this is built around the Hilltop API which exposes three levels of GET
* SITES_LIST — List of Sites, includes names and locations
* MEASUREMENTS_LIST — Per site, list of measurements available for that site, including details about the first and last
observation date
* MEASUREMENT_DATA — Per site, per measurement, the timeseries observed data
* MEASUREMENT_DATA / MEASUREMENT_DATA_LATEST — Per site, per measurement, the timeseries observed data either historical or most recent

The app crawls these by reading each level to decide what to read from the next level: the SITES_LIST tells the
app which sites call MEASUREMENTS_LIST for which in turn tells which measurements to call MEASUREMENT_DATA for. Requests
app which sites to call MEASUREMENTS_LIST for, which in turn tells the app which measurements to call MEASUREMENT_DATA for. Requests
for MEASUREMENT_DATA are also split into monthly chunks to avoid issues with too much data being returned in one
request.
request. If a site appears to be reporting data frequently, a MEASUREMENT_DATA_LATEST task will be queued to fetch data for the most recent 35 days.
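As a rough sketch of the two request shapes described above (the helper names here are hypothetical; the month-boundary arithmetic mirrors `buildPastMeasurementsUrl` and the `P35D/now` interval mirrors `buildLatestMeasurementsUrl` later in this commit):

```kotlin
import java.time.YearMonth

// Hypothetical helper: a historical MEASUREMENT_DATA chunk covers exactly one
// calendar month, from the first second of the month to the last.
fun monthChunkRange(month: YearMonth): Pair<String, String> {
    val from = month.atDay(1).atStartOfDay()
    val to = month.plusMonths(1).atDay(1).atStartOfDay().minusSeconds(1)
    return from.toString() to to.toString()
}

// A MEASUREMENT_DATA_LATEST request instead asks for a rolling 35-day
// window ending now, expressed as an ISO-8601 interval.
val latestTimeInterval = "P35D/now"
```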

The app keeps a list of API requests that it will keep up to date by calling that API on a schedule. This list is stored
in the `hilltop_fetch_tasks` table and works like a task queue. Each time a request is made, the result is used to try
and determine when next to schedule the task. The simple example is for MEASUREMENT_DATA if the last observation was
and determine when next to schedule the task. An example: for MEASUREMENT_DATA_LATEST, if the last observation was
recent, then a refresh should be attempted soon; if it was a long way in the past, it should be refreshed less often.

The next schedule time has been implemented as a random time in an interval, to provide some jitter between task
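`randomTimeBetween` is referenced throughout the mappers in this commit but its body is not shown; a plausible sketch (an assumption, not the actual implementation) of the jitter it provides:

```kotlin
import java.time.Duration
import java.time.Instant
import kotlin.random.Random

// Assumed implementation: a uniformly random instant in [from, to), so tasks
// created together do not all come due at the same moment.
fun randomTimeBetween(from: Instant, to: Instant): Instant {
    val windowMillis = Duration.between(from, to).toMillis()
    return if (windowMillis <= 0) from else from.plusMillis(Random.nextLong(windowMillis))
}
```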
@@ -47,7 +47,8 @@ This process is built around three main components:
* Fetch data from hilltop for that task
* If a valid result which is not the same as the previous version
* Queue any new tasks from the result
* Send the result to Kafka `hilltop_raw` topic
* Send the result to the Kafka `hilltop_raw` topic (note: the topic does not distinguish between
MEASUREMENT_DATA and MEASUREMENT_DATA_LATEST; both are labelled MEASUREMENT_DATA)
* Requeue the task for some time in the future, based on the type of request

* Kafka streams component
@@ -73,8 +74,8 @@ at the same time without getting the same task.
See this [reference](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/) for
discussion about the `SKIP LOCKED` query.
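A minimal sketch of the claim query this pattern implies (the column names are assumptions based on the `HilltopFetchTaskRow` mapping elsewhere in this commit; the real SQL in `DB.kt` is not shown here):

```kotlin
// Essence of the SKIP LOCKED queue: each worker selects one due task and
// locks it; rows already locked by other workers are skipped, not waited on,
// so concurrent workers never claim the same task.
val claimNextTaskSql =
    """
    SELECT id, source_id, request_type, next_fetch_at, fetch_url, previous_data_hash
    FROM hilltop_fetch_tasks
    WHERE next_fetch_at <= now()
    ORDER BY next_fetch_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
    """.trimIndent()
```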

The queue implementation is fairly simple for this specific use. If it becomes more of a generic work queue then a
standard implemention such as Quartz might be worthwhile moving to.
The queue implementation is fairly simple for this specific use. If it becomes more of a generic work queue then a
standard implementation such as Quartz might be worthwhile moving to.

## Example Configuration

@@ -98,11 +99,10 @@ VALUES (9, 'https://hilltop.gw.govt.nz/merged.hts', '{ "measurementNames": ["Rai

### TODO / Improvements

* The `previous_history` on tasks will grow unbounded. This needs to be capped
* The algorithm for determining the next time to schedule an API refresh could be improved; something could be built
from the previous history based on how often the data is unchanged.
* To avoid hammering Hilltop there is rate limiting using a "token bucket" library, currently this uses one bucket for
all requests. It could be split to use one bucket per server
* Because of the chunking by month, every time new data arrives we end up storing the whole month up to that new data
* Each time the latest-measurements API is called, we receive data that has already been seen and processed, and store it
again in the `hilltop_raw` topic. This seems wasteful, and there are options for cleaning up when a record just
supersedes the previous. But cleaning up would mean losing some knowledge about when a observation was first seen.
supersedes the previous. But cleaning up could mean losing some knowledge about when an observation was first seen.
@@ -3,7 +3,7 @@ package nz.govt.eop.hilltop_crawler
import java.util.concurrent.TimeUnit
import nz.govt.eop.hilltop_crawler.api.requests.buildSiteListUrl
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.SITES_LIST
import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
@@ -26,7 +26,9 @@ class CheckForNewSourcesTask(val db: DB) {
val sources = db.listSources()

sources.forEach {
db.createFetchTask(DB.HilltopFetchTaskCreate(it.id, SITES_LIST, buildSiteListUrl(it.htsUrl)))
db.createFetchTask(
DB.HilltopFetchTaskCreate(
it.id, HilltopFetchTaskType.SITES_LIST, buildSiteListUrl(it.htsUrl)))
}
}
}
@@ -37,3 +37,18 @@ fun buildPastMeasurementsUrl(
.queryParam("to", month.plusMonths(1).atDay(1).atStartOfDay().minusSeconds(1))
.build()
.toASCIIString()

fun buildLatestMeasurementsUrl(
hilltopUrl: String,
siteId: String,
measurementName: String,
): String =
DefaultUriBuilderFactory()
.uriString(hilltopUrl)
.queryParam("Service", "Hilltop")
.queryParam("Request", "GetData")
.queryParam("Site", siteId)
.queryParam("Measurement", measurementName)
.queryParam("TimeInterval", "P35D/now")
.build()
.toASCIIString()
@@ -6,10 +6,16 @@ import java.sql.ResultSet
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.stereotype.Component

enum class HilltopFetchTaskType {
SITES_LIST,
MEASUREMENTS_LIST,
MEASUREMENT_DATA,
MEASUREMENT_DATA_LATEST,
}

@Component
class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) {

@@ -27,14 +33,14 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) {

data class HilltopFetchTaskCreate(
val sourceId: Int,
val requestType: HilltopMessageType,
val requestType: HilltopFetchTaskType,
val baseUrl: String,
)

data class HilltopFetchTaskRow(
val id: Int,
val sourceId: Int,
val requestType: HilltopMessageType,
val requestType: HilltopFetchTaskType,
val nextFetchAt: Instant,
val fetchUri: URI,
val previousDataHash: String?,
@@ -110,7 +116,7 @@
HilltopFetchTaskRow(
rs.getInt("id"),
rs.getInt("source_id"),
HilltopMessageType.valueOf(rs.getString("request_type")),
HilltopFetchTaskType.valueOf(rs.getString("request_type")),
rs.getTimestamp("next_fetch_at").toInstant(),
URI(rs.getString("fetch_url")),
rs.getString("previous_data_hash"))
@@ -127,7 +133,7 @@
"""
UPDATE hilltop_fetch_tasks
SET previous_data_hash = ?,
previous_history = previous_history || ?::jsonb,
previous_history = jsonb_path_query_array(previous_history, '$[last-49 to last]') || ?::jsonb,
next_fetch_at = ?
WHERE id = ?
"""
@@ -12,7 +12,7 @@ import nz.govt.eop.hilltop_crawler.api.parsers.HilltopXmlParsers
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchResult
import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchStatus.*
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.*
import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@@ -74,27 +74,34 @@ class FetchTaskProcessor(
val taskMapper =
try {
when (taskToProcess.requestType) {
SITES_LIST ->
HilltopFetchTaskType.SITES_LIST ->
SitesListTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseSitesResponse(xmlContent))
MEASUREMENTS_LIST ->
HilltopFetchTaskType.MEASUREMENTS_LIST ->
MeasurementsListTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseMeasurementsResponse(xmlContent))
MEASUREMENT_DATA ->
HilltopFetchTaskType.MEASUREMENT_DATA ->
MeasurementDataTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseMeasurementValuesResponse(xmlContent))
HilltopFetchTaskType.MEASUREMENT_DATA_LATEST ->
MeasurementDataLatestTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseMeasurementValuesResponse(xmlContent))
}
} catch (e: JsonMappingException) {
logger.warn(e) { "Failed to parse content [${xmlContent}]" }
Expand Down
@@ -10,9 +10,11 @@ import kotlin.random.Random
import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurementValues
import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurements
import nz.govt.eop.hilltop_crawler.api.parsers.HilltopSites
import nz.govt.eop.hilltop_crawler.api.requests.buildLatestMeasurementsUrl
import nz.govt.eop.hilltop_crawler.api.requests.buildMeasurementListUrl
import nz.govt.eop.hilltop_crawler.api.requests.buildPastMeasurementsUrl
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType

/**
* This is an abstract class that represents a mapper for processing a specific type of task. When
@@ -26,7 +28,7 @@ import nz.govt.eop.hilltop_crawler.db.DB
* do the above per task type.
*/
abstract class TaskMapper<T>(
val type: HilltopMessageType,
val type: HilltopFetchTaskType,
val sourceConfig: DB.HilltopSourcesRow,
val fetchedUri: URI,
val fetchedAt: Instant,
@@ -60,7 +62,7 @@ class SitesListTaskMapper(
parsedContent: HilltopSites
) :
TaskMapper<HilltopSites>(
HilltopMessageType.SITES_LIST,
HilltopFetchTaskType.SITES_LIST,
sourceConfig,
fetchedUri,
fetchedAt,
@@ -73,7 +75,7 @@
.map {
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopMessageType.MEASUREMENTS_LIST,
HilltopFetchTaskType.MEASUREMENTS_LIST,
buildMeasurementListUrl(baseUri, it.name),
)
}
@@ -88,7 +90,7 @@
)

override fun determineNextFetchAt(): Instant =
randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS))
randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS))
}

class MeasurementsListTaskMapper(
@@ -99,7 +101,7 @@
parsedContent: HilltopMeasurements
) :
TaskMapper<HilltopMeasurements>(
HilltopMessageType.MEASUREMENTS_LIST,
HilltopFetchTaskType.MEASUREMENTS_LIST,
sourceConfig,
fetchedUri,
fetchedAt,
@@ -125,7 +127,7 @@
sourceConfig.config.measurementNames.contains(it.name) && it.vm == null
} != null
}
.flatMap {
.flatMap { it ->
val fromDate =
LocalDate.parse(
it.from.subSequence(0, 10), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
@@ -139,13 +141,29 @@
}
.requestAs

generateMonthSequence(fromDate, toDate).map { yearMonth ->
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopMessageType.MEASUREMENT_DATA,
buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth),
)
}
val isLastObservationInCurrentMonth =
YearMonth.from(toDate) == YearMonth.from(LocalDate.now())

generateMonthSequence(
fromDate,
if (isLastObservationInCurrentMonth) toDate.minusMonths(1) else toDate)
.map { yearMonth ->
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopFetchTaskType.MEASUREMENT_DATA,
buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth))
}
.let { tasks ->
if (isLastObservationInCurrentMonth) {
tasks.plus(
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopFetchTaskType.MEASUREMENT_DATA_LATEST,
buildLatestMeasurementsUrl(baseUri, it.siteName, requestAs)))
} else {
tasks
}
}
}
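`generateMonthSequence` is not part of this diff; a plausible sketch (an assumption, consistent with how it is called above to enumerate MEASUREMENT_DATA chunks):

```kotlin
import java.time.LocalDate
import java.time.YearMonth

// Assumed behaviour: every calendar month from the first observation's month
// through the last, inclusive.
fun generateMonthSequence(from: LocalDate, to: LocalDate): Sequence<YearMonth> =
    generateSequence(YearMonth.from(from)) { it.plusMonths(1) }
        .takeWhile { it <= YearMonth.from(to) }
```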

override fun buildKafkaMessage(): HilltopMessage =
Expand All @@ -159,7 +177,7 @@ class MeasurementsListTaskMapper(
)

override fun determineNextFetchAt(): Instant =
randomTimeBetween(fetchedAt.plus(1, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS))
randomTimeBetween(fetchedAt.plus(10, ChronoUnit.DAYS), fetchedAt.plus(20, ChronoUnit.DAYS))
}

class MeasurementDataTaskMapper(
@@ -170,7 +188,43 @@
parsedContent: HilltopMeasurementValues
) :
TaskMapper<HilltopMeasurementValues>(
HilltopMessageType.MEASUREMENT_DATA,
HilltopFetchTaskType.MEASUREMENT_DATA,
sourceConfig,
fetchedUri,
fetchedAt,
content,
parsedContent) {
override fun buildNewTasksList(): List<DB.HilltopFetchTaskCreate> = emptyList()

override fun buildKafkaMessage(): HilltopMessage? =
if (parsedContent.measurement != null) {
HilltopMeasurementsMessage(
sourceConfig.councilId,
baseUri,
fetchedAt,
parsedContent.measurement.siteName,
parsedContent.measurement.dataSource.measurementName,
parsedContent.measurement.data.values.first().timestamp.let { YearMonth.from(it) },
fetchedUri.toASCIIString(),
content,
)
} else {
null
}

override fun determineNextFetchAt(): Instant =
randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS))
}

class MeasurementDataLatestTaskMapper(
sourceConfig: DB.HilltopSourcesRow,
fetchedUri: URI,
fetchedAt: Instant,
content: String,
parsedContent: HilltopMeasurementValues
) :
TaskMapper<HilltopMeasurementValues>(
HilltopFetchTaskType.MEASUREMENT_DATA_LATEST,
sourceConfig,
fetchedUri,
fetchedAt,
@@ -206,7 +260,9 @@
* be called when getting historical data as well)
*/
override fun determineNextFetchAt(): Instant {

val lastValueAt = parsedContent.measurement?.data?.values?.lastOrNull()?.timestamp?.toInstant()

return if (lastValueAt != null && lastValueAt > fetchedAt.minus(1, ChronoUnit.HOURS)) {
randomTimeBetween(
maxOf(lastValueAt.plus(15, ChronoUnit.MINUTES), fetchedAt),
Expand All @@ -215,9 +271,6 @@ class MeasurementDataTaskMapper(
randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.HOURS))
} else if (lastValueAt != null && lastValueAt > fetchedAt.minus(7, ChronoUnit.DAYS)) {
randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.DAYS))
// Just approx to a month
} else if (lastValueAt != null && lastValueAt > fetchedAt.minus(28, ChronoUnit.DAYS)) {
randomTimeBetween(fetchedAt, fetchedAt.plus(7, ChronoUnit.DAYS))
} else { // Any older or historical data, will rarely change, so we can fetch it less often.
randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS))
}