diff --git a/packages/HilltopCrawler/README.md b/packages/HilltopCrawler/README.md index 148d718a..ccb75831 100644 --- a/packages/HilltopCrawler/README.md +++ b/packages/HilltopCrawler/README.md @@ -18,16 +18,16 @@ Overall, this is built around the Hilltop API which exposes three levels of GET * SITES_LIST — List of Sites, includes names and locations * MEASUREMENTS_LIST — Per site, list of measurements available for that site, including details about the first and last observation date -* MEASUREMENT_DATA — Per site, per measurement, the timeseries observed data +* MEASUREMENT_DATA / MEASUREMENT_DATA_LATEST — Per site, per measurement, the timeseries observed data either historical or most recent The app crawls these by reading each level to decide what to read from the next level. i.e., The SITES_LIST tells the -app which sites call MEASUREMENTS_LIST for which in turn tells which measurements to call MEASUREMENT_DATA for. Requests +app which sites call MEASUREMENTS_LIST which in turn tells the app which measurements to call MEASUREMENT_DATA for. Requests for MEASUREMENT_DATA are also split into monthly chunks to avoid issues with too much data being returned in one -request. +request. If a site appears to be reporting data frequently then a MEASUREMENT_DATA_LATEST will be queued up to get data for the most recent 35 days. The app keeps a list of API requests that it will keep up to date by calling that API on a schedule. This list is stored in the `hilltop_fetch_tasks` table and works like a task queue. Each time a request is made, the result is used to try -and determine when next to schedule the task. The simple example is for MEASUREMENT_DATA if the last observation was +and determine when next to schedule the task. An example is for MEASUREMENT_DATA_LATEST if the last observation was recent, then a refresh should be attempted soon, if it was a long way in the past, it should be refreshed less often. 
The next schedule time has been implemented as a random time in an interval, to provide some jitter when between task @@ -47,7 +47,8 @@ This process is built around three main components: * Fetch data from hilltop for that task * If a valid result which is not the same as the previous version * Queue any new tasks from the result - * Send the result to Kafka `hilltop_raw` topic + * Send the result to Kafka `hilltop_raw` topic (note the Kafka topic does not distinguish between + MEASUREMENT_DATA / MEASUREMENT_DATA_LATEST it just calls them both MEASUREMENT_DATA) * Requeue the task for sometime in the future, based on the type of request * Kafka streams component @@ -73,8 +74,8 @@ at the same time without getting the same task. See this [reference](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/) for discussion about the `SKIP LOCKED` query. -The queue implementation is fairly simple for this specific use. If it becomes more of a generic work queue then a -standard implemention such as Quartz might be worthwhile moving to. +The queue implementation is fairly simple for this specific use. If it becomes more of a generic work queue then a +standard implementation such as Quartz might be worthwhile moving to. ## Example Configuration @@ -98,11 +99,10 @@ VALUES (9, 'https://hilltop.gw.govt.nz/merged.hts', '{ "measurementNames": ["Rai ### TODO / Improvement -* The `previous_history` on tasks will grow unbounded. This needs to be capped * The algorithm for determining the next time to schedule an API refresh could be improved, something could be built from the previous history based on how often data is unchanged. * To avoid hammering Hilltop there is rate limiting using a "token bucket" library, currently this uses one bucket for all requests. 
It could be split to use one bucket per server -* Because of the chunking by month, every time new data arrives we end up storing the whole month up to that new data +* Each time the latest measurement API is called we will receive data that has already been seen and processed previously and store it again in the `hilltop_raw` topic. This seems wasteful and there are options for cleaning up when a record just - supersedes the previous. But cleaning up would mean losing some knowledge about when a observation was first seen. + supersedes the previous. But cleaning up could mean losing some knowledge about when an observation was first seen. diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt index 619c1f08..4420cc5f 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt @@ -3,7 +3,7 @@ package nz.govt.eop.hilltop_crawler import java.util.concurrent.TimeUnit import nz.govt.eop.hilltop_crawler.api.requests.buildSiteListUrl import nz.govt.eop.hilltop_crawler.db.DB -import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.SITES_LIST +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component @@ -26,7 +26,9 @@ class CheckForNewSourcesTask(val db: DB) { val sources = db.listSources() sources.forEach { - db.createFetchTask(DB.HilltopFetchTaskCreate(it.id, SITES_LIST, buildSiteListUrl(it.htsUrl))) + db.createFetchTask( + DB.HilltopFetchTaskCreate( + it.id, HilltopFetchTaskType.SITES_LIST, buildSiteListUrl(it.htsUrl))) } } } diff --git 
a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt index 72212b94..85aaa7f7 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt @@ -37,3 +37,18 @@ fun buildPastMeasurementsUrl( .queryParam("to", month.plusMonths(1).atDay(1).atStartOfDay().minusSeconds(1)) .build() .toASCIIString() + +fun buildLatestMeasurementsUrl( + hilltopUrl: String, + siteId: String, + measurementName: String, +): String = + DefaultUriBuilderFactory() + .uriString(hilltopUrl) + .queryParam("Service", "Hilltop") + .queryParam("Request", "GetData") + .queryParam("Site", siteId) + .queryParam("Measurement", measurementName) + .queryParam("TimeInterval", "P35D/now") + .build() + .toASCIIString() diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt index 64bd5f43..ffe8f312 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt @@ -6,10 +6,16 @@ import java.sql.ResultSet import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneOffset -import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType import org.springframework.jdbc.core.JdbcTemplate import org.springframework.stereotype.Component +enum class HilltopFetchTaskType { + SITES_LIST, + MEASUREMENTS_LIST, + MEASUREMENT_DATA, + MEASUREMENT_DATA_LATEST, +} + @Component class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { @@ -27,14 +33,14 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { data class 
HilltopFetchTaskCreate( val sourceId: Int, - val requestType: HilltopMessageType, + val requestType: HilltopFetchTaskType, val baseUrl: String, ) data class HilltopFetchTaskRow( val id: Int, val sourceId: Int, - val requestType: HilltopMessageType, + val requestType: HilltopFetchTaskType, val nextFetchAt: Instant, val fetchUri: URI, val previousDataHash: String?, @@ -110,7 +116,7 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { HilltopFetchTaskRow( rs.getInt("id"), rs.getInt("source_id"), - HilltopMessageType.valueOf(rs.getString("request_type")), + HilltopFetchTaskType.valueOf(rs.getString("request_type")), rs.getTimestamp("next_fetch_at").toInstant(), URI(rs.getString("fetch_url")), rs.getString("previous_data_hash")) @@ -127,7 +133,7 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { """ UPDATE hilltop_fetch_tasks SET previous_data_hash = ?, - previous_history = previous_history || ?::jsonb, + previous_history = jsonb_path_query_array(previous_history, '$[last-49 to last]') || ?::jsonb, next_fetch_at = ? WHERE id = ? 
""" diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt index 55f31095..f1937d03 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt @@ -12,7 +12,7 @@ import nz.govt.eop.hilltop_crawler.api.parsers.HilltopXmlParsers import nz.govt.eop.hilltop_crawler.db.DB import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchResult import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchStatus.* -import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.* +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @@ -74,27 +74,34 @@ class FetchTaskProcessor( val taskMapper = try { when (taskToProcess.requestType) { - SITES_LIST -> + HilltopFetchTaskType.SITES_LIST -> SitesListTaskMapper( source, taskToProcess.fetchUri, fetchedAt, xmlContent, parsers.parseSitesResponse(xmlContent)) - MEASUREMENTS_LIST -> + HilltopFetchTaskType.MEASUREMENTS_LIST -> MeasurementsListTaskMapper( source, taskToProcess.fetchUri, fetchedAt, xmlContent, parsers.parseMeasurementsResponse(xmlContent)) - MEASUREMENT_DATA -> + HilltopFetchTaskType.MEASUREMENT_DATA -> MeasurementDataTaskMapper( source, taskToProcess.fetchUri, fetchedAt, xmlContent, parsers.parseMeasurementValuesResponse(xmlContent)) + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST -> + MeasurementDataLatestTaskMapper( + source, + taskToProcess.fetchUri, + fetchedAt, + xmlContent, + parsers.parseMeasurementValuesResponse(xmlContent)) } } catch (e: JsonMappingException) { logger.warn(e) { "Failed to parse content [${xmlContent}]" } diff --git 
a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt index 48670dec..deea9202 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt @@ -10,9 +10,11 @@ import kotlin.random.Random import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurementValues import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurements import nz.govt.eop.hilltop_crawler.api.parsers.HilltopSites +import nz.govt.eop.hilltop_crawler.api.requests.buildLatestMeasurementsUrl import nz.govt.eop.hilltop_crawler.api.requests.buildMeasurementListUrl import nz.govt.eop.hilltop_crawler.api.requests.buildPastMeasurementsUrl import nz.govt.eop.hilltop_crawler.db.DB +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType /** * This is an abstract class that represents a mapper for processing a specific type of task. When @@ -26,7 +28,7 @@ import nz.govt.eop.hilltop_crawler.db.DB * do the above per task type. 
*/ abstract class TaskMapper( - val type: HilltopMessageType, + val type: HilltopFetchTaskType, val sourceConfig: DB.HilltopSourcesRow, val fetchedUri: URI, val fetchedAt: Instant, @@ -60,7 +62,7 @@ class SitesListTaskMapper( parsedContent: HilltopSites ) : TaskMapper( - HilltopMessageType.SITES_LIST, + HilltopFetchTaskType.SITES_LIST, sourceConfig, fetchedUri, fetchedAt, @@ -73,7 +75,7 @@ class SitesListTaskMapper( .map { DB.HilltopFetchTaskCreate( sourceConfig.id, - HilltopMessageType.MEASUREMENTS_LIST, + HilltopFetchTaskType.MEASUREMENTS_LIST, buildMeasurementListUrl(baseUri, it.name), ) } @@ -88,7 +90,7 @@ class SitesListTaskMapper( ) override fun determineNextFetchAt(): Instant = - randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS)) + randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS)) } class MeasurementsListTaskMapper( @@ -99,7 +101,7 @@ class MeasurementsListTaskMapper( parsedContent: HilltopMeasurements ) : TaskMapper( - HilltopMessageType.MEASUREMENTS_LIST, + HilltopFetchTaskType.MEASUREMENTS_LIST, sourceConfig, fetchedUri, fetchedAt, @@ -125,7 +127,7 @@ class MeasurementsListTaskMapper( sourceConfig.config.measurementNames.contains(it.name) && it.vm == null } != null } - .flatMap { + .flatMap { it -> val fromDate = LocalDate.parse( it.from.subSequence(0, 10), DateTimeFormatter.ofPattern("yyyy-MM-dd")) @@ -139,13 +141,29 @@ class MeasurementsListTaskMapper( } .requestAs - generateMonthSequence(fromDate, toDate).map { yearMonth -> - DB.HilltopFetchTaskCreate( - sourceConfig.id, - HilltopMessageType.MEASUREMENT_DATA, - buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth), - ) - } + val isLastObservationInCurrentMonth = + YearMonth.from(toDate) == YearMonth.from(LocalDate.now()) + + generateMonthSequence( + fromDate, + if (isLastObservationInCurrentMonth) toDate.minusMonths(1) else toDate) + .map { yearMonth -> + DB.HilltopFetchTaskCreate( + sourceConfig.id, + 
HilltopFetchTaskType.MEASUREMENT_DATA, + buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth)) + } + .let { tasks -> + if (isLastObservationInCurrentMonth) { + tasks.plus( + DB.HilltopFetchTaskCreate( + sourceConfig.id, + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, + buildLatestMeasurementsUrl(baseUri, it.siteName, requestAs))) + } else { + tasks + } + } } override fun buildKafkaMessage(): HilltopMessage = @@ -159,7 +177,7 @@ class MeasurementsListTaskMapper( ) override fun determineNextFetchAt(): Instant = - randomTimeBetween(fetchedAt.plus(1, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS)) + randomTimeBetween(fetchedAt.plus(10, ChronoUnit.DAYS), fetchedAt.plus(20, ChronoUnit.DAYS)) } class MeasurementDataTaskMapper( @@ -170,7 +188,43 @@ class MeasurementDataTaskMapper( parsedContent: HilltopMeasurementValues ) : TaskMapper( - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, + sourceConfig, + fetchedUri, + fetchedAt, + content, + parsedContent) { + override fun buildNewTasksList(): List<DB.HilltopFetchTaskCreate> = emptyList() + + override fun buildKafkaMessage(): HilltopMessage? 
= + if (parsedContent.measurement != null) { + HilltopMeasurementsMessage( + sourceConfig.councilId, + baseUri, + fetchedAt, + parsedContent.measurement.siteName, + parsedContent.measurement.dataSource.measurementName, + parsedContent.measurement.data.values.first().timestamp.let { YearMonth.from(it) }, + fetchedUri.toASCIIString(), + content, + ) + } else { + null + } + + override fun determineNextFetchAt(): Instant = + randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS)) +} + +class MeasurementDataLatestTaskMapper( + sourceConfig: DB.HilltopSourcesRow, + fetchedUri: URI, + fetchedAt: Instant, + content: String, + parsedContent: HilltopMeasurementValues +) : + TaskMapper( + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, sourceConfig, fetchedUri, fetchedAt, @@ -206,7 +260,9 @@ class MeasurementDataTaskMapper( * be called when getting historical data as well) */ override fun determineNextFetchAt(): Instant { + val lastValueAt = parsedContent.measurement?.data?.values?.lastOrNull()?.timestamp?.toInstant() + return if (lastValueAt != null && lastValueAt > fetchedAt.minus(1, ChronoUnit.HOURS)) { randomTimeBetween( maxOf(lastValueAt.plus(15, ChronoUnit.MINUTES), fetchedAt), @@ -215,9 +271,6 @@ class MeasurementDataTaskMapper( randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.HOURS)) } else if (lastValueAt != null && lastValueAt > fetchedAt.minus(7, ChronoUnit.DAYS)) { randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.DAYS)) - // Just approx to a month - } else if (lastValueAt != null && lastValueAt > fetchedAt.minus(28, ChronoUnit.DAYS)) { - randomTimeBetween(fetchedAt, fetchedAt.plus(7, ChronoUnit.DAYS)) } else { // Any older or historical data, will rarely change, so we can fetch it less often. 
randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS)) } diff --git a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt index 301c12f3..fd0cf7ff 100644 --- a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt +++ b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt @@ -10,6 +10,7 @@ import java.net.URI import java.time.Instant import nz.govt.eop.hilltop_crawler.HilltopCrawlerTestConfiguration import nz.govt.eop.hilltop_crawler.db.DB +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.* @@ -279,7 +280,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.SITES_LIST + it.requestType shouldBe HilltopFetchTaskType.SITES_LIST it.previousDataHash shouldBe "0fcafcc9533e521e53cad82226d44c832eca280e75dda23ffe5575b6563995c0" it.nextFetchAt shouldBeAfter Instant.now() @@ -320,7 +321,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.SITES_LIST + it.requestType shouldBe HilltopFetchTaskType.SITES_LIST it.previousDataHash shouldBe "0fcafcc9533e521e53cad82226d44c832eca280e75dda23ffe5575b6563995c0" it.nextFetchAt shouldBeAfter Instant.now() @@ -328,7 +329,7 @@ class FetchTaskProcessorIntegrationTest( // Has created rows for the new sites tasks.forExactly(2) { - it.requestType shouldBe HilltopMessageType.MEASUREMENTS_LIST + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENTS_LIST 
it.nextFetchAt shouldBeBefore Instant.now() } @@ -376,7 +377,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.MEASUREMENTS_LIST + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENTS_LIST it.previousDataHash shouldBe "e58323f66a24dfdc774756f608efba792b55deba4f8c1135c3aefee38f71f404" it.nextFetchAt shouldBeAfter Instant.now() @@ -384,7 +385,7 @@ class FetchTaskProcessorIntegrationTest( // Has created rows for the new sites tasks.forOne { - it.requestType shouldBe HilltopMessageType.MEASUREMENT_DATA + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA it.nextFetchAt shouldBeBefore Instant.now() it.fetchUri shouldBe URI( @@ -436,7 +437,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.MEASUREMENT_DATA + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA it.previousDataHash shouldBe "c1b8652916a235b608818d3ce3efa0dc29517ba115b1de1b6221a487b4e696bc" it.nextFetchAt shouldBeAfter Instant.now() @@ -492,7 +493,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.MEASUREMENT_DATA + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA it.previousDataHash shouldBe "1ddfbd7f9a3d44bca5a6ae05d56f2016eb4d6e4292625fa550102dea10861ce6" it.nextFetchAt shouldBeAfter Instant.now() @@ -501,6 +502,56 @@ class FetchTaskProcessorIntegrationTest( verifyNoInteractions(mockKafka) } + @Test + fun `should correctly process a measurement values latest message`() { + // GIVEN + val sourceId = createDefaultSource(jdbcTemplate) + + createFetchTask( + jdbcTemplate, + sourceId, + "MEASUREMENT_DATA_LATEST", + "http://example.com", + "2021-01-01 
00:00:00", + ) + + val input = + this.javaClass.getResource("/hilltop-xml/MeasurementValuesResponse.xml")!!.readText() + + mockServer + .expect(requestTo("http://example.com")) + .andExpect(method(HttpMethod.GET)) + .andRespond(withSuccess(input, null)) + + // WHEN + val result = underTest.runNextTask() + + // THEN + mockServer.verify() + + result shouldBe true + + val tasks = listTasksToProcess(jdbcTemplate) + tasks shouldHaveSize 1 + + // Has updated the row we just fetched + tasks.forOne { + it.fetchUri shouldBe URI("http://example.com") + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA_LATEST + it.previousDataHash shouldBe + "c1b8652916a235b608818d3ce3efa0dc29517ba115b1de1b6221a487b4e696bc" + it.nextFetchAt shouldBeAfter Instant.now() + } + + argumentCaptor<HilltopMessage>().apply { + verify(mockKafka).send(capture()) + + firstValue.councilId shouldBe 1 + firstValue.type shouldBe HilltopMessageType.MEASUREMENT_DATA + firstValue.hilltopBaseUrl shouldBe "http://example.com" + } + } + @Test fun `should reschedule a task when an unknown error occurs`() { // For this faking that the kafka component threw an exception and just making sure the task @@ -539,7 +590,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.SITES_LIST + it.requestType shouldBe HilltopFetchTaskType.SITES_LIST it.previousDataHash shouldBe null it.nextFetchAt shouldBeAfter Instant.now() } @@ -557,7 +608,7 @@ fun listTasksToProcess(template: JdbcTemplate): List<DB.HilltopFetchTaskRow> = DB.HilltopFetchTaskRow( rs.getInt("id"), rs.getInt("source_id"), - HilltopMessageType.valueOf(rs.getString("request_type")), + HilltopFetchTaskType.valueOf(rs.getString("request_type")), rs.getTimestamp("next_fetch_at").toInstant(), URI(rs.getString("fetch_url")), rs.getString("previous_data_hash")) diff --git 
a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt index 3c809d1a..216d0cd8 100644 --- a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt +++ b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt @@ -5,11 +5,10 @@ import io.kotest.matchers.date.shouldBeAfter import io.kotest.matchers.date.shouldBeBefore import io.kotest.matchers.shouldBe import java.net.URI -import java.time.Instant -import java.time.YearMonth -import java.time.ZoneOffset +import java.time.* import nz.govt.eop.hilltop_crawler.api.parsers.* import nz.govt.eop.hilltop_crawler.db.DB +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.springframework.test.context.ActiveProfiles @@ -57,7 +56,7 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENTS_LIST, + HilltopFetchTaskType.MEASUREMENTS_LIST, "http://some.uri?Service=Hilltop&Request=MeasurementList&Site=Some%20Site", )) } @@ -158,7 +157,7 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", )) } @@ -196,12 +195,12 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, 
"http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=another%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", )) } @@ -230,37 +229,110 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-02-01T00:00&to=2000-02-29T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-03-01T00:00&to=2000-03-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-04-01T00:00&to=2000-04-30T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-05-01T00:00&to=2000-05-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-06-01T00:00&to=2000-06-30T23:59:59", ), ) } + @Test + fun `should return list which excludes the current month and includes the latest measurements when last observation is in the current month`() { 
+ // GIVEN + val currentDate = LocalDate.now() + + val underTest = + createRecordForTesting( + listOf( + HilltopDatasource( + "some datasource name", + "some site name", + "${currentDate.minusMonths(1)}T00:00:00", + "${currentDate}T01:00:00", + "StdSeries", + listOf( + HilltopMeasurement("some datasource name", "some datasource name", 1))), + )) + + // WHEN + val result = underTest.buildNewTasksList() + + // THEN + val previousYearMonth = YearMonth.from(currentDate.minusMonths(1)) + + result shouldHaveSize 2 + result shouldBe + listOf( + DB.HilltopFetchTaskCreate( + 1, + HilltopFetchTaskType.MEASUREMENT_DATA, + "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=${previousYearMonth.atDay(1)}T00:00&to=${previousYearMonth.atEndOfMonth()}T23:59:59", + ), + DB.HilltopFetchTaskCreate( + 1, + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, + "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&TimeInterval=P35D/now", + ), + ) + } + + @Test + fun `should return list with only the task for fetching latest data when only observations are in the current month`() { + // GIVEN + val currentDate = LocalDate.now() + + val underTest = + createRecordForTesting( + listOf( + HilltopDatasource( + "some datasource name", + "some site name", + "${currentDate}T00:00:00", + "${currentDate}T01:00:00", + "StdSeries", + listOf( + HilltopMeasurement("some datasource name", "some datasource name", 1))), + )) + + // WHEN + val result = underTest.buildNewTasksList() + + // THEN + result shouldHaveSize 1 + result shouldBe + listOf( + DB.HilltopFetchTaskCreate( + 1, + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, + "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&TimeInterval=P35D/now", + ), + ) + } + @Test fun `should return list of tasks mapped from measurements for determine next tasks using the requestAs name in the 
URL`() { // GIVEN @@ -285,7 +357,7 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=check%20me%20out%20like%20this&from=2000-01-01T00:00&to=2000-01-31T23:59:59", ), ) @@ -458,6 +530,142 @@ class TaskMappersTests { ) } + @Test + fun `should return next fetch at between 20 and 30 days away`() { + // GIVEN + val underTest = createRecordForTesting("2000-01-01T00:20:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.determineNextFetchAt() + + // THEN + result shouldBeAfter Instant.parse("2000-01-20T00:20:00Z") + result shouldBeBefore Instant.parse("2000-01-30T00:20:00Z") + } + + @Test + fun `should return next fetch at in next month when the last measurement is a long time ago`() { + // GIVEN + val underTest = createRecordForTesting("2020-01-01T00:00:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.determineNextFetchAt() + + // THEN + result shouldBeAfter Instant.parse("2020-01-01T00:00:00Z") + result shouldBeBefore Instant.parse("2020-01-31T00:00:00Z") + } + + @Test + fun `should return next fetch at in next month when there is no measurements`() { + // GIVEN + val fetchedAt = Instant.parse("2000-01-01T00:00:00Z") + + val underTest = + MeasurementDataTaskMapper( + DB.HilltopSourcesRow( + 1, 1, "http://some.url", DB.HilltopSourceConfig(emptyList(), emptyList())), + URI("http://some.uri"), + fetchedAt, + "some content", + HilltopMeasurementValues(null)) + + // WHEN + val result = underTest.determineNextFetchAt() + + // THEN + result shouldBeAfter Instant.parse("2000-01-01T00:00:00Z") + result shouldBeBefore Instant.parse("2000-01-31T00:00:00Z") + } + } + + @Nested + inner class MeasurementDataLatestTaskMapperTests { + + private fun createRecordForTesting( + fetchedAtString: String, + lastValueAtString: String + ): MeasurementDataLatestTaskMapper { + + val 
fetchedAt = Instant.parse(fetchedAtString) + val timestampInPlus12Time = + Instant.parse(lastValueAtString) + .atOffset(ZoneOffset.of("+12")) + .toString() + .substring(0, 16) + + return MeasurementDataLatestTaskMapper( + DB.HilltopSourcesRow( + 1, 1, "http://some.url", DB.HilltopSourceConfig(emptyList(), emptyList())), + URI("http://some.uri?foo=bar"), + fetchedAt, + "some content", + HilltopMeasurementValues( + Measurement( + "some site name", + DataSource("some measurement name"), + Data( + "some name", + listOf( + Value(timestampInPlus12Time, "1.0", null), + ))))) + } + + @Test + fun `should return empty list for determine next tasks`() { + // GIVEN + val underTest = createRecordForTesting("2000-01-01T00:00:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.buildNewTasksList() + + // THEN + result shouldHaveSize 0 + } + + @Test + fun `should return null for Kafka message when there are no measurements`() { + // GIVEN + val fetchedAt = Instant.parse("2000-01-01T00:00:00Z") + + val underTest = + MeasurementDataLatestTaskMapper( + DB.HilltopSourcesRow( + 1, 1, "http://some.url", DB.HilltopSourceConfig(emptyList(), emptyList())), + URI("http://some.uri"), + fetchedAt, + "some content", + HilltopMeasurementValues(null)) + + // WHEN + val result = underTest.buildKafkaMessage() + + // THEN + result shouldBe null + } + + @Test + fun `should return Kafka message when there are measurements`() { + // GIVEN + val underTest = createRecordForTesting("2000-01-01T00:20:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.buildKafkaMessage() + + // THEN + result shouldBe + HilltopMeasurementsMessage( + 1, + "http://some.uri", + Instant.parse("2000-01-01T00:20:00Z"), + "some site name", + "some measurement name", + YearMonth.of(2000, 1), + "http://some.uri?foo=bar", + "some content", + ) + } + @Test fun `should return next fetch at in the next 30 minutes when last value was recent`() { // GIVEN @@ -510,19 +718,6 @@ class TaskMappersTests { result 
shouldBeBefore Instant.parse("2000-01-08T00:00:00Z") } - @Test - fun `should return next fetch at in the next week when last value was within a month`() { - // GIVEN - val underTest = createRecordForTesting("2000-01-27T00:00:00Z", "2000-01-01T00:00:00Z") - - // WHEN - val result = underTest.determineNextFetchAt() - - // THEN - result shouldBeAfter Instant.parse("2000-01-27T00:00:00Z") - result shouldBeBefore Instant.parse("2000-02-03T00:00:00Z") - } - @Test fun `should return next fetch at in next month when the last measurement is a long time ago`() { // GIVEN