Adding a new Sensor or Device
This page explains how to add an implementation for a new Sensor/Device type to the RADAR-IoT framework. Please read the README of the framework first to familiarise yourself with the concepts discussed here.
This can be divided into 3 simple steps:

1. Define the schema of the sensor output.
2. Create an implementation of `Sensor` to retrieve the data from the sensor.
3. Create a Converter so that the data can be consumed and used by the `DataConsumers`.
For this example, we will create a mock sensor so that we can test it easily in our local IDEs. The mock sensor will contain a `value` attribute and a `time` attribute. The value will be a random number to simulate a sensor reading, and the time will be the time when the value was generated.
A schema is required for strong typing and data validation. The schema can be added according to the type of schema retriever that you are using. For this example, we will use the `GithubAvroSchemaRetriever`, so you just need to add the schema to a repository on GitHub and it will be read by the framework. We have added ours in the RADAR-Schemas repo in the `sensors` branch. After this, our config for the schema retriever block will look like:
```yaml
schema_retriever:
  module: 'commons.schema'
  class: 'GithubAvroSchemaRetriever'
  args:
    repo_owner: 'RADAR-base'
    repo_name: 'RADAR-Schemas'
    branch: 'sensors'
    basepath: 'commons/iot/sensor'
    extension: '.avsc'
    token: '*******'
```
Note: You can get a Personal Access Token from GitHub to increase the API rate limits and add it in the `token` key above.
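For orientation, a minimal Avro schema for the mock sensor could look like the following. This is only illustrative: the actual schema we added to the `sensors` branch of RADAR-Schemas may differ in naming, namespace, and documentation fields.

```json
{
  "namespace": "org.radarbase.iot.sensor",
  "type": "record",
  "name": "MockSensor",
  "doc": "Mock sensor data for local testing.",
  "fields": [
    {"name": "time", "type": "double", "doc": "Epoch time in seconds when the value was generated."},
    {"name": "value", "type": "double", "doc": "Randomly generated mock sensor value."}
  ]
}
```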
We will now create the actual implementation that gets the data from the sensor. For this, we need to extend the `Sensor` abstract class and implement the `get_measurement` function. For our mock sensor, this will look like:
`mock_sensor.py`:

```python
import random
from datetime import datetime

from commons.data import Response, IoTError, ErrorCode
from sensors import Sensor


class MockSensor(Sensor):
    def __init__(self, name, topic, poll_freq_ms, flush_size, flush_after_s):
        super().__init__(name, topic, poll_freq_ms, flush_size, flush_after_s)
        self.global_counter = 0

    def get_measurement(self):
        self.global_counter += 1
        if self.global_counter % 10 == 0:
            return Response(response=None, errors=[
                IoTError('MockError', ErrorCode.STATUS_OFF,
                         'The MockSensor mocks an error every 10 iterations',
                         'blah->nooooo->save me->dead')])
        else:
            return Response({'time': datetime.now().timestamp(),
                             'value': random.random() * 1000}, errors=None)
```
A few things to note from the above implementation:

- The `MockSensor` class must extend the `Sensor` class or a subclass of `Sensor`.
- The `__init__` method (or constructor) must call the superclass (`Sensor`) constructor with all the arguments, i.e. `super().__init__(name, topic, poll_freq_ms, flush_size, flush_after_s)`.
- We also keep a `global_counter` which counts the number of iterations and sends an error response every 10th iteration.
- The `get_measurement` function returns a `Response` object from the `commons.data` module. It can contain a dictionary for the actual sensor response and a list of `IoTError` objects.
- `IoTError` is a custom error class created specifically for any errors that occur during the normal functioning of the framework.
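To see the error-every-10th-iteration behaviour without installing the framework, here is a minimal, self-contained sketch. The `Response` class below is a hypothetical stand-in for the real one in `commons.data`, and the error payload is simplified to a plain string:

```python
import random
from datetime import datetime


class Response:
    """Hypothetical stand-in for commons.data.Response, so this runs standalone."""
    def __init__(self, response=None, errors=None):
        self.response = response
        self.errors = errors


class MockSensor:
    def __init__(self):
        self.global_counter = 0

    def get_measurement(self):
        # Every 10th call reports an error; otherwise a random reading.
        self.global_counter += 1
        if self.global_counter % 10 == 0:
            return Response(response=None, errors=['MockError'])
        return Response(response={'time': datetime.now().timestamp(),
                                  'value': random.random() * 1000})


sensor = MockSensor()
results = [sensor.get_measurement() for _ in range(10)]
# The first nine results carry data; only the 10th carries an error.
```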
After the implementation, you can now add the sensor to the config and start collecting data:

```yaml
- name: "mock_sensor"
  # Name of your python module which contains the sensor class
  module: "test.mock"
  # Name of the class of the sensor in the module
  class: "MockSensor"
  # topic/channel to publish the data to in pub/sub paradigm
  publishing_topic: "data-stream/sensors/mock"
  # polling frequency in milliseconds
  poll_frequency_ms: 1000
  # Flush size for flushing the records
  flush_size: 10
  # Flush after [value] seconds if the flush size is not reached
  flush_after_s: 1000
```
Note: The name of the sensor must match the name of the schema file you created in step 1.
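The `flush_size` and `flush_after_s` settings determine when buffered measurements are published. The following is a hedged sketch of that policy; the `FlushBuffer` class and its method names are hypothetical, not the framework's actual code:

```python
import time


class FlushBuffer:
    """Hypothetical sketch: flush when flush_size records accumulate,
    or when flush_after_s seconds have passed since the last flush."""

    def __init__(self, flush_size=10, flush_after_s=1000, now=time.monotonic):
        self.flush_size = flush_size
        self.flush_after_s = flush_after_s
        self.now = now
        self.records = []
        self.last_flush = now()

    def add(self, record):
        # Returns the flushed batch when a flush is triggered, else None.
        self.records.append(record)
        if (len(self.records) >= self.flush_size
                or self.now() - self.last_flush >= self.flush_after_s):
            return self.flush()
        return None

    def flush(self):
        batch, self.records = self.records, []
        self.last_flush = self.now()
        return batch


buf = FlushBuffer(flush_size=3, flush_after_s=1000)
batches = [buf.add(r) for r in ('r1', 'r2', 'r3')]
# The third add triggers a size-based flush.
```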
The prerequisite for this step is having Docker and docker-compose installed on your system. For installation instructions, please take a look at the official docs for Docker and docker-compose.
The next intermediate step is to check whether we are in fact able to send data to the pub/sub system. For this, first start Redis using Docker by running the following in the project's root directory:

```shell
docker-compose -f docker/redis.yml up -d
```
After this, run the application using:

```shell
python3 main.py
```

Note: Please make sure you have Python 3.7+ installed on your system or virtual environment.
Now, you can see the data in Redis by running:

```shell
docker exec -it docker_redis_1 redis-cli subscribe data-stream/sensors/mock
```
You should see the data being published to the mock channel. Batches arrive every 10 messages or every 1000 seconds, as specified by `flush_size` and `flush_after_s` in the sensor config. Now we need to consume this data and do something with it, so we will add a converter that reads the data in the data consumer, uploads it to InfluxDB, and displays it on Grafana dashboards.
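For reference, each pub/sub message carries a serialised batch of measurements. The sketch below is a hedged illustration of what such a message might look like, assuming plain JSON serialisation (which is what a JSON message parser on the consumer side would expect); the exact wire format is defined by the framework and your schema:

```python
import json


def serialise_batch(measurements):
    """Serialise a flushed batch of measurement dicts into one JSON
    pub/sub message. Illustrative only; not the framework's real code."""
    return json.dumps(measurements)


batch = [
    {'time': 1600000000.0, 'value': 42.5},
    {'time': 1600000001.0, 'value': 17.3},
]
message = serialise_batch(batch)
decoded = json.loads(message)  # what the consumer-side parser recovers
```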
This part will be written in Kotlin since our data-consumer component is based on Kotlin. However, you are free to use any supported language that has a Redis client (available for most programming languages). This also enables you to use existing libraries for your platform in other languages.
We will write a converter that takes the data received from the channel (topic) in Redis and converts it to InfluxDB `Point` objects.
For this, cd into the data consumer directory (`data/kotlin`) and open the project in your favourite Java/Kotlin IDE (IntelliJ IDEA is the standard one). Add a new file named `MockSensorInfluxDbConverter.kt` in the package `data/kotlin/data-uploader/src/main/kotlin/org/radarbase/iot/converter/influxdb` and add the following lines:
`MockSensorInfluxDbConverter.kt`:

```kotlin
package org.radarbase.iot.converter.influxdb

import com.fasterxml.jackson.core.type.TypeReference
import org.influxdb.dto.Point
import org.radarbase.iot.commons.util.Parser
import org.radarbase.iot.converter.messageparser.JsonMessageParser
import org.radarbase.iot.sensor.MockSensor
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit

class MockSensorInfluxDbConverter(
    private val measurementName: String = "mockSensor",
    private val parser: Parser<String, List<MockSensor>> =
        JsonMessageParser(typeReference)
) : InfluxDbConverter {

    override fun convert(messages: List<String>): List<Point> {
        return messages.map { message ->
            parser.parse(message).map {
                Point.measurement(measurementName)
                    .time(it.getTime().toLong(), TimeUnit.SECONDS)
                    .addField("value", it.getValue())
                    .tag(InfluxDbConverter.genericKeyMap)
                    .build()
            }
        }.flatten()
    }

    companion object {
        private val logger =
            LoggerFactory.getLogger(MockSensorInfluxDbConverter::class.java)
        private val typeReference = object : TypeReference<List<MockSensor>>() {}
    }
}
```
Note: The `MockSensor` class is an Avro class generated from the schema specified in step one. This can be done using the Avro tools as mentioned in this tutorial, after which you need to place the class on the classpath. If using the RADAR-Schemas repo, it can be easily generated using the build command in the `java-sdk` path. It will then be produced as the `radar-schemas-commons` library, which can be included directly in the `build.gradle` dependencies.
In the above implementation, we read a serialised String (which contains a list of sensor `Response`s), deserialise the JSON, and then create InfluxDB `Point` objects. We also add the keys (`projectId`, `subjectId` and `sourceId`) as tags so we can differentiate between data from different users/deployments.
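Expressed as a hedged Python sketch (mirroring what the Kotlin converter does; the dict-based "points" below are illustrative only, since the real converter builds `org.influxdb.dto.Point` objects):

```python
import json


def to_points(message, measurement='mockSensor', tags=None):
    """Parse one JSON message (a list of {'time', 'value'} dicts) and
    build InfluxDB-style point dicts. Illustrative sketch, not the
    framework's actual converter code."""
    # Hypothetical example tag values; in the framework these come
    # from the configured projectId/subjectId/sourceId.
    tags = tags or {'projectId': 'radar', 'subjectId': 'sub-1', 'sourceId': 's-1'}
    return [
        {
            'measurement': measurement,
            'time': int(item['time']),          # seconds precision
            'fields': {'value': item['value']},
            'tags': tags,
        }
        for item in json.loads(message)
    ]


points = to_points(json.dumps([{'time': 1600000000.9, 'value': 3.5}]))
```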
You need to start InfluxDB and Grafana so you can view the data. For testing, you can run them on your local computer. Run the following:

```shell
docker run -d -p 8086:8086 \
  -v influxdb:/var/lib/influxdb \
  influxdb
```

and

```shell
docker run -d -p 3000:3000 grafana/grafana
```
Now we will configure the data-uploader module to consume the data from the mock sensor. Add the following to the file `data/kotlin/data-uploader/src/main/resources/radar_iot_config.yaml`:

```yaml
radarConfig:
  projectId: "radar"
  userId: "sub-1"
  sourceId: "03d28e5c-e005-46d4-a9b3-279c27fbbc83"

# If a converter is not specified for a particular consumer for a sensor,
# then the data from the sensor will not be forwarded to that consumer for processing
sensorConfigs:
  - sensorName: "mock"
    inputTopic: "data-stream/sensors/mock"
    outputTopic: "mock"
    converterClasses:
      - consumerName: "influx_db"
        converterClass: "org.radarbase.iot.converter.influxdb.MockSensorInfluxDbConverter"

dataConsumerConfigs:
  - consumerClass: "org.radarbase.iot.consumer.InfluxDbDataConsumer"
    maxCacheSize: "1000"
    uploadIntervalSeconds: "10"
    consumerName: "influx_db"

influxDbConfig:
  url: "http://localhost:8086"
  username: "root"
  password: "root"
  dbName: "radarIot"
  retentionPolicyName: "radarIotRetentionPolicy"
  # Should be at least 1h
  retentionPolicyDuration: "1h"
  retentionPolicyReplicationFactor: 1
```
Now, run the application using Gradle:

```shell
./gradlew run
```
This will start sending data to InfluxDB. Now we will set up Grafana to view this data.

Please set up InfluxDB in Grafana using the tutorial here. Make sure to use the same values for configuring the database as specified in the config file above under the `influxDbConfig` tag. The measurement name in our case will be `mockSensor`, as set in the `MockSensorInfluxDbConverter` constructor.
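Once the data source is configured, a Grafana panel query against this measurement might look like the following InfluxQL. This is only an illustration; `$timeFilter` and `$__interval` are Grafana's standard template variables, and the field name matches the `value` field our converter writes:

```sql
SELECT mean("value") FROM "mockSensor"
WHERE $timeFilter
GROUP BY time($__interval) fill(null)
```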
You should be able to see the data in a nice graph.

Note: For production deployments, it is recommended to deploy InfluxDB and Grafana on a VM, possibly behind a reverse proxy, and to use proper authentication instead of the provided defaults.
You can add a converter for each type of data consumer that you want. For example, if you are also using the RADAR-base platform as the backend (which is based on Apache Kafka and the Confluent REST Proxy, so this should also work with Kafka directly), you can create an Avro converter for the data and specify `RestProxyDataConsumer` as the consumer.
So the converter will look something like this (note that the package declaration matches the file's location, and the converter is typed on `MockSensor`):

`data/kotlin/data-uploader/src/main/kotlin/org/radarbase/iot/converter/avro/MockSensorAvroConverter.kt`:

```kotlin
package org.radarbase.iot.converter.avro

import com.fasterxml.jackson.core.type.TypeReference
import org.radarbase.data.AvroRecordData
import org.radarbase.data.RecordData
import org.radarbase.iot.commons.util.Parser
import org.radarbase.iot.converter.messageparser.JsonMessageParser
import org.radarbase.iot.sensor.MockSensor
import org.radarbase.topic.AvroTopic
import org.radarcns.kafka.ObservationKey
import org.slf4j.LoggerFactory

class MockSensorAvroConverter(
    private val topicName: String = "radar_iot_mock_sensor",
    private val messageParser: Parser<String, List<MockSensor>> =
        JsonMessageParser(typeReference)
) : AvroConverter<ObservationKey, MockSensor> {

    override fun getAvroTopic(): AvroTopic<ObservationKey, MockSensor> =
        AvroTopic(
            topicName, ObservationKey.getClassSchema(), MockSensor.getClassSchema(),
            ObservationKey::class.java, MockSensor::class.java
        )

    override fun convert(messages: List<String>):
            RecordData<ObservationKey, MockSensor> {
        val values: List<MockSensor> = messages.map {
            logger.debug("Parsing message: $it")
            messageParser.parse(it)
        }.flatten()
        logger.debug("Avro Values: ${values.map { it.toString() }}")
        return AvroRecordData<ObservationKey, MockSensor>(
            getAvroTopic(),
            AvroConverter.genericObservationKey,
            values
        )
    }

    companion object {
        private val logger = LoggerFactory.getLogger(MockSensorAvroConverter::class.java)
        private val typeReference = object : TypeReference<List<MockSensor>>() {}
    }
}
```
Then add it to the configuration file. The final config file `data/kotlin/data-uploader/src/main/resources/radar_iot_config.yaml` will look like:
```yaml
# make sure these values are exactly as in Management portal for using authorisation
radarConfig:
  projectId: "radar"
  userId: "sub-1"
  sourceId: "03d28e5c-e005-46d4-a9b3-279c27fbbc83"
  schemaRegistryUrl: "http://localhost:8084/"
  kafkaUrl: "http://localhost:8090/radar-gateway/"
  # The following values are required for authentication with the RADAR-base platform.
  # Remove these if using plain Rest Proxy.
  baseUrl: "http://localhost:8081"
  oAuthClientId: "radar_iot"
  oAuthClientSecret: "secret"
  metaToken: "dlsLwIw0E1cP"

dataConsumerConfigs:
  - consumerClass: "org.radarbase.iot.consumer.RestProxyDataConsumer"
    maxCacheSize: "1000"
    uploadIntervalSeconds: "10"
    consumerName: "rest_proxy"
  - consumerClass: "org.radarbase.iot.consumer.InfluxDbDataConsumer"
    maxCacheSize: "1000"
    uploadIntervalSeconds: "10"
    consumerName: "influx_db"

influxDbConfig:
  url: "http://10.200.107.10:8086"
  username: "root"
  password: "root"
  dbName: "radarIot"
  retentionPolicyName: "radarIotRetentionPolicy"
  # Should be at least 1h
  retentionPolicyDuration: "1h"
  retentionPolicyReplicationFactor: 1

# If a converter is not specified for a particular consumer for a sensor,
# then the data from the sensor will not be forwarded to that consumer for processing
sensorConfigs:
  - sensorName: "mock"
    inputTopic: "data-stream/sensors/mock"
    outputTopic: "mock"
    converterClasses:
      - consumerName: "rest_proxy"
        converterClass: "org.radarbase.iot.converter.avro.MockSensorAvroConverter"
      - consumerName: "influx_db"
        converterClass: "org.radarbase.iot.converter.influxdb.MockSensorInfluxDbConverter"
```
Note that the above example was just an illustration of the process required for adding a new sensor. The actual steps will vary from implementation to implementation and you may need to adapt some parts of this tutorial. If you have any trouble, please feel free to open an issue.