- Bring up the stack
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/kafka-ecosystem
docker-compose up -d
This brings up the stack ready for use.
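Check that all the containers are up before continuing:

docker-compose ps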
- Wait for Kafka Connect to be started
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
- Make sure that the JDBC connector is available:
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'JdbcSinkConnector'
Expect:
"io.confluent.connect.jdbc.JdbcSinkConnector"
docker exec -it kafkacat kafkacat -b kafka:29092 -t test -P -K:
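Type a few messages (for example 1:foo and 2:bar), then press Ctrl-D to finish. Consume them back (-u gives unbuffered output):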
docker exec kafkacat kafkacat -b kafka:29092 -t test -C -u
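The same again, but with a format string (-f) showing the topic, partition, offset, and key alongside each message value: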
docker exec kafkacat kafkacat -b kafka:29092 -t test -C -u -f '[Topic %t/partition %p/offset %o]\tKey: %k\tValue: %s\n'
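Now consume from the trades topic, starting at offset 5 (-o 5) and deserialising the Avro-encoded values against the Schema Registry (-s key=s reads the key as a plain string):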
docker exec kafkacat kafkacat -b kafka:29092 -t trades -C -u -q -o 5 -r http://schema-registry:8081 -s key=s -s value=avro
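Launch the ksqlDB CLI. The wrapper loop waits for the ksqlDB server to be available before starting the CLI: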
docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
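Declare a stream over the trades topic, with the schema fetched from the Schema Registry: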
CREATE STREAM TRADES WITH (KAFKA_TOPIC='trades', VALUE_FORMAT='AVRO');
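Query the trades for symbol ZWZZT as they arrive: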
SELECT * FROM TRADES WHERE SYMBOL='ZWZZT' EMIT CHANGES;
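Set the offset reset to earliest so that subsequent queries process the topic from the beginning: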
SET 'auto.offset.reset' = 'earliest';
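Create a table aggregating ZWZZT trades over 15-minute tumbling windows, keyed on the trade side: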
CREATE TABLE ZWZZT_TRADES_BY_15_MIN AS
    SELECT SIDE AS ROWKEY,
           WINDOWSTART AS WINDOW_TS,
           AS_VALUE(SIDE) AS SIDE,
           COUNT(*) AS TRADE_COUNT,
           SUM(PRICE * QUANTITY) AS TOTAL_VALUE,
           SUM(QUANTITY)/COUNT(*) AS AVG_QUANTITY
      FROM TRADES WINDOW TUMBLING (SIZE 15 MINUTES)
     WHERE SYMBOL='ZWZZT'
     GROUP BY SIDE EMIT CHANGES;
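Stream the aggregate to Postgres with the JDBC sink connector. The TimestampConverter Single Message Transform converts the epoch-millisecond window start into a SQL timestamp: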
CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'ZWZZT_TRADES_BY_15_MIN',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_value',
    'pk.fields'           = 'SIDE,WINDOW_TS',
    'transforms'          = 'setTimestampType',
    'transforms.setTimestampType.type'        = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'       = 'WINDOW_TS',
    'transforms.setTimestampType.target.type' = 'Timestamp'
);
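Check that the connector is running; it is registered under the name given in the CREATE SINK CONNECTOR statement:

curl -s localhost:8083/connectors/SINK_POSTGRES/status | jq '.connector.state'

Then connect to Postgres and query the sink table: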
docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
SELECT * FROM "ZWZZT_TRADES_BY_15_MIN" ORDER BY 1 DESC,2;
Optional: run a pull query against the table from the ksqlDB CLI:
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS,
       ROWKEY,
       TRADE_COUNT
  FROM ZWZZT_TRADES_BY_15_MIN
 WHERE ROWKEY='BUY';
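Stream the raw trades to Elasticsearch. Here the InsertField Single Message Transform adds the Kafka message timestamp to each document as a TS field: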
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url'  = 'http://elasticsearch:9200',
    'type.name'       = '_doc',
    'topics'          = 'trades',
    'key.ignore'      = 'true',
    'schema.ignore'   = 'false',
    'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
    'transforms'      = 'ExtractTimestamp',
    'transforms.ExtractTimestamp.type'            = 'org.apache.kafka.connect.transforms.InsertField$Value',
    'transforms.ExtractTimestamp.timestamp.field' = 'TS'
);
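As a quick sanity check, confirm that documents are arriving in the index (this assumes Elasticsearch's REST port 9200 is mapped to the host by the Docker Compose file):

curl -s http://localhost:9200/trades/_count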
Open Kibana at http://localhost:5601 and show the data.