Skip to content

Commit

Permalink
Add new fetch task that is constantly refreshing the latest data
Browse files Browse the repository at this point in the history
  • Loading branch information
smozely committed Nov 6, 2023
1 parent 0597b8d commit 11d048a
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package nz.govt.eop.hilltop_crawler
import java.util.concurrent.TimeUnit
import nz.govt.eop.hilltop_crawler.api.requests.buildSiteListUrl
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.SITES_LIST
import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
Expand All @@ -26,7 +26,9 @@ class CheckForNewSourcesTask(val db: DB) {
val sources = db.listSources()

sources.forEach {
db.createFetchTask(DB.HilltopFetchTaskCreate(it.id, SITES_LIST, buildSiteListUrl(it.htsUrl)))
db.createFetchTask(
DB.HilltopFetchTaskCreate(
it.id, HilltopFetchTaskType.SITES_LIST, buildSiteListUrl(it.htsUrl)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,18 @@ fun buildPastMeasurementsUrl(
.queryParam("to", month.plusMonths(1).atDay(1).atStartOfDay().minusSeconds(1))
.build()
.toASCIIString()

fun buildLatestMeasurementsUrl(
hilltopUrl: String,
siteId: String,
measurementName: String,
): String =
DefaultUriBuilderFactory()
.uriString(hilltopUrl)
.queryParam("Service", "Hilltop")
.queryParam("Request", "GetData")
.queryParam("Site", siteId)
.queryParam("Measurement", measurementName)
.queryParam("TimeInterval", "P35D/now")
.build()
.toASCIIString()
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ import java.sql.ResultSet
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.stereotype.Component

enum class HilltopFetchTaskType {
SITES_LIST,
MEASUREMENTS_LIST,
MEASUREMENT_DATA,
MEASUREMENT_DATA_LATEST,
}

@Component
class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) {

Expand All @@ -27,14 +33,14 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) {

data class HilltopFetchTaskCreate(
val sourceId: Int,
val requestType: HilltopMessageType,
val requestType: HilltopFetchTaskType,
val baseUrl: String,
)

data class HilltopFetchTaskRow(
val id: Int,
val sourceId: Int,
val requestType: HilltopMessageType,
val requestType: HilltopFetchTaskType,
val nextFetchAt: Instant,
val fetchUri: URI,
val previousDataHash: String?,
Expand Down Expand Up @@ -110,7 +116,7 @@ class DB(val template: JdbcTemplate, val objectMapper: ObjectMapper) {
HilltopFetchTaskRow(
rs.getInt("id"),
rs.getInt("source_id"),
HilltopMessageType.valueOf(rs.getString("request_type")),
HilltopFetchTaskType.valueOf(rs.getString("request_type")),
rs.getTimestamp("next_fetch_at").toInstant(),
URI(rs.getString("fetch_url")),
rs.getString("previous_data_hash"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import nz.govt.eop.hilltop_crawler.api.parsers.HilltopXmlParsers
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchResult
import nz.govt.eop.hilltop_crawler.db.DB.HilltopFetchStatus.*
import nz.govt.eop.hilltop_crawler.fetcher.HilltopMessageType.*
import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

Expand Down Expand Up @@ -74,27 +74,34 @@ class FetchTaskProcessor(
val taskMapper =
try {
when (taskToProcess.requestType) {
SITES_LIST ->
HilltopFetchTaskType.SITES_LIST ->
SitesListTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseSitesResponse(xmlContent))
MEASUREMENTS_LIST ->
HilltopFetchTaskType.MEASUREMENTS_LIST ->
MeasurementsListTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseMeasurementsResponse(xmlContent))
MEASUREMENT_DATA ->
HilltopFetchTaskType.MEASUREMENT_DATA ->
MeasurementDataTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseMeasurementValuesResponse(xmlContent))
HilltopFetchTaskType.MEASUREMENT_DATA_LATEST ->
MeasurementDataLatestTaskMapper(
source,
taskToProcess.fetchUri,
fetchedAt,
xmlContent,
parsers.parseMeasurementValuesResponse(xmlContent))
}
} catch (e: JsonMappingException) {
logger.warn(e) { "Failed to parse content [${xmlContent}]" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import kotlin.random.Random
import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurementValues
import nz.govt.eop.hilltop_crawler.api.parsers.HilltopMeasurements
import nz.govt.eop.hilltop_crawler.api.parsers.HilltopSites
import nz.govt.eop.hilltop_crawler.api.requests.buildLatestMeasurementsUrl
import nz.govt.eop.hilltop_crawler.api.requests.buildMeasurementListUrl
import nz.govt.eop.hilltop_crawler.api.requests.buildPastMeasurementsUrl
import nz.govt.eop.hilltop_crawler.db.DB
import nz.govt.eop.hilltop_crawler.db.HilltopFetchTaskType

/**
* This is an abstract class that represents a mapper for processing a specific type of task. When
Expand All @@ -26,7 +28,7 @@ import nz.govt.eop.hilltop_crawler.db.DB
* do the above per task type.
*/
abstract class TaskMapper<T>(
val type: HilltopMessageType,
val type: HilltopFetchTaskType,
val sourceConfig: DB.HilltopSourcesRow,
val fetchedUri: URI,
val fetchedAt: Instant,
Expand Down Expand Up @@ -60,7 +62,7 @@ class SitesListTaskMapper(
parsedContent: HilltopSites
) :
TaskMapper<HilltopSites>(
HilltopMessageType.SITES_LIST,
HilltopFetchTaskType.SITES_LIST,
sourceConfig,
fetchedUri,
fetchedAt,
Expand All @@ -73,7 +75,7 @@ class SitesListTaskMapper(
.map {
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopMessageType.MEASUREMENTS_LIST,
HilltopFetchTaskType.MEASUREMENTS_LIST,
buildMeasurementListUrl(baseUri, it.name),
)
}
Expand All @@ -88,7 +90,7 @@ class SitesListTaskMapper(
)

override fun determineNextFetchAt(): Instant =
randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS))
randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS))
}

class MeasurementsListTaskMapper(
Expand All @@ -99,7 +101,7 @@ class MeasurementsListTaskMapper(
parsedContent: HilltopMeasurements
) :
TaskMapper<HilltopMeasurements>(
HilltopMessageType.MEASUREMENTS_LIST,
HilltopFetchTaskType.MEASUREMENTS_LIST,
sourceConfig,
fetchedUri,
fetchedAt,
Expand Down Expand Up @@ -141,17 +143,28 @@ class MeasurementsListTaskMapper(

val monthSequence =
if (YearMonth.from(toDate).equals(YearMonth.from(LocalDate.now()))) {
generateMonthSequence(fromDate, toDate.plusMonths(1))
generateMonthSequence(fromDate, toDate.minusMonths(1))
} else {
generateMonthSequence(fromDate, toDate)
}

monthSequence.map { yearMonth ->
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopMessageType.MEASUREMENT_DATA,
buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth),
)
val monthlyTasks =
monthSequence.map { yearMonth ->
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopFetchTaskType.MEASUREMENT_DATA,
buildPastMeasurementsUrl(baseUri, it.siteName, requestAs, yearMonth),
)
}

return@flatMap if (YearMonth.from(toDate).equals(YearMonth.from(LocalDate.now()))) {
monthlyTasks.plus(
DB.HilltopFetchTaskCreate(
sourceConfig.id,
HilltopFetchTaskType.MEASUREMENT_DATA_LATEST,
buildLatestMeasurementsUrl(baseUri, it.siteName, requestAs)))
} else {
monthlyTasks
}
}

Expand All @@ -166,7 +179,7 @@ class MeasurementsListTaskMapper(
)

override fun determineNextFetchAt(): Instant =
randomTimeBetween(fetchedAt.plus(1, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS))
randomTimeBetween(fetchedAt.plus(10, ChronoUnit.DAYS), fetchedAt.plus(20, ChronoUnit.DAYS))
}

class MeasurementDataTaskMapper(
Expand All @@ -177,7 +190,43 @@ class MeasurementDataTaskMapper(
parsedContent: HilltopMeasurementValues
) :
TaskMapper<HilltopMeasurementValues>(
HilltopMessageType.MEASUREMENT_DATA,
HilltopFetchTaskType.MEASUREMENT_DATA,
sourceConfig,
fetchedUri,
fetchedAt,
content,
parsedContent) {
override fun buildNewTasksList(): List<DB.HilltopFetchTaskCreate> = emptyList()

override fun buildKafkaMessage(): HilltopMessage? =
if (parsedContent.measurement != null) {
HilltopMeasurementsMessage(
sourceConfig.councilId,
baseUri,
fetchedAt,
parsedContent.measurement.siteName,
parsedContent.measurement.dataSource.measurementName,
parsedContent.measurement.data.values.first().timestamp.let { YearMonth.from(it) },
fetchedUri.toASCIIString(),
content,
)
} else {
null
}

override fun determineNextFetchAt(): Instant =
randomTimeBetween(fetchedAt.plus(20, ChronoUnit.DAYS), fetchedAt.plus(30, ChronoUnit.DAYS))
}

class MeasurementDataLatestTaskMapper(
sourceConfig: DB.HilltopSourcesRow,
fetchedUri: URI,
fetchedAt: Instant,
content: String,
parsedContent: HilltopMeasurementValues
) :
TaskMapper<HilltopMeasurementValues>(
HilltopFetchTaskType.MEASUREMENT_DATA_LATEST,
sourceConfig,
fetchedUri,
fetchedAt,
Expand Down Expand Up @@ -213,7 +262,9 @@ class MeasurementDataTaskMapper(
* be called when getting historical data as well)
*/
override fun determineNextFetchAt(): Instant {

val lastValueAt = parsedContent.measurement?.data?.values?.lastOrNull()?.timestamp?.toInstant()

return if (lastValueAt != null && lastValueAt > fetchedAt.minus(1, ChronoUnit.HOURS)) {
randomTimeBetween(
maxOf(lastValueAt.plus(15, ChronoUnit.MINUTES), fetchedAt),
Expand All @@ -222,9 +273,6 @@ class MeasurementDataTaskMapper(
randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.HOURS))
} else if (lastValueAt != null && lastValueAt > fetchedAt.minus(7, ChronoUnit.DAYS)) {
randomTimeBetween(fetchedAt, fetchedAt.plus(1, ChronoUnit.DAYS))
// Just approx to a month
} else if (lastValueAt != null && lastValueAt > fetchedAt.minus(28, ChronoUnit.DAYS)) {
randomTimeBetween(fetchedAt, fetchedAt.plus(7, ChronoUnit.DAYS))
} else { // Any older or historical data, will rarely change, so we can fetch it less often.
randomTimeBetween(fetchedAt, fetchedAt.plus(30, ChronoUnit.DAYS))
}
Expand Down
Loading

0 comments on commit 11d048a

Please sign in to comment.