Logstash is a lightweight, open source data collection engine organized as a simple pipeline with a large number of plugins. It has input plugins for Netflow, SNMP, collectd, syslog, and more.
Logstash is used to collect, enrich and transport data from multiple sources into PNDA.
At a very high level,
- The Kafka producer will use this codec to Avro-encode input events.
- The Kafka consumer will use this codec to Avro-decode events read from the data bus.
For a description of the PNDA Avro encapsulation please read the Data Preparation Guide.
- Install the latest stable version of Logstash (Logstash 5.3.0 at the time of writing). The procedure is in the Logstash Guide.
- Install the PNDA Avro codec by following its installation instructions.
As an example, we will see how to read TCP data and send it to PNDA.
Note: This assumes you have created a Kafka topic named test and that you are ready to direct data towards this topic.
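If the topic does not exist yet, you can create it with the standard Kafka tooling. A minimal sketch, assuming a local single-broker cluster with ZooKeeper on localhost:2181 (adjust partitions and replication factor to your setup):

cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test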
- Get the PNDA Avro schema.
- Create a Logstash configuration file:
input {
  tcp {
    port => 20518
    add_field => [ "source", "syslog" ]
  }
}
filter {
  mutate {
    rename => { "message" => "rawdata" } # Put the content of the message into the PNDA Avro 'rawdata' field
  }
  ruby {
    # Convert the Logstash timestamp to a milliseconds timestamp integer
    # You can use whatever makes sense to you as long as the timestamp is a valid timestamp integer in milliseconds
    code => "event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)"
  }
}
output {
  kafka {
    codec => pnda-avro { schema_uri => "/opt/pnda/pnda.avsc" } # The PNDA Avro schema to use
    bootstrap_servers => "localhost:9092"
    topic_id => 'test'
    compression_type => "none" # "none", "gzip", "snappy", "lz4"
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
  }
}
Then run Logstash:
bin/logstash -f logstash.conf
In another terminal:
echo '{"router: Jun 19 09:56:14.258 EDT: %LINEPROTO-5-UPDOWN: Line protocol on Interface Tunnel1, changed state to down"}' | nc localhost 20518
Start a Kafka console consumer (if you are using a local Kafka cluster):
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
You should be able to see the syslog event we just sent as a binary PNDA Avro serialized message.
Check the Logstash Kafka output plugin documentation and the security section of the PNDA Guide for more information on securing the connection to Kafka.
As an example, a Logstash output section with SSL enabled would look like this:
output {
  kafka {
    codec => pnda-avro { schema_uri => "/opt/pnda/pnda.avsc" } # The PNDA Avro schema to use
    bootstrap_servers => "localhost:9092"
    topic_id => 'test'
    compression_type => "none" # "none", "gzip", "snappy", "lz4"
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
    ssl => true
    ssl_keystore_location => "/opt/pnda/server.keystore.jks"
    ssl_keystore_password => "test1234"
    ssl_truststore_location => "/opt/pnda/server.truststore.jks"
    ssl_truststore_password => "test1234"
  }
}
Let's understand a bit more about how input fields are mapped to fields expected by the codec.
In the Logstash world, the message field is effectively the default field: it's where most input plugins place the payload they receive from the network, read from a file, and so on.
The codec works by mapping Logstash event fields to the Avro schema. To be correctly encoded, each Logstash event needs to have the following fields:
- source
- timestamp
- rawdata
Note: the @timestamp and host fields are likely generated by Logstash itself and can be used to create the timestamp field.
Note that source and rawdata are not default fields in a Logstash event. They need to be set with filters or by an input plugin.
In simple terms, we expect the Logstash event, after it has been through the filter section, to contain the fields source, timestamp and rawdata. How they are set is up to the integrator.
For example, let's take the file input plugin. Here's a sample config:
input {
  file {
    path => "/var/log/*"
    add_field => [ "source", "system_logs" ]
  }
}
Here's a sample line:
Mar 23 13:17:01 localhost CRON[25941]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly)
If you want to send the whole line as-is, you can do that.
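One way to do that, sketched here by simply reusing the same mutate and ruby filters as in the TCP example above, is to rename message to rawdata and derive timestamp from @timestamp:

filter {
  mutate {
    rename => { "message" => "rawdata" } # the whole log line becomes the PNDA 'rawdata' field
  }
  ruby {
    code => "event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)"
  }
}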
The output from Logstash might look something like this:
{
  "timestamp" => 1458740254000,
  "source" => "system_logs",
  "rawdata" => "Mar 23 13:17:01 localhost CRON[25941]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly)"
}
These fields are sent to the Kafka output plugin which will use the PNDA Avro codec to serialize them according to the PNDA Avro Schema.
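For illustration only, a PNDA-style Avro record covering these three fields might look like the sketch below; the field names follow this guide, but the record name and types are assumptions, so always use the actual pnda.avsc obtained in the "Get the PNDA Avro schema" step:

{
  "type": "record",
  "name": "event",
  "fields": [
    { "name": "timestamp", "type": "long" },
    { "name": "source", "type": "string" },
    { "name": "rawdata", "type": "bytes" }
  ]
}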
You can perform quite complex logic inside a Logstash filter to do whatever you need to do.
For more information, see the Logstash documentation.