diff --git a/README.md b/README.md
index 3f313f1..6c4bd75 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Confluent Platform 7.0.1 KSQLDB Hands-on Workshop
+# Confluent Platform 7.2.2 KSQLDB Hands-on Workshop
 This github project describes a Hands-on Workshop around Confluent KSQLDB. The structure of the Hands-on is as followed
 * Explaining and Introduce KSQLDB
 * Labs: Get to know the environment
diff --git a/labs/05_usecase_realtime_inventory_ETL.md b/labs/05_usecase_realtime_inventory_ETL.md
index ba04734..10b684f 100644
--- a/labs/05_usecase_realtime_inventory_ETL.md
+++ b/labs/05_usecase_realtime_inventory_ETL.md
@@ -10,6 +10,8 @@ mongo -u $MONGO_INITDB_ROOT_USERNAME -p mongo-pw admin
 rs.initiate()
 use logistics
 show collections
+exit
+exit
 ```
 Lets start loading Orders into our collections
 ```bash
@@ -24,7 +26,7 @@ db.orders.insert({"customer_id": "5", "order_id": "17", "price": 25.25, "currenc
 db.orders.insert({"customer_id": "5", "order_id": "15", "price": 13.75, "currency": "usd", "ts": "2020-04-03T02:55:00"})
 db.orders.insert({"customer_id": "7", "order_id": "22", "price": 29.71, "currency": "aud", "ts": "2020-04-04T00:12:00"})
 ```
-Lets start running ksqldb commands
+Let's start running ksqlDB commands in another terminal
 ```bash
 docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
 ksql> SET 'auto.offset.reset' = 'earliest';
@@ -47,6 +49,7 @@ CREATE SOURCE CONNECTOR users_reader WITH (
 );
 ```
 Create another source conector to capture orders and shippments in real time
+
 ```bash
 CREATE SOURCE CONNECTOR logistics_reader WITH (
     'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
@@ -65,6 +68,16 @@ CREATE SOURCE CONNECTOR logistics_reader WITH (
 ```
 Check connect in control center if both connectors are up and running.
+Do the same check directly in ksqlDB
+
+```bash
+show connectors;
+show topics;
+```
+Let's print some messages to analyze what is happening (you can remove the LIMIT to follow in real time)
+```bash
+print 'my-replica-set.logistics.orders' LIMIT 5;
+```
 Create a stream for all data comming in **Why am I doing this?**
 ```bash
 CREATE STREAM users WITH (
@@ -117,7 +130,7 @@ check what happend in the ksqldb terminal, now lets see latest value of the pull
 ```bash
 ksql> select * from users_by_key where id='4';
 ```
-lets do a left join to get who is ordering what
+Let's do a left join to get who is ordering what (STREAM-TABLE JOIN, any idea?)
 ```bash
 CREATE STREAM enriched_orders AS
     SELECT o.order_id,
@@ -131,7 +144,20 @@ CREATE STREAM enriched_orders AS
     ON o.customer_id = c.id
     EMIT CHANGES;
 ```
-Now lets see where should the orders be shipped
+Stop the MongoDB script and run
+```bash
+select * from enriched_orders emit changes;
+```
+Nothing is happening, right? When will a new event be triggered?
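+One way to see this yourself (a hedged sketch: it assumes you reopen the mongo shell from the start of this lab in another terminal, and the order values below are made up): insert a single order document and one new row should appear in the push query above, because in a stream-table join only new events on the stream side (the orders) trigger output.
+```bash
+// hypothetical single order, same shape as the inserts at the top of this lab
+db.orders.insert({"customer_id": "5", "order_id": "99", "price": 10.00, "currency": "usd", "ts": "2020-04-05T10:00:00"})
+```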
+
+Run the MongoDB script again (in another terminal)
+```bash
+cd /home/ec2-user/ksqldbWorkshop-main/docker
+python loadOrders.py
+
+```
+
+Now let's see where the orders should be shipped to
 ```bash
 CREATE STREAM shipped_orders WITH (
     kafka_topic = 'shipped_orders'
@@ -146,7 +172,7 @@ CREATE STREAM shipped_orders WITH (
         o.currency
     FROM enriched_orders AS o
     INNER JOIN shipments s
-    WITHIN 7 DAYS
+    WITHIN 7 DAYS GRACE PERIOD 15 MINUTES
     ON s.order_id = o.order_id
     EMIT CHANGES;
 ```
diff --git a/labs/06_usecase_track-and-trace.md b/labs/06_usecase_track-and-trace.md
index d9938cd..7ecd7ec 100644
--- a/labs/06_usecase_track-and-trace.md
+++ b/labs/06_usecase_track-and-trace.md
@@ -15,8 +15,12 @@ ksql> SET 'auto.offset.reset' = 'earliest';
 ksql> select * from orders_stream emit changes;
 ksql> describe orders_stream;
 ```
-**Drop stream shipped_orders and terminate query, do it in control center**
-do the same for Shipments
+**Drop the streams shipped_orders and shipments**
+```bash
+drop stream IF EXISTS shipped_orders;
+drop stream IF EXISTS shipments;
+```
+Once dropped, please execute
 ```bash
 ksql> CREATE STREAM shipments_stream (shipmentid varchar key, shipment_id VARCHAR, shipment_ts VARCHAR, order_id VARCHAR, delivery VARCHAR)
   WITH (KAFKA_TOPIC='shipments',
@@ -42,18 +46,19 @@ ksql> select * from shipped_orders emit changes;
 ksql> CREATE STREAM shipment_statuses_stream (shipment_id VARCHAR, status VARCHAR, warehouse VARCHAR)
   WITH (KAFKA_TOPIC='shipment_status',
        VALUE_FORMAT='JSON');
-ksql> select * from shipment_statuses_stream emit changes;
-ksql> describe shipment_statuses_stream ;
+ksql> select * from shipment_statuses_stream emit changes;
 ```
-You can also try to insert data via `insert statements`
+You can also try to insert data via `insert statements`. Leave the previous statement running and open a new terminal.
 ```bash
+docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
 ksql> INSERT INTO orders_stream (orderid, order_ts, shop, product, order_placed, total_amount, customer_name) VALUES ('"10"', '2019-03-29T06:01:18Z', 'Otto', 'iPhoneX','Berlin', 133548.84, 'Mark Mustermann');
 INSERT INTO shipments_stream (shipmentid, shipment_id, shipment_ts, order_id, delivery) VALUES ('"ship-ch83360"','ship-ch83360', '2019-03-31T18:13:39Z', '10', 'UPS');
 INSERT INTO shipment_statuses_stream (shipment_id, status, warehouse) VALUES ('ship-ch83360', 'in delivery', 'BERLIN');
 INSERT INTO shipment_statuses_stream (shipment_id, status, warehouse) VALUES ('ship-ch83360', 'in delivery', 'FRANKFURT');
 INSERT INTO shipment_statuses_stream (shipment_id, status, warehouse) VALUES ('ship-ch83360', 'delivered', '@customer');
-ksql> select * from shipment_statuses_stream emit changes;
 ```
+Close that terminal and continue with the previous one.
+
 symmetric update to table (topic behind is compacted unlimited retention)
 ```bash
 ksql> CREATE TABLE shipment_statuses_table AS SELECT