Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: useSKTimestamps #46

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 54 additions & 54 deletions src/influx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
* limitations under the License.
*/

import { SKContext } from '@chacal/signalk-ts'
import { Context, Path, PathValue, SourceRef } from '@signalk/server-api'
import { Context, Path, PathValue, SourceRef, Timestamp } from '@signalk/server-api'
import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client'
import { BucketsAPI, DbrpsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis'
import { InfluxDB as InfluxV1 } from 'influx'
Expand Down Expand Up @@ -211,19 +210,18 @@ export class SKInflux {
context: Context,
isSelf: boolean,
sourceRef: SourceRef,
_timestamp: Date,
timestamp: Timestamp | undefined,
pathValue: PathValue,
now: number,
) {
const timestamp = this.useSKTimestamp ? _timestamp : new Date()
if (this.isIgnored(pathValue.path, sourceRef)) {
return
}
if (!this.shouldStoreNow(context, pathValue.path, sourceRef, now)) {
return
}
if (!this.onlySelf || isSelf) {
const point = toPoint(context, isSelf, sourceRef, timestamp, pathValue, this.logging.debug)
const point = this.toPoint(context, isSelf, sourceRef, timestamp, pathValue, this.logging.debug)
if (point) {
this.writeApi.writePoint(point)
this.updateLastWritten(context, pathValue.path, sourceRef, now)
Expand Down Expand Up @@ -272,7 +270,6 @@ export class SKInflux {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
getSelfValues(params: Omit<QueryParams, 'context'>): Promise<Array<any>> {
const query = paramsToQuery(this.bucket, params)
// console.log(query)
return this.queryApi.collectRows(query)
}

Expand Down Expand Up @@ -308,59 +305,62 @@ export class SKInflux {
return false
}
}
}

const toPoint = (
context: SKContext,
isSelf: boolean,
source: string,
timestamp: Date,
pathValue: PathValue,
debug: (s: string) => void,
) => {
const point = new Point(influxPath(pathValue.path)).tag('context', context).tag('source', source).timestamp(timestamp)
if (isSelf) {
point.tag(SELF_TAG_NAME, SELF_TAG_VALUE)
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const value = pathValue.value as any
if (
pathValue.path === 'navigation.position' &&
typeof value === 'object' &&
value !== null &&
value.latitude !== null &&
value.longitude !== null &&
!isNaN(value.latitude) &&
!isNaN(value.longitude)
toPoint(
context: Context,
isSelf: boolean,
source: string,
timestamp: Timestamp | undefined,
pathValue: PathValue,
debug: (s: string) => void,
) {
point.floatField('lat', value.latitude)
point.floatField('lon', value.longitude)
point.tag('s2_cell_id', posToS2CellId(value))
} else {
const valueType = typeFor(pathValue)
if (value === null) {
return
const point = new Point(influxPath(pathValue.path)).tag('context', context).tag('source', source)
if (this.useSKTimestamp) {
point.timestamp(timestamp !== undefined ? new Date(timestamp) : new Date())
}
try {
switch (valueType) {
case JsValueType.number:
point.floatField('value', value)
break
case JsValueType.string:
point.stringField('value', value)
break
case JsValueType.boolean:
point.booleanField('value', value)
break
case JsValueType.object:
point.stringField('value', JSON.stringify(value))
if (isSelf) {
point.tag(SELF_TAG_NAME, SELF_TAG_VALUE)
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const value = pathValue.value as any
if (
pathValue.path === 'navigation.position' &&
typeof value === 'object' &&
value !== null &&
value.latitude !== null &&
value.longitude !== null &&
!isNaN(value.latitude) &&
!isNaN(value.longitude)
) {
point.floatField('lat', value.latitude)
point.floatField('lon', value.longitude)
point.tag('s2_cell_id', posToS2CellId(value))
} else {
const valueType = typeFor(pathValue)
if (value === null) {
return
}
try {
switch (valueType) {
case JsValueType.number:
point.floatField('value', value)
break
case JsValueType.string:
point.stringField('value', value)
break
case JsValueType.boolean:
point.booleanField('value', value)
break
case JsValueType.object:
point.stringField('value', JSON.stringify(value))
}
} catch (e) {
debug(`Error creating point ${pathValue.path}:${pathValue.value} => ${valueType}`)
return undefined
}
} catch (e) {
debug(`Error creating point ${pathValue.path}:${pathValue.value} => ${valueType}`)
return undefined
}
return point
}
return point
}

const typeFor = (pathValue: PathValue): JsValueType => {
Expand All @@ -385,7 +385,7 @@ const paramsToQuery = (bucket: string, params: PartialBy<QueryParams, 'context'>
: `and r.${SELF_TAG_NAME} == "${SELF_TAG_VALUE}"`
return `
from(bucket: "${bucket}")
|> range(start: -1y)
|> range(start: -10y)
|> filter(fn: (r) => r["_measurement"] == "${params.paths[0]}" ${contextTagClause})
`
}
Expand Down
62 changes: 62 additions & 0 deletions src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,68 @@ describe('Plugin', () => {
it('Uses data types from schema, , initial string value', async () =>
assertNumericAfterFirstOtherValue('first string value'))

it('Uses delta timestamp by configuration', async () => {
plugin.stop()
plugin = InfluxPluginFactory(app)
await plugin.start({
influxes: [
{
url: `http://${INFLUX_HOST}:8086`,
token: 'signalk_token',
org: 'signalk_org',
bucket,
onlySelf: false,
writeOptions: {
batchSize: 1,
flushInterval: 10,
maxRetries: 1,
},
ignoredPaths: [],
ignoredSources: [],
useSKTimestamp: true, // <===============
resolution: 0,
},
],
})
const FIXED_TIMESTAMP = '2023-08-17T17:01:00Z'
app.signalk.emit('delta', {
context: TESTCONTEXT,
updates: [
{
$source: TESTSOURCE,
timestamp: new Date(FIXED_TIMESTAMP),
values: [
{
path: TESTPATHNUMERIC,
value: 222,
},
],
},
],
})

return retry(
() =>
plugin.flush().then(() =>
plugin
.getSelfValues({
paths: [TESTPATHNUMERIC],
influxIndex: 0,
})
.then((rows) => {
expect(rows.length).to.equal(1)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((rows as any)[0]._time).to.equal(FIXED_TIMESTAMP)
}),
),
[null],
{
retriesMax: 5,
interval: 50,
},
)
})

const assertNumericAfterFirstOtherValue = (firstValue: string | null) => {
const NUMERICSCHEMAPATH = 'navigation.headingTrue'
;[firstValue, 3.14, null, 'last string value'].forEach((value) =>
Expand Down
3 changes: 1 addition & 2 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
*/

import { SKInflux, SKInfluxConfig } from './influx'
import { SKDelta } from '@chacal/signalk-ts'
import { EventEmitter } from 'stream'
import { registerHistoryApiRoute } from './HistoryAPI'
import { IRouter } from 'express'
import { Context, PathValue, SourceRef } from '@signalk/server-api'
import { Context, Delta, PathValue, SourceRef, ValuesDelta } from '@signalk/server-api'

// eslint-disable-next-line @typescript-eslint/no-var-requires
const packageInfo = require('../package.json')
Expand Down
Loading