Filter and transform messages from one Kafka topic to another.
                       ______________
                      | Rules Server |
                      |______________|
                             ||
                      ________\/_______
 ______________      |   [http cache]  |      ______________
| Source Topic | ==> | Kafka Transform | ==> |  Sink Topic  |
|______________|     |_________________|     |______________|
- Filter topics
- Adapt formats
- Dynamically reload rules
- Scale by just launching more containers
- Multiply messages (produce multiple messages from a single message)
For each message on the source topic, Kafka Transform will (see the worked example after the notes below):
- Request the current rules from the rules URL (responses can be cached) or read them from the rules.local env variable
- For each rule:
  - Evaluate the query over the message
  - If it matches, apply the rule's template
  - Send the result of the template to the sink topic
Note:
- Every message inside the source topic must be a valid Json value (if not, it will produce a deserialization error, see below)
- Every message that is not a Json Object will be discarded (non-objects are incompatible with the query language, see the Query section)
- Keys are copied unchanged and can have any value
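For example (rule, message and field names are purely illustrative), given this rule:
{
  "query": "type:comment",
  "template": { "id": "{{id}}", "text": "{{message}}" }
}
and this message on the source topic:
{ "id": 42, "type": "comment", "message": "hello" }
the query matches, so the template is applied and the following message is sent to the sink topic (with the original key):
{ "id": 42, "text": "hello" }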
docker run -d \
-e rules.url=http://localhost/rules.json \
-e kafka.application.id=my-transform \
-e kafka.bootstrap.servers=localhost:9092 \
-e kafka.topic.source=sourceTopic \
-e kafka.topic.sink=sinkTopic \
--name my-transform \
socialmetrix/kafka-transform
Using a remote rules service
docker service create \
-e rules.url=http://localhost/rules.json \
-e kafka.application.id=my-transform \
-e kafka.bootstrap.servers=localhost:9092 \
-e kafka.topic.source=sourceTopic \
-e kafka.topic.sink=sinkTopic \
--name my-transform \
socialmetrix/kafka-transform
Using local rules
docker service create \
-e rules.type=local \
-e JsonTemplate.delimiters.start='<<' \
-e JsonTemplate.delimiters.end='>>' \
-e rules.local='[{"query":"value:9","template":"<<$this>>"}]' \
-e kafka.application.id=qa-lat-opinions \
-e kafka.bootstrap.servers=localhost:9092 \
-e kafka.topic.source=sourceTopic \
-e kafka.topic.sink=sinkTopic \
--name my-transform \
socialmetrix/kafka-transform
Note: the Json Template delimiters are changed here to avoid conflicts with the Docker env variable template syntax.
You can scale the service just by launching more processes.
Please note that the actual scale limit is the number of partitions the source topic has.
For example: if you have 3 partitions you will get the best throughput using 3 Kafka Transform processes (each will consume a single partition of the topic).
Configuration parameters can be defined with environment variables.
See the following sections for the specific parameters of each component.
The default configuration is defined in src/main/resources/application.conf.
Under the hood the application uses the Kafka Streams 1.0 client (backwards compatible with previous server versions) to consume and produce messages from and to Kafka topics.
It transforms the messages with stateless transformations and filtering, without modifying the original key of the messages.
You can configure every property of the Kafka Streams client using the prefix "kafka." on the environment variables (see the example after the required keys below). See More
kafka.application.id = ???
kafka.bootstrap.servers = ???
kafka.topic.source = ???
kafka.topic.sink = ???
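Any other Kafka Streams property can be passed the same way; for example (these property names are standard Kafka Streams settings, the values are only illustrative):
kafka.num.stream.threads = 2
kafka.commit.interval.ms = 5000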
Rules are Json objects provided by an HTTP GET request with the following format:
{
"query": "lucene query",
"template": {}
}
Example:
{
"query": "value:9 OR value:15",
"template": {
"display": "{{value}} is 9 or 15",
"value": "{{value}}"
}
}
The service must return a Json Array of rule objects, each containing the string field query and the field template (which can be any valid Json Template expression, see below).
Only objects with name = John will be copied to the sink topic.
{
"query": "name:john",
"template": "{{$this}}"
}
All messages will be transformed into an object with a single field that combines the user id in the field name and the address number in the value.
{
"query": "*:*",
"template": {
"user-{{id}}": "{{address.number}}"
}
}
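Hosted on a remote service, the two rules above would be returned together as a single Json Array, for example:
[
  {
    "query": "name:john",
    "template": "{{$this}}"
  },
  {
    "query": "*:*",
    "template": { "user-{{id}}": "{{address.number}}" }
  }
]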
You can have rules hosted on an external HTTP server or defined locally using the rules.local env variable.
For remote rules we use play-ws as the HTTP client. The rules service must accept GET requests to rules.url and return a Json Array with Rule objects as defined above.
By default rules.type is remote. If you want to use local rules, you must change rules.type to local and define rules.local with a Json array of rules.
The rules URL is required for remote rules:
rules.url=http://rules.service.com/rules
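The local equivalent (a minimal sketch reusing the rule from the Docker example above, assuming the default {{ }} delimiters) would be:
rules.type=local
rules.local=[{"query":"value:9","template":"{{$this}}"}]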
Also, you can override the following default values (defined in the libraries' reference.conf files) using env variables:
Every request is retried with exponential backoff. To change the retry defaults use:
retry.maxExecutions=10
retry.baseWait="50 millis"
retry.maxWait="5 seconds"
To configure the maximum time of a single request:
sync.timeout="5 minutes"
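Like any other setting, these can be passed to the container as environment variables; a sketch with illustrative values:
docker run -d \
-e retry.maxExecutions=5 \
-e retry.maxWait="10 seconds" \
-e sync.timeout="2 minutes" \
-e rules.url=http://localhost/rules.json \
-e kafka.application.id=my-transform \
-e kafka.bootstrap.servers=localhost:9092 \
-e kafka.topic.source=sourceTopic \
-e kafka.topic.sink=sinkTopic \
--name my-transform \
socialmetrix/kafka-transform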
The HTTP client will try to cache the response of the rules service to avoid calling it on every message.
For example, if the rules service responds with:
Cache-Control: max-age=20, stale-while-revalidate=10, stale-if-error=600
This will call the server at most once every 20 seconds, reloading the current rules on a background thread (to avoid stopping the processing) and allowing the cached rules to keep being used for up to 10 minutes if the server is offline or returning 5xx responses.
Due to a current limitation of the HTTP client, if the rules service is down the cached rules will continue to be returned without error even after the stale-if-error time.
To minimize this effect there is a configuration key to clean the cache if it has not been updated within a defined time period:
play.ws.cache.expire="30 minutes"
You can use any time unit defined here: Time Units
Queries are Lucene standard query expressions matched against the json object found on each message.
You can use all expressions supported by the Lucene default query parser.
Every value field is temporarily "indexed" using a MemoryIndex. If an object or array is empty, it doesn't get indexed.
Warning: You can't use expressions without a field (there is no default field defined). So, in order to match multiple terms against a single field you could:
- Enclose the terms with (): fullName:(John Doe)
- Repeat the field name for each term: fullName:John fullName:Doe
Warning: To allow user input as plain text for a value, you must escape it with QueryParserBase.escape. Otherwise it could be interpreted as operators or other types of expressions.
You can use:
- AND, OR between query expressions to match terms as mandatory or optional
- - in front of a query expression to negate a term
The default operator is AND. This means that firstName:John lastName:Doe is the same as firstName:John AND lastName:Doe.
Nested fields are flattened using the LuceneMatcher.fieldSeparator config (defaults to .).
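For example, with the default separator, a nested message like this (illustrative data):
{
  "user": {
    "name": "John",
    "address": { "number": 123 }
  }
}
can be matched with dotted field names such as user.name:John or user.address.number:123.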
Fields of objects inside an array are all indexed on the same field. Because of this, multiple values will match at the same time.
{
"users": [
{ "name": "John", "age": 22 },
{ "name": "Jane", "age": 33 },
{ "name": "Other", "age": 44 }
]
}
So, the following queries will match:
users.name:John
users.name:Jane
users.name:John AND users.name:Jane
users.name:John AND users.age:22
users.name:John AND users.age:33
(there is no relation between fields of an array)
Note: The order of terms inside an array is important; the terms are indexed as if they were all concatenated one after the other, in order.
- This will match:
users.name:"John Jane Other"
- This won't match:
users.name:"Other John Jane"
We need to differentiate the types inside the json message and the types inside the query values.
For query values, type inference is applied to decide whether they are integer numbers, floating point numbers or strings.
Single value:
- If the value is quoted, it's a terms query without change
- If the value can be parsed as a Long, it's converted to a query using LongPoint
- If the value can be parsed as a Double, it's converted to a query using DoublePoint
- If none of the above match, it's a terms query without change
Multiple values for a single field ("IN" query):
- Doing field:(value1 value2) is the same as field:value1 field:value2 (all values are required / AND mixed).
- If you want something similar to an "IN" query: field:(value1 OR value2).
Ranges:
- If one of the ends is a wildcard *:
  - If the other end can be parsed as a Long, it's converted to a range query using LongPoint.
  - If the other end can be parsed as a Double, it's converted to a range query using DoublePoint.
- If both ends are defined:
  - If both ends can be parsed as Long, it's converted to a range query using LongPoint.
  - If both ends can be parsed as Double, it's converted to a range query using DoublePoint.
- If none of the above match, it's a terms range query without change.
Note: Ranges can be inclusive [X TO X] and exclusive {X TO X}. Both types can also be combined, e.g. [0 TO 123}.
Warning: Ranges currently don't support quoting to control type inference.
Warning: You can't match all values (numericField:* or numericField:[* TO *]) on a numeric field (the query is interpreted as a terms query because there is no possible inference without data or a schema). You can still create a range over a numeric field if one of the ends is defined (e.g. numericField:[* TO 123]).
Warning: Integer queries won't match floating point values and vice versa (for both single values and ranges). So, if the values are floating point, make sure to include at least a . to force the inference (e.g. 7.0).
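A few illustrative queries (the field names are hypothetical):
- price:7.0 is a DoublePoint query and only matches floating point values
- count:42 is a LongPoint query and only matches integer values
- age:[18 TO 65] is an inclusive LongPoint range query
- price:[* TO 9.99] is a DoublePoint range query with an open lower end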
This expression will match any message: *:*.
Json message booleans are converted to the strings true and false. Use those values to match them in the query.
Json message integer values are indexed using LongPoint
Json message floating point values are indexed using DoublePoint
For both types of numbers, remember to always escape the - (minus) sign with \. If not escaped, the term will be negated.
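For example, to match a negative value (the field name is illustrative), escape the sign:
temperature:\-5
Without the backslash the - would negate the term instead of being treated as part of the number.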
Json message string values and query values are analyzed before matching. Query values are only parsed as string terms if they can't be parsed as Longs or Doubles (see the warning below).
This implies that matching is case insensitive, tokens are split using the standard tokenizer and non-ASCII characters are translated to their ASCII representation (e.g. à -> a).
Warning: many unicode characters will be lost.
Warning: if the query value is numeric, it will get parsed as a LongPoint or DoublePoint. In that case, matching against a string field will always fail. To fix it you must escape the query number using double quotes (").
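For example, assuming a message with a string field code whose value is "123" (field and value are illustrative):
- code:"123" matches the string value
- code:123 is inferred as a LongPoint query and will never match the string field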
Json Template uses Json to transform Json. The final value will be the result of replacing every placeholder or operation with the corresponding values of the message data. The rest of the template will be copied as is.
A valid template is any valid json value (even single values like strings or numbers).
Delimiters and operations must be defined between "{{" and "}}". The parts of the template without delimiters are constant and copied without any change.
You can customize the delimiters and other parts of the syntax with the following keys:
JsonTemplate.delimiters.start = "{{"
JsonTemplate.delimiters.end = "}}"
JsonTemplate.fieldSeparator = "."
JsonTemplate.metaPrefix = "$"
JsonTemplate.thisIdentifier = "this"
JsonTemplate.commandPrefix = "#"
Note: Currently, you can't escape delimiters.
Fields can be nested with "." to access nested objects.
You can't access fields inside a nested array (the template wouldn't be able to choose which element to use). To access array elements you must use map or flatmap.
$this: to copy, interpolate, etc. the current json value
If the template expects a field that is missing from the data, or an operation receives a different data type than expected, an exception will be thrown.
This prevents the processing of invalid data. Please check the Error Handling section for more information.
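For example, this template would fail because name is not present in the data (both values are illustrative); the exception stops the processing as described in the Error Handling section.
Data
{ "id": 1 }
Template
{ "name": "{{name}}" }
Result: an exception is thrown instead of producing a message.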
Strings can be interpolated, even inside field names.
Use {{fieldName}}
inside any string in your template to replace it with the value (converted to a string).
Warning: Only non-container nodes (any value besides arrays or objects) can be interpolated; they will be translated with the default toString implementation.
Data
{
"id": 1,
"name": "Smith"
}
Template
{ "user-{{id}}": "Name: {{name}}" }
Result
{ "user-1": "Name: Smith" }
You can copy a part of the tree using a string interpolated with a single variable and no text around it: "{{fieldName}}"
.
Note that the data type and the nested nodes will remain untouched.
Note: Field names will always only be interpolated (because they need to be a string).
Data
{
"id": 1,
"user": {
"firstName": "John",
"lastName": "Smith"
}
}
Template
"{{user}}"
Result
{
"firstName": "John",
"lastName": "Smith"
}
Transform every element inside an array.
This operation changes the value of $this
on every iteration.
Use {{#map fieldName}}
as field name inside a single field object. The value of that field will be the template applied to every element of the data.
The wrapper object will be discarded (it is used only for syntactic purposes).
Note: The type of the field must be Array.
Data
{
"id": 1,
"users": [
{
"firstName": "John",
"lastName": "Smith"
},
{
"firstName": "Jane",
"lastName": "Doe"
}
]
}
Template
{
"{{#map users}}": "{{firstName}} {{lastName}}"
}
Result
[
"John Smith",
"Jane Doe"
]
FlatMap is very similar to map, but the result of the template on each value must be an Array. This result will be appended to the resulting value.
This operation changes the value of $this
on every iteration.
Use {{#flatmap fieldName}}
as field name inside a single field object. The value of that field will be the template applied to every element of the data.
The wrapper object will be discarded (it is used only for syntactic purposes).
Note: The type of the field must be Array and the result of the template must be an Array.
Note: The difference with map is that if map returns an array, the final result will be an array of arrays.
Data
{
"id": 1,
"users": [
{
"name": "John",
"address": "123 Av"
},
{
"name": "Jane",
"address": "456 Av"
}
]
}
Template
{
"{{#flatmap users}}": [
{ "name": "{{name}}" },
{ "address": "{{address}}" }
]
}
Result
[
{ "name": "John" },
{ "address": "123 Av" },
{ "name": "Jane" },
{ "address": "456 Av" }
]
On any processing error, Kafka Transform will stop consuming and exit the process.
If the error is not transient and is related to the last message of a consumer group's topic partition, the following tools will be useful to continue processing.
To find the current offset of a consumer group on a particular partition, use:
kafka-consumer-groups.sh --bootstrap-server $SERVER --describe --group $APPLICATION_ID
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
kafka-transform sourceTopic 0 4499 4500 1 kafka-transform-98305c80-c535-4102-8b62-a5a7f5cf9239-StreamThread-1-consumer_/172.19.0.4
Note: The CURRENT-OFFSET is not the actual offset the application is handling; it represents the last committed offset.
To get the actual value you should gracefully shut down the application and then execute the command.
Note: If the consumers are stopped, after a little while the command will fail with Consumer group APPLICATION_ID is rebalancing.
Using the offset and partition from kafka-consumer-groups.sh we can see which message is generating the error in the application.
kafka-console-consumer.sh --bootstrap-server $SERVER --max-messages 1 --topic $TOPIC --partition $PARTITION --offset $OFFSET
- Stop all consumers of the group
- Run kafka-verifiable-consumer.sh
- Stop it after consuming the message (ctrl+c)
- Start the application
kafka-verifiable-consumer.sh --broker-list $SERVER --group-id $APPLICATION_ID --topic sourceTopic --max-messages 1 --verbose
Note: If the application is running you will see an error like the following:
{"timestamp":1520368635720,"partitions":[],"name":"partitions_revoked"}
[2018-03-06 20:37:15,780] ERROR Attempt to join group kafka-transform failed due to fatal error: The group member's supported protocols are incompatible with those of existing members. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
{"timestamp":1520368635783,"name":"shutdown_complete"}
Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
This will move the application's offset on the topic back to 0.
- Stop all consumers of the group
- Run kafka-streams-application-reset.sh
- Start the application
kafka-streams-application-reset.sh --bootstrap-servers $SERVER --application-id $APPLICATION_ID --input-topics $TOPIC
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $SERVER --topic $TOPIC
<topic>:0:3137
We use logback to produce the log output. The default configuration can be found in src/main/resources/logback.xml.
Run these commands from the root of the project:
docker-compose up -d
# Check out the output topic messages
docker exec -ti kafkatransform_kafka_1 kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic sinkTopic --from-beginning
# Produce more data manually
docker exec -ti kafkatransform_kafka_1 kafka-console-producer.sh --broker-list kafka:9092 --topic sourceTopic
# Check consumer group status
docker exec -ti kafkatransform_kafka_1 kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group kafka-transform
# Peek message from topic
docker exec -ti kafkatransform_kafka_1 kafka-console-consumer.sh --bootstrap-server kafka:9092 --max-messages 1 --topic sourceTopic --partition 0 --offset 2332
# Advance consumer group by 1 (stop it with ctrl+c)
docker exec -ti kafkatransform_kafka_1 kafka-verifiable-consumer.sh --broker-list kafka:9092 --group-id kafka-transform --topic sourceTopic --max-messages 1 --verbose
# Clean up everything
docker-compose down
- Support custom error handling
- ignore missing field or type errors
- send errors to a different kafka topic
- Support constant rules without using a remote http server
- Json Template
- Default values on missing fields or type errors
- Add more meta-fields to the templates
- $parent
- $index
- $root
- Lookup missing fields on parent objects
- Add template playground page to test templates with values
- Lucene Matcher
- Improve performance using MemoryIndex.reset and caching queries
- Add support for Date fields
- Add multi field matching
- Add template playground page to test queries with values
- Support custom analyzers
- Support schemas to avoid inference over query values (e.g. avro + schema registry)
- Support quoted ranges for type inference
- Support match all (single value and ranges) on numeric fields