The slides that accompany this demo can be found here: https://speakerdeck.com/rmoff/no-more-silos-integrating-databases-and-apache-kafka
-
Bring up the stack
git clone https://github.com/confluentinc/demo-scene.git
cd no-more-silos
docker-compose up -d
This brings up the stack ready for use.
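As a quick sanity check, confirm that all of the containers have started (service definitions come from the demo's docker-compose.yml, so adjust if you have modified it):

docker-compose ps

Each service should be listed with a State of Up.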
-
Wait for Kafka Connect to be started
bash -c '
echo "Waiting for Kafka Connect to start listening on localhost ⏳"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) " Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
'
-
Make sure the connector plugins are available
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'debezium.*mysql|JdbcSink'
"io.confluent.connect.jdbc.JdbcSinkConnector" "io.debezium.connector.mysql.MySqlConnector"
-
Launch MySQL CLI
docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'
-
In MySQL, examine the data
DESCRIBE customers;
SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
+----+------------+-----------+----------------------------+---------------------+
| ID | FIRST_NAME | LAST_NAME | EMAIL                      | UPDATE_TS           |
+----+------------+-----------+----------------------------+---------------------+
|  1 | Bibby      | Argabrite | bargabrite0@google.com.hk  | 2019-04-01 16:51:18 |
|  2 | Auberon    | Sulland   | asulland1@slideshare.net   | 2019-04-01 16:51:18 |
|  3 | Marv       | Dalrymple | mdalrymple2@macromedia.com | 2019-04-01 16:51:18 |
|  4 | Nolana     | Yeeles    | nyeeles3@drupal.org        | 2019-04-01 16:51:18 |
|  5 | Modestia   | Coltart   | mcoltart4@scribd.com       | 2019-04-01 16:51:18 |
+----+------------+-----------+----------------------------+---------------------+
5 rows in set (0.00 sec)
-
Create the JDBC source connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "source-jdbc-mysql-00",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "topic.prefix": "mysql-00-",
        "poll.interval.ms": 1000,
        "tasks.max": 1,
        "mode": "timestamp",
        "table.whitelist": "demo.customers",
        "timestamp.column.name": "UPDATE_TS",
        "validate.non.null": false
    }
}'
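To inspect the configuration that Kafka Connect has stored for the connector, use the standard Connect REST API:

curl -s http://localhost:8083/connectors/source-jdbc-mysql-00/config | jq '.'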
-
Check it’s running
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort
source  |  source-datagen-item_details_01  |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
source  |  source-jdbc-mysql-00            |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector
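If a connector shows FAILED instead of RUNNING, the task's stack trace is available from the status endpoint (a standard Connect REST call, shown here for the JDBC connector):

curl -s http://localhost:8083/connectors/source-jdbc-mysql-00/status | jq '.tasks[0].trace'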
-
Examine the data
docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t mysql-00-customers \
  -C -o beginning -u -q | jq -c '.'
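As an alternative to kafkacat, the same topic can be read with kafka-avro-console-consumer from inside the Schema Registry container (the schema-registry container name is an assumption based on the demo's Docker Compose file):

docker exec -t schema-registry kafka-avro-console-consumer \
  --bootstrap-server kafka:29092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic mysql-00-customers \
  --from-beginning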
-
Split the screen to show Kafka topic output along with MySQL.
-
Make changes in MySQL and observe that the Kafka topic (as shown by kafkacat) updates automatically
-
Insert a new row in MySQL:
INSERT INTO customers (ID, FIRST_NAME, LAST_NAME, EMAIL, GENDER, COMMENTS) VALUES (42, 'Rick', 'Astley', '', 'Male', '');
-
Update the row in MySQL:
UPDATE customers SET EMAIL = 'Never.gonna.give.you@up.com' WHERE ID = 42;
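To see just the latest change event rather than replaying the whole topic, kafkacat can start one message back from the end of the partition (-o -1) and exit after reading it (-c 1):

docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t mysql-00-customers \
  -C -o -1 -c 1 -u -q | jq -c '.'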
-
In MySQL, examine the data
SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
+----+------------+-----------+-----------------------------+---------------------+
| ID | FIRST_NAME | LAST_NAME | EMAIL                       | UPDATE_TS           |
+----+------------+-----------+-----------------------------+---------------------+
|  1 | Bibby      | Argabrite | bargabrite0@google.com.hk   | 2019-04-01 16:51:18 |
|  2 | Auberon    | Sulland   | asulland1@slideshare.net    | 2019-04-01 16:51:18 |
|  3 | Marv       | Dalrymple | mdalrymple2@macromedia.com  | 2019-04-01 16:51:18 |
|  4 | Nolana     | Yeeles    | nyeeles3@drupal.org         | 2019-04-01 16:51:18 |
|  5 | Modestia   | Coltart   | mcoltart4@scribd.com        | 2019-04-01 16:51:18 |
| 42 | Rick       | Astley    | Never.gonna.give.you@up.com | 2019-04-01 17:59:43 |
+----+------------+-----------+-----------------------------+---------------------+
6 rows in set (0.00 sec)
-
Create the Debezium CDC connector
curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \
  -d '{
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "42",
      "database.allowPublicKeyRetrieval": "true",
      "database.server.name": "asgard",
      "table.whitelist": "demo.customers",
      "database.history.kafka.bootstrap.servers": "kafka:29092",
      "database.history.kafka.topic": "asgard.dbhistory.demo",
      "include.schema.changes": "true"
  }'
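Debezium writes its events to topics prefixed with the configured database.server.name (asgard here). To list them, run kafka-topics from inside the broker container (the kafka container name is an assumption based on the demo's Docker Compose file):

docker exec kafka kafka-topics --bootstrap-server kafka:29092 --list | grep asgard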
-
Check it’s running
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort
source  |  source-datagen-item_details_01  |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
source  |  source-debezium-mysql-00        |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector
source  |  source-jdbc-mysql-00            |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector
-
Examine the data with kafkacat
docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t asgard.demo.customers \
  -C -o beginning -u -q | jq '.'
{ "before": null, "after": { "Value": { "id": 42, "first_name": { "string": "Rick" }, "last_name": { "string": "Astley" }, "email": { "string": "[email protected]" }, "gender": { "string": "Male" }, "comments": { "string": "" }, "UPDATE_TS": { "string": "2019-10-23T16:29:53Z" } } }, "source": { "version": "0.10.0.Final", "connector": "mysql", "name": "asgard", "ts_ms": 0, "snapshot": { "string": "last" }, "db": "demo", "table": { "string": "customers" }, "server_id": 0, "gtid": null, "file": "binlog.000002", "pos": 873, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": { "long": 1571848220368 } }
-
Split the screen to show Kafka topic output along with MySQL.
-
Rerun kafkacat to show compact output
docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t asgard.demo.customers \
  -C -o beginning -u -q | jq '.op, .before, .after'
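To also see each message's key, which for Debezium carries the table's primary key, kafkacat's -f format string can be used in place of the jq pipeline (exact rendering of Avro-encoded keys may vary by kafkacat version):

docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t asgard.demo.customers \
  -C -o beginning -u -q \
  -f 'Key: %k\nValue: %s\n\n'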
-
Make changes in MySQL and observe that the Kafka topic (as shown by kafkacat) updates automatically
-
Update the row in MySQL:
UPDATE customers SET EMAIL = 'r.astley@example.com' WHERE ID = 42;
UPDATE customers SET FIRST_NAME = 'BOB' WHERE ID = 42;
-
Delete a row in MySQL:
DELETE FROM customers WHERE ID=2;
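The delete arrives as an event with op set to d and the deleted row in before, followed by a tombstone message whose value is null, which lets Kafka's log compaction drop the key. kafkacat's -Z flag prints those null payloads explicitly as NULL:

docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t asgard.demo.customers \
  -C -o beginning -u -q -Z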
-
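The statements that follow are run in the KSQL CLI. One way to launch it (the ksql-cli and ksql-server container names are assumptions based on the demo's Docker Compose file):

docker exec -it ksql-cli ksql http://ksql-server:8088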
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_CDC_STREAM WITH (KAFKA_TOPIC='asgard.demo.customers', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_AFTER AS
SELECT AFTER->ID AS ID,
AFTER->FIRST_NAME AS FIRST_NAME,
AFTER->LAST_NAME AS LAST_NAME,
AFTER->EMAIL AS EMAIL,
AFTER->GENDER AS GENDER,
AFTER->COMMENTS AS COMMENTS
FROM CUSTOMERS_CDC_STREAM;
CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_AFTER PARTITION BY ID;
SELECT ROWKEY, ID FROM CUSTOMERS_STREAM EMIT CHANGES LIMIT 1;
CREATE TABLE CUSTOMERS_TABLE WITH (KAFKA_TOPIC='CUSTOMERS_STREAM', VALUE_FORMAT='AVRO');
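To confirm that the objects were created, the KSQL server's REST API can also be queried directly from the shell (the /ksql endpoint and port 8088 are KSQL defaults):

curl -s -X POST http://localhost:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SHOW TABLES;", "streamsProperties": {}}' | jq '.'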
-
In MySQL, query the state:
mysql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM customers WHERE ID=42;
+----+------------+-----------+-----------------------------+
| ID | FIRST_NAME | LAST_NAME | EMAIL                       |
+----+------------+-----------+-----------------------------+
| 42 | Rick       | Astley    | Never.gonna.give.you@up.com |
+----+------------+-----------+-----------------------------+
1 row in set (0.00 sec)
-
In KSQL query the table:
SET 'auto.offset.reset' = 'earliest';

SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM CUSTOMERS_TABLE WHERE ID=42 EMIT CHANGES;

42 | Rick | Astley | Never.gonna.give.you@up.com | 2019-04-01T22:42:58Z
-
In KSQL query the stream:
SET 'auto.offset.reset' = 'earliest';

SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM CUSTOMERS_STREAM WHERE ID=42 EMIT CHANGES;

42 | Rick | Astley |
42 | Rick | Astley | Never.gonna.give.you@up.com
42 | Rick | Astley | r.astley@example.com
-
Show before/after records:
SET 'auto.offset.reset' = 'earliest';

SELECT OP, BEFORE->EMAIL, AFTER->EMAIL FROM CUSTOMERS_CDC_STREAM WHERE AFTER->ID=42 EMIT CHANGES;

c | null |
u |  | Never.gonna.give.you@up.com
u | Never.gonna.give.you@up.com | r.astley@example.com
u | r.astley@example.com | r.astley@example.com
-
Join to a stream of events
CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
SELECT MESSAGE, STARS, USER_ID FROM RATINGS EMIT CHANGES;
SELECT R.RATING_ID, R.MESSAGE, R.STARS,
       C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
       C.EMAIL AS EMAIL
  FROM RATINGS R
       LEFT JOIN CUSTOMERS_TABLE C
       ON R.USER_ID = C.ROWKEY
 WHERE C.FIRST_NAME IS NOT NULL
 EMIT CHANGES;

CREATE STREAM RATINGS_ENRICHED AS
SELECT R.RATING_ID, R.MESSAGE, R.STARS,
       C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
       C.EMAIL AS EMAIL
  FROM RATINGS R
       LEFT JOIN CUSTOMERS_TABLE C
       ON R.USER_ID = C.ROWKEY
 WHERE C.FIRST_NAME IS NOT NULL;
PRINT 'RATINGS_ENRICHED';
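Since CREATE STREAM ... AS SELECT writes its results to a Kafka topic named after the stream by default, the enriched data can also be inspected from outside KSQL with kafkacat:

docker run --net host --rm edenhill/kafkacat:1.5.0 \
  -b localhost:9092 \
  -r http://localhost:8081 \
  -s avro \
  -t RATINGS_ENRICHED \
  -C -o beginning -u -q | jq -c '.'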