This demo is a streaming pipeline built on Apache Kafka. It connects to the Wikimedia Foundation's IRC channels (e.g. #en.wikipedia, #en.wiktionary) and streams their edits to Kafka via kafka-connect-irc. The raw messages are transformed by a Kafka Connect Single Message Transform (SMT), kafka-connect-transform-wikiedit, and the parsed messages are materialized into Elasticsearch for analysis in Kibana.
Components:
- Confluent Control Center
- Kafka Connect
- Kafka Streams
- KSQL
- Confluent Schema Registry
- kafka-connect-irc source connector
- kafka-connect-elasticsearch sink connector
- Elasticsearch
- Kibana
- Since this repository uses submodules, clone with --recursive:
$ git clone --recursive git@github.com:confluentinc/ConfluentPlatformWikipediaDemo.git
Otherwise, clone the repository and then initialize and update the submodules:
$ git clone git@github.com:confluentinc/ConfluentPlatformWikipediaDemo.git
$ cd ConfluentPlatformWikipediaDemo
$ git submodule init
Submodule 'kafka-connect-irc' (https://github.com/cjmatta/kafka-connect-irc) registered for path 'kafka-connect-irc'
Submodule 'kafka-connect-transform-wikiedit' (https://github.com/cjmatta/kafka-connect-transform-wikiedit) registered for path 'kafka-connect-transform-wikiedit'
$ git submodule update
- Increase the memory available to Docker. The default is 2 GB; increase it to at least 6 GB.
- Run make clean all to build the IRC connector and the transform that parses the raw Wikipedia edit messages. The output is saved to the connect-plugins path, which is a shared volume mounted into the connect Docker container:
$ make clean all
...
$ ls connect-plugins
- Start Docker Compose. It takes about 2 minutes for all containers to start and for the Confluent Control Center GUI to be ready.
$ docker-compose up -d
- Wait until Confluent Control Center is fully running. It is ready when the logs show the following event:
$ docker-compose logs -f control-center | grep -e HTTP
control-center_1 | [2017-09-06 16:37:33,133] INFO Started NetworkTrafficServerConnector@26a529dc{HTTP/1.1}{0.0.0.0:9021} (org.eclipse.jetty.server.NetworkTrafficServerConnector)
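The setup steps above can be sanity-checked before moving on. The following is a minimal sketch of hypothetical pre-flight helpers (check_submodules, docker_mem_ok and wait_for are illustrative names, not scripts in this repository), assuming a POSIX-compatible shell such as bash. They check that both submodule directories were populated, that Docker has roughly 6 GB of memory, and retry a command until it succeeds, for example until Control Center answers on port 9021.

```shell
# Hypothetical pre-flight helpers; none of these ship with the demo repository.

# 1. Submodules: an un-initialized submodule leaves an empty directory behind.
#    Prints each empty or missing path and returns 1 if any were found.
check_submodules() {
  local root="${1:-.}" missing=0 dir
  for dir in kafka-connect-irc kafka-connect-transform-wikiedit; do
    if [ -z "$(ls -A "$root/$dir" 2>/dev/null)" ]; then
      echo "missing: $dir"
      missing=1
    fi
  done
  return "$missing"
}

# 2. Docker memory: this README asks for at least 6 GB. The Docker VM reports a
#    little less than what you allocate, so accept anything within 10% of 6 GiB.
MIN_DOCKER_MEM_BYTES=$((6 * 1024 * 1024 * 1024 * 9 / 10))

# Succeeds if the byte count passed as $1 meets the minimum.
docker_mem_ok() {
  [ "$1" -ge "$MIN_DOCKER_MEM_BYTES" ]
}

# 3. Readiness: re-run a command until it succeeds or the attempts run out.
#    Usage: wait_for <attempts> <delay-seconds> <command> [args...]
wait_for() {
  local attempts="$1" delay="$2" i=1
  shift 2
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}
```

Example usage from the repository root (docker info --format and curl -sf are standard options; the 60 attempts and 5-second delay are arbitrary choices that cover the roughly 2-minute startup):
$ check_submodules || git submodule update --init --recursive
$ docker_mem_ok "$(docker info --format '{{.MemTotal}}')" || echo "Give Docker at least 6 GB"
$ wait_for 60 5 curl -sf -o /dev/null http://localhost:9021
Polling the HTTP port avoids depending on the exact wording of the Control Center log line.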
Now decide how to run the demo. Data can flow either:
- Straight through Kafka from Wikipedia IRC to Elasticsearch without KSQL. The connectors use Schema Registry and Avro.
$ export DEMOPATH=scripts_no_app
or
- From Wikipedia IRC through KSQL to Elasticsearch. The connectors use JSON instead of Avro because KSQL does not support Avro with Schema Registry at this time.
$ export DEMOPATH=scripts_ksql_app
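The choice between the two paths is carried only by the DEMOPATH variable, so a typo makes every later ./$DEMOPATH/... command fail with a confusing "No such file or directory". A minimal sketch of a guard, using a hypothetical check_demopath function that is not part of the demo:

```shell
# Hypothetical guard, not part of the demo scripts: fail fast if DEMOPATH is not
# one of the two documented values, before any ./$DEMOPATH/*.sh script runs.
check_demopath() {
  case "$1" in
    scripts_no_app | scripts_ksql_app)
      return 0
      ;;
    *)
      echo "DEMOPATH must be scripts_no_app or scripts_ksql_app, got '$1'" >&2
      return 1
      ;;
  esac
}
```

Example usage:
$ check_demopath "$DEMOPATH" && ./$DEMOPATH/setup.sh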
- Set up the cluster and connectors:
$ ./$DEMOPATH/setup.sh
- If you are demoing KSQL:
2a. Start KSQL:
$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:9092 --properties-file /tmp/ksqlproperties
2b. Run the saved KSQL commands, which generate an output topic that feeds the Elasticsearch sink connector:
ksql> run script '/tmp/ksqlcommands';
2c. Leave the KSQL application open for the duration of the demo to keep its Kafka clients running. If you close KSQL, data processing stops.
- Open Kibana at http://localhost:5601/.
- Navigate to "Management --> Saved Objects" and click Import. Then choose one of these two options:
  - If you are running traffic straight from Wikipedia IRC to Elasticsearch without KSQL, load the scripts_no_app/kibana_dash.json file.
  - If you are running traffic from Wikipedia IRC through KSQL to Elasticsearch, load the scripts_ksql_app/kibana_dash.json file.
- Click "Yes, overwrite all".
- Navigate to the Dashboard tab (speedometer icon) and open your new dashboard.
- Use Google Chrome to view the Control Center GUI at http://localhost:9021 and see the message delivery status, consumer groups, and connectors.
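The Kibana import step comes down to picking the kibana_dash.json that sits next to the scripts you selected with DEMOPATH. A minimal sketch of a hypothetical dashboard_for helper (not part of the demo) that prints that path, or fails if the file is missing, assuming it is run from the repository root:

```shell
# Hypothetical helper: print the Kibana dashboard file that matches the chosen
# DEMOPATH, or fail if it is not there. Run it from the repository root.
dashboard_for() {
  local f="$1/kibana_dash.json"
  if [ -f "$f" ]; then
    echo "$f"
  else
    echo "no dashboard at $f" >&2
    return 1
  fi
}
```

Example usage, printing the file to choose in the Import dialog:
$ dashboard_for "$DEMOPATH"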
To simulate a slow consumer, we will use Kafka's quota feature to rate-limit consumption from the broker side.
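Kafka client quotas are configured per client.id with the kafka-configs tool: consumer_byte_rate caps how many bytes per second a client may fetch, and the broker enforces it by delaying its responses to that client. The following minimal sketch only prints such a command; it is not necessarily what throttle_consumer.sh does. The ZooKeeper address zookeeper:2181, the client.id values consumer_app_N, and the 1024 bytes/s default rate are assumptions. Newer Kafka releases accept --bootstrap-server in place of --zookeeper for this tool.

```shell
# Hypothetical sketch, not necessarily what throttle_consumer.sh does.
# Assumptions: ZooKeeper is reachable at zookeeper:2181 (override with ZOOKEEPER),
# and consumer N in group "app" uses client.id consumer_app_N.
ZOOKEEPER="${ZOOKEEPER:-zookeeper:2181}"

# Print the kafka-configs command that adds or removes a consumer_byte_rate quota.
# Usage: quota_cmd <n> add|delete [bytes-per-second]
quota_cmd() {
  local n="$1" action="$2" rate="${3:-1024}" change
  case "$action" in
    add) change="--add-config consumer_byte_rate=$rate" ;;
    delete) change="--delete-config consumer_byte_rate" ;;
    *) echo "usage: quota_cmd <n> add|delete [bytes-per-second]" >&2; return 1 ;;
  esac
  echo "kafka-configs --zookeeper $ZOOKEEPER --alter $change --entity-type clients --entity-name consumer_app_$n"
}
```

Assuming the broker's Compose service is named kafka (the KSQL step connects to kafka:9092) and has the Kafka CLI tools on its PATH, the printed command could be run unquoted so the shell splits it into arguments:
$ docker-compose exec kafka $(quota_cmd 1 add)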
- Start consuming from the topic wikipedia.parsed with a new consumer group app, which has two consumers, consumer_app_1 and consumer_app_2. It runs in the background:
$ ./$DEMOPATH/start_consumer_app.sh
- Let the consumers run for a while until consumption is steady.
- Add a consumption quota for one of the consumers in the consumer group app:
$ ./$DEMOPATH/throttle_consumer.sh 1 add
- In Control Center (C3), watch this one consumer start to lag.
- Remove the consumption quota for the consumer:
$ ./$DEMOPATH/throttle_consumer.sh 1 delete
- Stop consuming from the topic wikipedia.parsed with the consumer group app:
$ ./$DEMOPATH/stop_consumer_app.sh
In a different terminal, watch the live messages from the wikipedia.parsed topic:
$ ./$DEMOPATH/listen_wikipedia.parsed.sh
In a different terminal, watch the messages that the SMT failed to parse (poison pill routing), which are sent to the wikipedia.failed topic:
$ ./$DEMOPATH/listen_wikipedia.failed.sh
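Which console consumer can print these topics depends on the serialization each path uses: the scripts_no_app connectors write Avro, which needs the Schema Registry-aware kafka-avro-console-consumer, while the KSQL path uses JSON, which the plain kafka-console-consumer can display. A minimal sketch of a hypothetical listen_cmd helper that prints the matching command; it is not the contents of the listen_*.sh scripts, and it assumes the broker at kafka:9092, Schema Registry at http://schema-registry:8081, and that wikipedia.failed uses the same format as the rest of its path.

```shell
# Hypothetical sketch, not the contents of the listen_*.sh scripts.
# Assumptions: broker at kafka:9092, Schema Registry at http://schema-registry:8081.
# scripts_no_app writes Avro, so it needs the Schema Registry-aware consumer;
# scripts_ksql_app uses JSON, which the plain console consumer can print.
# Usage: listen_cmd <demopath> <topic>
listen_cmd() {
  local base="--bootstrap-server kafka:9092 --topic $2"
  case "$1" in
    scripts_no_app)
      echo "kafka-avro-console-consumer $base --property schema.registry.url=http://schema-registry:8081" ;;
    scripts_ksql_app)
      echo "kafka-console-consumer $base" ;;
    *)
      echo "unknown demopath: $1" >&2
      return 1 ;;
  esac
}
```

Example usage, printing the command to run in a container that has the Confluent CLI tools:
$ listen_cmd "$DEMOPATH" wikipedia.failed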
Stop and destroy all components and clear all volumes from Docker.
$ ./$DEMOPATH/reset_demo.sh
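For reference, removing a Compose project's containers together with its volumes is what docker-compose down -v does. A minimal sketch, assuming reset_demo.sh may do more than this; maybe_run and reset_sketch are hypothetical names, and setting DRY_RUN=1 prints the command instead of executing it.

```shell
# Hypothetical teardown sketch; reset_demo.sh may do more than this.
# DRY_RUN=1 prints the command instead of running it.
maybe_run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Stop and remove the containers and networks, plus named volumes declared in
# the Compose file and anonymous volumes (-v), and containers for services no
# longer defined in the Compose file (--remove-orphans).
reset_sketch() {
  maybe_run docker-compose down -v --remove-orphans
}
```

Example usage, previewing the command before running it for real:
$ DRY_RUN=1 reset_sketch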