The slides that accompany this demo can be found here: https://speakerdeck.com/rmoff/no-more-silos-integrating-databases-and-apache-kafka
Start the environment
cd docker-compose
./scripts/setup.sh
Optionally, use something like screen or tmux to keep these sessions easily to hand, or open multiple terminal tabs. Whatever works for you :)
- KSQL CLI:
cd docker-compose
docker-compose exec ksql-cli ksql http://ksql-server:8088
- SQL*Plus:
cd docker-compose
docker-compose exec oracle bash -c 'sleep 1;rlwrap sqlplus Debezium/dbz@localhost:1521/ORCLPDB1'
(the sleep is necessary to avoid the error "rlwrap: error: My terminal reports width=0 (is it emacs?) I can’t handle this, sorry!"; ref)
This uses rlwrap, which isn’t in the vanilla Oracle image. To include it in your own image, add this to OracleDatabase/SingleInstance/dockerfiles/12.2.0.1/Dockerfile (or use this fork):
RUN rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
RUN yum install -y rlwrap
- Oracle log:
docker-compose logs -f oracle
- Debezium log:
docker-compose logs -f connect-debezium
COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CREATE_TS FOR A29
COL UPDATE_TS FOR A29
SET LINESIZE 200
SELECT ID, FIRST_NAME, LAST_NAME, CREATE_TS, UPDATE_TS FROM CUSTOMERS;
  ID FIRST_NAME      LAST_NAME       CREATE_TS                     UPDATE_TS
---- --------------- --------------- ----------------------------- -----------------------------
   1 Rica            Blaisdell       04-DEC-18 08.22.32.933376 PM  04-DEC-18 08.22.32.000000 PM
   2 Ruthie          Brockherst      04-DEC-18 08.22.32.953342 PM  04-DEC-18 08.22.32.000000 PM
   3 Mariejeanne     Cocci           04-DEC-18 08.22.32.965713 PM  04-DEC-18 08.22.32.000000 PM
   4 Hashim          Rumke           04-DEC-18 08.22.32.977417 PM  04-DEC-18 08.22.32.000000 PM
   5 Hansiain        Coda            04-DEC-18 08.22.32.979967 PM  04-DEC-18 08.22.32.000000 PM
- Debezium:
$ curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
ora-source-debezium-xstream  |  RUNNING  |  RUNNING
- JDBC:
$ curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
ora-source-jdbc  |  RUNNING  |  RUNNING
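The one-liners above chain curl, jq, column and sed. As a rough illustration of the formatting step only, here is a minimal Python sketch. It assumes each status document carries the name, connector.state and tasks[].state fields that the jq filter reads; the sample payload and the function name summarise_status are made up for this example.

```python
import json


def summarise_status(status: dict) -> str:
    """Return 'name | connector state | task states' for one status document."""
    task_states = [task["state"] for task in status.get("tasks", [])]
    return " | ".join([status["name"], status["connector"]["state"], *task_states])


# Hypothetical sample, shaped like the fields the jq filter above selects.
sample = json.loads("""
{
  "name": "ora-source-debezium-xstream",
  "connector": {"state": "RUNNING"},
  "tasks": [{"id": 0, "state": "RUNNING"}]
}
""")

print(summarise_status(sample))
```

A connector with several tasks simply gets one extra column per task, which matches what the join in the jq filter does.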
In KSQL:
ksql> list topics;
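Show the contents of the Debezium topic:
PRINT 'asgard.DEBEZIUM.CUSTOMERS' FROM BEGINNING;
In SQL*Plus, insert a row and watch it appear in the topic:
SET AUTOCOMMIT ON;
INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,CLUB_STATUS) VALUES ('Rick','Astley','Bronze');
Back in KSQL, create a stream on each topic: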
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_DBZ_SRC WITH (KAFKA_TOPIC='asgard.DEBEZIUM.CUSTOMERS', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');
LIST STREAMS;
KSQL also supports nested data:
DESCRIBE CUSTOMERS_STREAM_DBZ_SRC;
Pretty-print a source message to show why the data is nested:
echo '{"before": {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": null, "GENDER": null, "CLUB_STATUS": "Bronze", "COMMENTS": null, "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000706000000}, "after": {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": null, "GENDER": null, "CLUB_STATUS": "Platinum", "COMMENTS": null, "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000742000000}, "source": {"version": "0.9.0.Alpha2", "connector": "oracle", "name": "asgard", "ts_ms": 1544000742000, "txId": "6.26.734", "scn": 2796831, "snapshot": false}, "op": "u", "ts_ms": 1544000745823, "messagetopic": "asgard.DEBEZIUM.CUSTOMERS", "messagesource": "Debezium CDC from Oracle on asgard"}'|jq '.'
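In the message above, CREATE_TS and UPDATE_TS are three digits longer than the two ts_ms values. The sketch below assumes the former are microseconds and the latter milliseconds since the Unix epoch; that is an inference from the magnitudes and the ts_ms name, not something this demo states. The helper names are illustrative.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_epoch_micros(micros: int) -> datetime:
    """Interpret an integer as microseconds since the Unix epoch (assumed unit)."""
    return EPOCH + timedelta(microseconds=micros)


def from_epoch_millis(millis: int) -> datetime:
    """Interpret an integer as milliseconds since the Unix epoch (assumed unit)."""
    return EPOCH + timedelta(milliseconds=millis)


# Values copied from the sample message above.
print("after.CREATE_TS:", from_epoch_micros(1544000706681769).isoformat())
print("source.ts_ms:   ", from_epoch_millis(1544000742000).isoformat())
```

Under that assumption both values land within a minute of each other on the same day, which is at least consistent with one row being created and then updated shortly afterwards.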
Look at before & after:
SELECT OP, AFTER->ID, BEFORE->CLUB_STATUS, AFTER->CLUB_STATUS FROM CUSTOMERS_STREAM_DBZ_SRC;
r | 1 | null | bronze
r | 2 | null | platinum
r | 3 | null | bronze
r | 4 | null | platinum
r | 5 | null | platinum
c | 42 | null | Bronze
u | 42 | Bronze | Platinum
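To make the before/after comparison concrete, here is a small Python sketch that lists which fields an event changed. The event is a trimmed copy of the update message pretty-printed earlier; the descriptions in OP_NAMES are this sketch's reading of the op codes seen in the demo output, and changed_fields is a hypothetical helper.

```python
# Op codes as they appear in this demo's output; the descriptions are an
# interpretation made for this sketch, not text from the demo.
OP_NAMES = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}


def changed_fields(before, after):
    """Map each field whose value differs to a (before, after) pair.

    Either argument may be None, as BEFORE is for the r and c rows above.
    """
    before = before or {}
    after = after or {}
    return {
        field: (before.get(field), after.get(field))
        for field in sorted(set(before) | set(after))
        if before.get(field) != after.get(field)
    }


# Trimmed copy of the update event shown earlier.
event = {
    "op": "u",
    "before": {"ID": 42, "CLUB_STATUS": "Bronze", "UPDATE_TS": 1544000706000000},
    "after": {"ID": 42, "CLUB_STATUS": "Platinum", "UPDATE_TS": 1544000742000000},
}

print(OP_NAMES[event["op"]], changed_fields(event["before"], event["after"]))
```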
The JDBC stream only shows the current state:
SELECT ID, CLUB_STATUS FROM CUSTOMERS_STREAM_JDBC_SRC;
In SQL*Plus, update a row and then delete it. Compare the events you get from proper CDC (Debezium) with what the JDBC stream shows:
UPDATE CUSTOMERS SET CLUB_STATUS='Silver' WHERE ID=2;
DELETE FROM CUSTOMERS WHERE ID=2;
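The JDBC connector's configuration is not shown in this walkthrough, so the following is only a toy model of query-based polling. It assumes changes are detected by comparing a timestamp column such as UPDATE_TS with the last value seen; the table contents, timestamps and the poll_changes function are all invented for illustration.

```python
def poll_changes(table: dict, last_seen_ts: int) -> list:
    """Return rows whose UPDATE_TS is newer than last_seen_ts, oldest first."""
    newer = [row for row in table.values() if row["UPDATE_TS"] > last_seen_ts]
    return sorted(newer, key=lambda row: row["UPDATE_TS"])


# Toy table keyed by ID, with made-up integer timestamps.
table = {2: {"ID": 2, "CLUB_STATUS": "platinum", "UPDATE_TS": 100}}
first_poll = poll_changes(table, last_seen_ts=0)

# UPDATE CUSTOMERS SET CLUB_STATUS='Silver' WHERE ID=2;
table[2] = {"ID": 2, "CLUB_STATUS": "Silver", "UPDATE_TS": 200}
second_poll = poll_changes(table, last_seen_ts=100)

# DELETE FROM CUSTOMERS WHERE ID=2;
del table[2]
third_poll = poll_changes(table, last_seen_ts=200)

print(first_poll, second_poll, third_poll, sep="\n")
```

In this model the update is picked up as a new copy of the row with no record of the previous value, and the delete produces nothing at all, because there is no longer a row for the query to return.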
Flattening data:
CREATE STREAM CUSTOMERS_STREAM_FLATTENED AS \
SELECT AFTER->ID AS ID, \
AFTER->FIRST_NAME AS FIRST_NAME, \
AFTER->LAST_NAME AS LAST_NAME, \
AFTER->EMAIL AS EMAIL, \
AFTER->GENDER AS GENDER, \
AFTER->CLUB_STATUS AS CLUB_STATUS, \
AFTER->COMMENTS AS COMMENTS \
FROM CUSTOMERS_STREAM_DBZ_SRC;
LIST TOPICS;
PRINT 'CUSTOMERS_STREAM_FLATTENED' FROM BEGINNING;
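For readers less familiar with KSQL's -> operator, the projection above amounts to picking named fields out of the nested AFTER struct. Here is a minimal Python sketch of the same idea; flatten and FLAT_COLUMNS are names made up for this example, and the event is a trimmed copy of the message shown earlier.

```python
# Columns selected by the CREATE STREAM ... AS SELECT statement above.
FLAT_COLUMNS = ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "GENDER", "CLUB_STATUS", "COMMENTS")


def flatten(event: dict) -> dict:
    """Project the chosen columns out of the event's nested 'after' struct."""
    after = event.get("after") or {}
    return {column: after.get(column) for column in FLAT_COLUMNS}


# Trimmed copy of the update event shown earlier.
event = {
    "op": "u",
    "after": {
        "ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": None,
        "GENDER": None, "CLUB_STATUS": "Platinum", "COMMENTS": None,
        "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000742000000,
    },
}

print(flatten(event))
```

In this sketch an event whose after is null yields a row of None values; whether to keep or filter such rows is a design choice.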
Checking lag
CREATE STREAM LAG_MONITOR_DBZ AS \
SELECT ROWTIME, \
SOURCE->TS_MS, \
ROWTIME - SOURCE->TS_MS AS LAG, \
OP, \
SOURCE->SNAPSHOT, \
BEFORE->ID, \
AFTER->ID \
FROM CUSTOMERS_STREAM_DBZ_SRC;
SELECT BEFORE__ID, AFTER__ID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), LAG, OP FROM LAG_MONITOR_DBZ;
(What would be nice here is to hook up LAG_MONITOR_DBZ to Elasticsearch or InfluxDB and have a little monitoring chart.)
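The lag calculation is a plain subtraction of two epoch-millisecond values. The sketch below reproduces it in Python using the sample message shown earlier, with two assumptions: the envelope's top-level ts_ms stands in for ROWTIME (the real ROWTIME is the Kafka record timestamp, which is not part of that message), and the timestamp is rendered in UTC. The function names are illustrative.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def lag_ms(rowtime_ms: int, source_ts_ms: int) -> int:
    """Milliseconds between the source change and the record timestamp."""
    return rowtime_ms - source_ts_ms


def format_rowtime(rowtime_ms: int) -> str:
    """Render epoch milliseconds like the 'yyyy-MM-dd HH:mm:ss Z' pattern, in UTC."""
    moment = EPOCH + timedelta(milliseconds=rowtime_ms)
    return moment.strftime("%Y-%m-%d %H:%M:%S %z")


# Assumption: the envelope's own ts_ms is used in place of ROWTIME.
rowtime = 1544000745823
source_ts = 1544000742000  # source.ts_ms from the sample message

print(format_rowtime(rowtime), lag_ms(rowtime, source_ts))
```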
ksql> SELECT OP, COUNT(*) FROM CUSTOMERS_STREAM_DBZ_SRC GROUP BY OP;
c | 1
r | 9
u | 5
d | 3
In KSQL:
ksql> list topics;
Show contents
PRINT 'ora-CUSTOMERS-jdbc' FROM BEGINNING;
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');
KSQL applies the schema to the data
DESCRIBE CUSTOMERS_STREAM_JDBC_SRC;
Lag
CREATE STREAM LAG_MONITOR_JDBC AS SELECT ROWTIME, UPDATE_TS, ROWTIME-UPDATE_TS AS LAG, ID FROM CUSTOMERS_STREAM_JDBC_SRC;
SELECT ID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), LAG FROM LAG_MONITOR_JDBC;