diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt index 619c1f08..4420cc5f 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/CheckForNewSourcesTask.kt @@ -3,7 +3,7 @@ package nz.govt.eop.hilltop_crawler import java.util.concurrent.TimeUnit import nz.govt.eop.hilltop_crawler.api.requests.buildSiteListUrl import nz.govt.eop.hilltop_crawler.db.DB -import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.SITES_LIST +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component @@ -26,7 +26,9 @@ class CheckForNewSourcesTask(val db: DB) { val sources = db.listSources() sources.forEach { - db.createFetchTask(DB.HilltopFetchTaskCreate(it.id, SITES_LIST, buildSiteListUrl(it.htsUrl))) + db.createFetchTask( + DB.HilltopFetchTaskCreate( + it.id, HilltopFetchTaskType.SITES_LIST, buildSiteListUrl(it.htsUrl))) } } } diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt index 72212b94..85aaa7f7 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/api/requests/HilltopRequestBuilders.kt @@ -37,3 +37,18 @@ fun buildPastMeasurementsUrl( .queryParam("to", month.plusMonths(1).atDay(1).atStartOfDay().minusSeconds(1)) .build() .toASCIIString() + +fun buildLatestMeasurementsUrl( + hilltopUrl: String, + siteId: String, + measurementName: String, +): String = + DefaultUriBuilderFactory() + .uriString(hilltopUrl) + .queryParam("Service", "Hilltop") + .queryParam("Request", "GetData") + .queryParam("Site", siteId) + .queryParam("Measurement", measurementName) + .queryParam("TimeInterval", "P35D/now") + .build() + .toASCIIString() diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt index 4a836ea4..ffe8f312 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/db/DB.kt @@ -6,10 +6,16 @@ import java.sql.ResultSet import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneOffset -import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType import org.springframework.jdbc.core.JdbcTemplate import org.springframework.stereotype.Component +enum class HilltopFetchTaskType { + SITES_LIST, + MEASUREMENTS_LIST, + MEASUREMENT_DATA, + MEASUREMENT_DATA_LATEST, +} + @Component class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { @@ -27,14 +33,14 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { data class HilltopFetchTaskCreate( val sourceId: Int, - val requestType: HilltopMessageType, + val requestType: HilltopFetchTaskType, val baseUrl: String, ) data class HilltopFetchTaskRow( val id: Int, val sourceId: Int, - val requestType: HilltopMessageType, + val requestType: HilltopFetchTaskType, val nextFetchAt: Instant, val fetchUri: URI, val previousDataHash: String?, @@ -110,7 +116,7 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) { HilltopFetchTaskRow( rs.getInt("id"), rs.getInt("source_id"), - HilltopMessageType.valueOf(rs.getString("request_type")), + HilltopFetchTaskType.valueOf(rs.getString("request_type")), rs.getTimestamp("next_fetch_at").toInstant(), URI(rs.getString("fetch_url")), rs.getString("previous_data_hash")) diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt index 55f31095..f1937d03 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessor.kt @@ -12,7 +12,7 @@ import nz.govt.eop.hilltop_crawler.api.parsers.HilltopXmlParsers import nz.govt.eop.hilltop_crawler.db.DB import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchResult import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchStatus.* -import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.* +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @@ -74,27 +74,34 @@ class FetchTaskProcessor( val taskMapper = try { when (taskToProcess.requestType) { - SITES_LIST -> + HilltopFetchTaskType.SITES_LIST -> SitesListTaskMapper( source, taskToProcess.fetchUri, fetchedAt, xmlContent, parsers.parseSitesResponse(xmlContent)) - MEASUREMENTS_LIST -> + HilltopFetchTaskType.MEASUREMENTS_LIST -> MeasurementsListTaskMapper( source, taskToProcess.fetchUri, fetchedAt, xmlContent, parsers.parseMeasurementsResponse(xmlContent)) - MEASUREMENT_DATA -> + HilltopFetchTaskType.MEASUREMENT_DATA -> MeasurementDataTaskMapper( source, taskToProcess.fetchUri, fetchedAt, xmlContent, parsers.parseMeasurementValuesResponse(xmlContent)) + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST -> + MeasurementDataLatestTaskMapper( + source, + taskToProcess.fetchUri, + fetchedAt, + xmlContent, + parsers.parseMeasurementValuesResponse(xmlContent)) } } catch (e: JsonMappingException) { logger.warn(e) { "Failed to parse content [${xmlContent}]" } diff --git a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt index f4490aaa..fee27a4c 100644 --- a/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt +++ b/packages/HilltopCrawler/src/main/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappers.kt @@ -10,9 +10,11 @@ import kotlin.random.Random import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurementValues import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurements import nz.govt.eop.hilltop_crawler.api.parsers.HilltopSites +import nz.govt.eop.hilltop_crawler.api.requests.buildLatestMeasurementsUrl import nz.govt.eop.hilltop_crawler.api.requests.buildMeasurementListUrl import nz.govt.eop.hilltop_crawler.api.requests.buildPastMeasurementsUrl import nz.govt.eop.hilltop_crawler.db.DB +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType /** * This is an abstract class that represents a mapper for processing a specific type of task. When @@ -26,7 +28,7 @@ import nz.govt.eop.hilltop_crawler.db.DB * do the above per task type. */ abstract class TaskMapper( - val type: HilltopMessageType, + val type: HilltopFetchTaskType, val sourceConfig: DB.HilltopSourcesRow, val fetchedUri: URI, val fetchedAt: Instant, @@ -60,7 +62,7 @@ class SitesListTaskMapper( parsedContent: HilltopSites ) : TaskMapper( - HilltopMessageType.SITES_LIST, + HilltopFetchTaskType.SITES_LIST, sourceConfig, fetchedUri, fetchedAt, @@ -73,7 +75,7 @@ class SitesListTaskMapper( .map { DB.HilltopFetchTaskCreate( sourceConfig.id, - HilltopMessageType.MEASUREMENTS_LIST, + HilltopFetchTaskType.MEASUREMENTS_LIST, buildMeasurementListUrl(baseUri, it.name), ) } @@ -88,7 +90,7 @@ class SitesListTaskMapper( ) override fun determineNextFetchAt(): Instant = - randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS)) + randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS)) } class MeasurementsListTaskMapper( @@ -99,7 +101,7 @@ class MeasurementsListTaskMapper( parsedContent: HilltopMeasurements ) : TaskMapper( - HilltopMessageType.MEASUREMENTS_LIST, + HilltopFetchTaskType.MEASUREMENTS_LIST, sourceConfig, fetchedUri, fetchedAt, @@ -141,17 +143,28 @@ class MeasurementsListTaskMapper( val monthSequence = if (YearMonth.from(toDate).equals(YearMonth.from(LocalDate.now()))) { - generateMonthSequence(fromDate, toDate.plusMonths(1)) + generateMonthSequence(fromDate, toDate.minusMonths(1)) } else { generateMonthSequence(fromDate, toDate) } - monthSequence.map { yearMonth -> - DB.HilltopFetchTaskCreate( - sourceConfig.id, - HilltopMessageType.MEASUREMENT_DATA, - buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth), - ) + val monthlyTasks = + monthSequence.map { yearMonth -> + DB.HilltopFetchTaskCreate( + sourceConfig.id, + HilltopFetchTaskType.MEASUREMENT_DATA, + buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth), + ) + } + + return@flatMap if (YearMonth.from(toDate).equals(YearMonth.from(LocalDate.now()))) { + monthlyTasks.plus( + DB.HilltopFetchTaskCreate( + sourceConfig.id, + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, + buildLatestMeasurementsUrl(baseUri, it.siteName, requestAs))) + } else { + monthlyTasks } } @@ -166,7 +179,7 @@ class MeasurementsListTaskMapper( ) override fun determineNextFetchAt(): Instant = - randomTimeBetween(fetchedAt.plus(1, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS)) + randomTimeBetween(fetchedAt.plus(10, ChronoUnit.DAYS), fetchedAt.plus(20, ChronoUnit.DAYS)) } class MeasurementDataTaskMapper( @@ -177,7 +190,43 @@ class MeasurementDataTaskMapper( parsedContent: HilltopMeasurementValues ) : TaskMapper( - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, + sourceConfig, + fetchedUri, + fetchedAt, + content, + parsedContent) { + override fun buildNewTasksList(): List = emptyList() + + override fun buildKafkaMessage(): HilltopMessage? = + if (parsedContent.measurement != null) { + HilltopMeasurementsMessage( + sourceConfig.councilId, + baseUri, + fetchedAt, + parsedContent.measurement.siteName, + parsedContent.measurement.dataSource.measurementName, + parsedContent.measurement.data.values.first().timestamp.let { YearMonth.from(it) }, + fetchedUri.toASCIIString(), + content, + ) + } else { + null + } + + override fun determineNextFetchAt(): Instant = + randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS)) +} + +class MeasurementDataLatestTaskMapper( + sourceConfig: DB.HilltopSourcesRow, + fetchedUri: URI, + fetchedAt: Instant, + content: String, + parsedContent: HilltopMeasurementValues +) : + TaskMapper( + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, sourceConfig, fetchedUri, fetchedAt, @@ -213,7 +262,9 @@ class MeasurementDataTaskMapper( * be called when getting historical data as well) */ override fun determineNextFetchAt(): Instant { + val lastValueAt = parsedContent.measurement?.data?.values?.lastOrNull()?.timestamp?.toInstant() + return if (lastValueAt != null && lastValueAt > fetchedAt.minus(1, ChronoUnit.HOURS)) { randomTimeBetween( maxOf(lastValueAt.plus(15, ChronoUnit.MINUTES), fetchedAt), @@ -222,9 +273,6 @@ class MeasurementDataTaskMapper( randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.HOURS)) } else if (lastValueAt != null && lastValueAt > fetchedAt.minus(7, ChronoUnit.DAYS)) { randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.DAYS)) - // Just approx to a month - } else if (lastValueAt != null && lastValueAt > fetchedAt.minus(28, ChronoUnit.DAYS)) { - randomTimeBetween(fetchedAt, fetchedAt.plus(7, ChronoUnit.DAYS)) } else { // Any older or historical data, will rarely change, so we can fetch it less often. randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS)) } diff --git a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt index 301c12f3..fd0cf7ff 100644 --- a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt +++ b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/FetchTaskProcessorIntegrationTest.kt @@ -10,6 +10,7 @@ import java.net.URI import java.time.Instant import nz.govt.eop.hilltop_crawler.HilltopCrawlerTestConfiguration import nz.govt.eop.hilltop_crawler.db.DB +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.* @@ -279,7 +280,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.SITES_LIST + it.requestType shouldBe HilltopFetchTaskType.SITES_LIST it.previousDataHash shouldBe "0fcafcc9533e521e53cad82226d44c832eca280e75dda23ffe5575b6563995c0" it.nextFetchAt shouldBeAfter Instant.now() @@ -320,7 +321,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.SITES_LIST + it.requestType shouldBe HilltopFetchTaskType.SITES_LIST it.previousDataHash shouldBe "0fcafcc9533e521e53cad82226d44c832eca280e75dda23ffe5575b6563995c0" it.nextFetchAt shouldBeAfter Instant.now() @@ -328,7 +329,7 @@ class FetchTaskProcessorIntegrationTest( // Has created rows for the new sites tasks.forExactly(2) { - it.requestType shouldBe HilltopMessageType.MEASUREMENTS_LIST + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENTS_LIST it.nextFetchAt shouldBeBefore Instant.now() } @@ -376,7 +377,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.MEASUREMENTS_LIST + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENTS_LIST it.previousDataHash shouldBe "e58323f66a24dfdc774756f608efba792b55deba4f8c1135c3aefee38f71f404" it.nextFetchAt shouldBeAfter Instant.now() @@ -384,7 +385,7 @@ class FetchTaskProcessorIntegrationTest( // Has created rows for the new sites tasks.forOne { - it.requestType shouldBe HilltopMessageType.MEASUREMENT_DATA + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA it.nextFetchAt shouldBeBefore Instant.now() it.fetchUri shouldBe URI( @@ -436,7 +437,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.MEASUREMENT_DATA + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA it.previousDataHash shouldBe "c1b8652916a235b608818d3ce3efa0dc29517ba115b1de1b6221a487b4e696bc" it.nextFetchAt shouldBeAfter Instant.now() @@ -492,7 +493,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.MEASUREMENT_DATA + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA it.previousDataHash shouldBe "1ddfbd7f9a3d44bca5a6ae05d56f2016eb4d6e4292625fa550102dea10861ce6" it.nextFetchAt shouldBeAfter Instant.now() @@ -501,6 +502,56 @@ class FetchTaskProcessorIntegrationTest( verifyNoInteractions(mockKafka) } + @Test + fun `should correctly process a measurement values latest message`() { + // GIVEN + val sourceId = createDefaultSource(jdbcTemplate) + + createFetchTask( + jdbcTemplate, + sourceId, + "MEASUREMENT_DATA_LATEST", + "http://example.com", + "2021-01-01 00:00:00", + ) + + val input = + this.javaClass.getResource("/hilltop-xml/MeasurementValuesResponse.xml")!!.readText() + + mockServer + .expect(requestTo("http://example.com")) + .andExpect(method(HttpMethod.GET)) + .andRespond(withSuccess(input, null)) + + // WHEN + val result = underTest.runNextTask() + + // THEN + mockServer.verify() + + result shouldBe true + + val tasks = listTasksToProcess(jdbcTemplate) + tasks shouldHaveSize 1 + + // Has updated the row we just fetched + tasks.forOne { + it.fetchUri shouldBe URI("http://example.com") + it.requestType shouldBe HilltopFetchTaskType.MEASUREMENT_DATA_LATEST + it.previousDataHash shouldBe + "c1b8652916a235b608818d3ce3efa0dc29517ba115b1de1b6221a487b4e696bc" + it.nextFetchAt shouldBeAfter Instant.now() + } + + argumentCaptor().apply { + verify(mockKafka).send(capture()) + + firstValue.councilId shouldBe 1 + firstValue.type shouldBe HilltopMessageType.MEASUREMENT_DATA + firstValue.hilltopBaseUrl shouldBe "http://example.com" + } + } + @Test fun `should reschedule a task when an unknown error occurs`() { // For this faking that the kafka component threw an exception and just making sure the task @@ -539,7 +590,7 @@ class FetchTaskProcessorIntegrationTest( // Has updated the row we just fetched tasks.forOne { it.fetchUri shouldBe URI("http://example.com") - it.requestType shouldBe HilltopMessageType.SITES_LIST + it.requestType shouldBe HilltopFetchTaskType.SITES_LIST it.previousDataHash shouldBe null it.nextFetchAt shouldBeAfter Instant.now() } @@ -557,7 +608,7 @@ fun listTasksToProcess(template: JdbcTemplate): List = DB.HilltopFetchTaskRow( rs.getInt("id"), rs.getInt("source_id"), - HilltopMessageType.valueOf(rs.getString("request_type")), + HilltopFetchTaskType.valueOf(rs.getString("request_type")), rs.getTimestamp("next_fetch_at").toInstant(), URI(rs.getString("fetch_url")), rs.getString("previous_data_hash")) diff --git a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt index 6d70287a..216d0cd8 100644 --- a/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt +++ b/packages/HilltopCrawler/src/test/kotlin/nz/govt/eop/hilltop_crawler/fetcher/TaskMappersTests.kt @@ -5,12 +5,10 @@ import io.kotest.matchers.date.shouldBeAfter import io.kotest.matchers.date.shouldBeBefore import io.kotest.matchers.shouldBe import java.net.URI -import java.time.Instant -import java.time.LocalDate -import java.time.YearMonth -import java.time.ZoneOffset +import java.time.* import nz.govt.eop.hilltop_crawler.api.parsers.* import nz.govt.eop.hilltop_crawler.db.DB +import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.springframework.test.context.ActiveProfiles @@ -58,7 +56,7 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENTS_LIST, + HilltopFetchTaskType.MEASUREMENTS_LIST, "http://some.uri?Service=Hilltop&Request=MeasurementList&Site=Some%20Site", )) } @@ -159,7 +157,7 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", )) } @@ -197,12 +195,12 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=another%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", )) } @@ -231,39 +229,39 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-01-01T00:00&to=2000-01-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-02-01T00:00&to=2000-02-29T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-03-01T00:00&to=2000-03-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-04-01T00:00&to=2000-04-30T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-05-01T00:00&to=2000-05-31T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=2000-06-01T00:00&to=2000-06-30T23:59:59", ), ) } @Test - fun `should return list which includes one month into the future when last observation is in the current month`() { + fun `should return list which excludes the current month and includes the latest measurements when last observation is in the current month`() { // GIVEN val currentDate = LocalDate.now() @@ -273,7 +271,7 @@ class TaskMappersTests { HilltopDatasource( "some datasource name", "some site name", - "${currentDate}T00:00:00", + "${currentDate.minusMonths(1)}T00:00:00", "${currentDate}T01:00:00", "StdSeries", listOf( @@ -284,20 +282,53 @@ class TaskMappersTests { val result = underTest.buildNewTasksList() // THEN - val currentYearMonth = YearMonth.from(currentDate) - val nextYearMonth = currentYearMonth.plusMonths(1) + val previousYearMonth = YearMonth.from(currentDate.minusMonths(1)) + result shouldHaveSize 2 result shouldBe listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, - "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=${currentYearMonth.atDay(1)}T00:00&to=${currentYearMonth.atEndOfMonth()}T23:59:59", + HilltopFetchTaskType.MEASUREMENT_DATA, + "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=${previousYearMonth.atDay(1)}T00:00&to=${previousYearMonth.atEndOfMonth()}T23:59:59", ), DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, - "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&from=${nextYearMonth.atDay(1)}T00:00&to=${nextYearMonth.atEndOfMonth()}T23:59:59", + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, + "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&TimeInterval=P35D/now", + ), + ) + } + + @Test + fun `should return list with only the task for fetching latest data when only observations are in the current month`() { + // GIVEN + val currentDate = LocalDate.now() + + val underTest = + createRecordForTesting( + listOf( + HilltopDatasource( + "some datasource name", + "some site name", + "${currentDate}T00:00:00", + "${currentDate}T01:00:00", + "StdSeries", + listOf( + HilltopMeasurement("some datasource name", "some datasource name", 1))), + )) + + // WHEN + val result = underTest.buildNewTasksList() + + // THEN + result shouldHaveSize 1 + result shouldBe + listOf( + DB.HilltopFetchTaskCreate( + 1, + HilltopFetchTaskType.MEASUREMENT_DATA_LATEST, + "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=some%20datasource%20name&TimeInterval=P35D/now", ), ) } @@ -326,7 +357,7 @@ class TaskMappersTests { listOf( DB.HilltopFetchTaskCreate( 1, - HilltopMessageType.MEASUREMENT_DATA, + HilltopFetchTaskType.MEASUREMENT_DATA, "http://some.uri?Service=Hilltop&Request=GetData&Site=some%20site%20name&Measurement=check%20me%20out%20like%20this&from=2000-01-01T00:00&to=2000-01-31T23:59:59", ), ) @@ -499,6 +530,142 @@ class TaskMappersTests { ) } + @Test + fun `should return next fetch at between 20 and 30 days away`() { + // GIVEN + val underTest = createRecordForTesting("2000-01-01T00:20:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.determineNextFetchAt() + + // THEN + result shouldBeAfter Instant.parse("2000-01-20T00:20:00Z") + result shouldBeBefore Instant.parse("2000-01-30T00:20:00Z") + } + + @Test + fun `should return next fetch at in next month when the last measurement is a long time ago`() { + // GIVEN + val underTest = createRecordForTesting("2020-01-01T00:00:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.determineNextFetchAt() + + // THEN + result shouldBeAfter Instant.parse("2020-01-01T00:00:00Z") + result shouldBeBefore Instant.parse("2020-01-31T00:00:00Z") + } + + @Test + fun `should return next fetch at in next month when there is no measurements`() { + // GIVEN + val fetchedAt = Instant.parse("2000-01-01T00:00:00Z") + + val underTest = + MeasurementDataTaskMapper( + DB.HilltopSourcesRow( + 1, 1, "http://some.url", DB.HilltopSourceConfig(emptyList(), emptyList())), + URI("http://some.uri"), + fetchedAt, + "some content", + HilltopMeasurementValues(null)) + + // WHEN + val result = underTest.determineNextFetchAt() + + // THEN + result shouldBeAfter Instant.parse("2000-01-01T00:00:00Z") + result shouldBeBefore Instant.parse("2000-01-31T00:00:00Z") + } + } + + @Nested + inner class MeasurementDataLatestTaskMapperTests { + + private fun createRecordForTesting( + fetchedAtString: String, + lastValueAtString: String + ): MeasurementDataLatestTaskMapper { + + val fetchedAt = Instant.parse(fetchedAtString) + val timestampInPlus12Time = + Instant.parse(lastValueAtString) + .atOffset(ZoneOffset.of("+12")) + .toString() + .substring(0, 16) + + return MeasurementDataLatestTaskMapper( + DB.HilltopSourcesRow( + 1, 1, "http://some.url", DB.HilltopSourceConfig(emptyList(), emptyList())), + URI("http://some.uri?foo=bar"), + fetchedAt, + "some content", + HilltopMeasurementValues( + Measurement( + "some site name", + DataSource("some measurement name"), + Data( + "some name", + listOf( + Value(timestampInPlus12Time, "1.0", null), + ))))) + } + + @Test + fun `should return empty list for determine next tasks`() { + // GIVEN + val underTest = createRecordForTesting("2000-01-01T00:00:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.buildNewTasksList() + + // THEN + result shouldHaveSize 0 + } + + @Test + fun `should return null for Kafka message when there are no measurements`() { + // GIVEN + val fetchedAt = Instant.parse("2000-01-01T00:00:00Z") + + val underTest = + MeasurementDataTaskMapper( + DB.HilltopSourcesRow( + 1, 1, "http://some.url", DB.HilltopSourceConfig(emptyList(), emptyList())), + URI("http://some.uri"), + fetchedAt, + "some content", + HilltopMeasurementValues(null)) + + // WHEN + val result = underTest.buildKafkaMessage() + + // THEN + result shouldBe null + } + + @Test + fun `should return Kafka message when there are measurements`() { + // GIVEN + val underTest = createRecordForTesting("2000-01-01T00:20:00Z", "2000-01-01T00:00:00Z") + + // WHEN + val result = underTest.buildKafkaMessage() + + // THEN + result shouldBe + HilltopMeasurementsMessage( + 1, + "http://some.uri", + Instant.parse("2000-01-01T00:20:00Z"), + "some site name", + "some measurement name", + YearMonth.of(2000, 1), + "http://some.uri?foo=bar", + "some content", + ) + } + @Test fun `should return next fetch at in the next 30 minutes when last value was recent`() { // GIVEN @@ -551,19 +718,6 @@ class TaskMappersTests { result shouldBeBefore Instant.parse("2000-01-08T00:00:00Z") } - @Test - fun `should return next fetch at in the next week when last value was within a month`() { - // GIVEN - val underTest = createRecordForTesting("2000-01-27T00:00:00Z", "2000-01-01T00:00:00Z") - - // WHEN - val result = underTest.determineNextFetchAt() - - // THEN - result shouldBeAfter Instant.parse("2000-01-27T00:00:00Z") - result shouldBeBefore Instant.parse("2000-02-03T00:00:00Z") - } - @Test fun `should return next fetch at in next month when the last measurement is a long time ago`() { // GIVEN