Commit

latest changes
povconfluent committed Oct 7, 2022
1 parent 8e279e1 commit 37ed834
Showing 3 changed files with 42 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
# Confluent Platform 7.0.1 KSQLDB Hands-on Workshop
# Confluent Platform 7.2.2 KSQLDB Hands-on Workshop
This GitHub project describes a Hands-on Workshop around Confluent KSQLDB. The structure of the Hands-on is as follows
* Explaining and introducing KSQLDB
* Labs: Get to know the environment
34 changes: 30 additions & 4 deletions labs/05_usecase_realtime_inventory_ETL.md
@@ -10,6 +10,8 @@ mongo -u $MONGO_INITDB_ROOT_USERNAME -p mongo-pw admin
rs.initiate()
use logistics
show collections
exit
exit
```
Let's start loading orders into our collections
```bash
@@ -24,7 +26,7 @@ db.orders.insert({"customer_id": "5", "order_id": "17", "price": 25.25, "currenc
db.orders.insert({"customer_id": "5", "order_id": "15", "price": 13.75, "currency": "usd", "ts": "2020-04-03T02:55:00"})
db.orders.insert({"customer_id": "7", "order_id": "22", "price": 29.71, "currency": "aud", "ts": "2020-04-04T00:12:00"})
```
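Before moving on, you can optionally confirm the documents landed — a quick check in the same mongo shell session where you ran the inserts (not part of the original lab steps):
```bash
db.orders.find().pretty()
```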
Let's start running ksqlDB commands
Let's start running ksqlDB commands in another terminal
```bash
docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
ksql> SET 'auto.offset.reset' = 'earliest';
@@ -47,6 +49,7 @@ CREATE SOURCE CONNECTOR users_reader WITH (
);
```
Create another source connector to capture orders and shipments in real time

```bash
CREATE SOURCE CONNECTOR logistics_reader WITH (
'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
@@ -65,6 +68,16 @@ CREATE SOURCE CONNECTOR logistics_reader WITH (
```
Check in Control Center, under Connect, that both connectors are up and running.

Do the same directly in ksqlDB

```bash
show connectors;
show topics;
```
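For more detail on a single connector (its state and tasks), ksqlDB can describe it as well — an optional check, using the connector names created above:
```bash
describe connector users_reader;
describe connector logistics_reader;
```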
Let's print some messages to analyze what is happening (you can remove the LIMIT clause to keep watching in real time)
```bash
print 'my-replica-set.logistics.orders' LIMIT 5;
```
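If you attached the CLI after the data was already loaded and the print shows nothing, you can read from the start of the topic instead — an optional variant of the command above:
```bash
print 'my-replica-set.logistics.orders' FROM BEGINNING LIMIT 5;
```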
Create a stream for all data coming in. **Why am I doing this?**
```bash
CREATE STREAM users WITH (
@@ -117,7 +130,7 @@ check what happened in the ksqldb terminal, now let's see the latest value from the pull query
```bash
ksql> select * from users_by_key where id='4';
```
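For contrast, the same table can also be queried as a push query, which keeps emitting a row on every change instead of returning the current value once — a small sketch, assuming `users_by_key` was materialized earlier in this lab:
```bash
ksql> select * from users_by_key where id='4' emit changes;
```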
Let's do a left join to get who is ordering what
Let's do a left join to get who is ordering what (STREAM-TABLE JOIN, any idea?)
```bash
CREATE STREAM enriched_orders AS
SELECT o.order_id,
@@ -131,7 +144,20 @@ CREATE STREAM enriched_orders AS
ON o.customer_id = c.id
EMIT CHANGES;
```
Now let's see where the orders should be shipped
Stop the MongoDB script and run
```bash
select * from enriched_orders emit changes;
```
Nothing is happening, right? When will a new event be triggered?
Run the MongoDB script again (in another terminal)
```bash
cd /home/ec2-user/ksqldbWorkshop-main/docker
python loadOrders.py

```
Now let's see where the orders should be shipped to
```bash
CREATE STREAM shipped_orders WITH (
kafka_topic = 'shipped_orders'
@@ -146,7 +172,7 @@ CREATE STREAM shipped_orders WITH (
o.currency
FROM enriched_orders AS o
INNER JOIN shipments s
WITHIN 7 DAYS
WITHIN 7 DAYS GRACE PERIOD 15 MINUTES
ON s.order_id = o.order_id
EMIT CHANGES;
```
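The `GRACE PERIOD` added to the join window controls how long late, out-of-order shipment events are still accepted into the 7-day window before it is closed. To watch the joined results arrive, a push query works here as well (an optional check, not part of the original lab):
```bash
select * from shipped_orders emit changes;
```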
17 changes: 11 additions & 6 deletions labs/06_usecase_track-and-trace.md
@@ -15,8 +15,12 @@ ksql> SET 'auto.offset.reset' = 'earliest';
ksql> select * from orders_stream emit changes;
ksql> describe orders_stream;
```
**Drop stream shipped_orders and terminate query, do it in control center**
do the same for Shipments
**Drop stream shipped_orders and shipments**
```bash
drop stream IF EXISTS shipped_orders;
drop stream IF EXISTS shipments;
```
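If a drop fails because a persistent query is still writing to the stream, list and terminate that query first — a sketch; the query id reported by `show queries;` will differ in your environment (`CSAS_SHIPPED_ORDERS_0` is only an example):
```bash
show queries;
terminate CSAS_SHIPPED_ORDERS_0;
```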
Once dropped, please execute
```bash
ksql> CREATE STREAM shipments_stream (shipmentid varchar key, shipment_id VARCHAR, shipment_ts VARCHAR, order_id VARCHAR, delivery VARCHAR)
WITH (KAFKA_TOPIC='shipments',
@@ -42,18 +46,19 @@ ksql> select * from shipped_orders emit changes;
ksql> CREATE STREAM shipment_statuses_stream (shipment_id VARCHAR, status VARCHAR, warehouse VARCHAR)
WITH (KAFKA_TOPIC='shipment_status',
VALUE_FORMAT='JSON');
ksql> select * from shipment_statuses_stream emit changes;
ksql> describe shipment_statuses_stream;
ksql> select * from shipment_statuses_stream emit changes;
```
You can also try to insert data via `insert statements`
You can also try to insert data via `INSERT` statements. Leave the previous query running and open a new terminal.
```bash
docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
ksql> INSERT INTO orders_stream (orderid, order_ts, shop, product, order_placed, total_amount, customer_name) VALUES ('"10"', '2019-03-29T06:01:18Z', 'Otto', 'iPhoneX','Berlin', 133548.84, 'Mark Mustermann');
INSERT INTO shipments_stream (shipmentid, shipment_id, shipment_ts, order_id, delivery) VALUES ('"ship-ch83360"','ship-ch83360', '2019-03-31T18:13:39Z', '10', 'UPS');
INSERT INTO shipment_statuses_stream (shipment_id, status, warehouse) VALUES ('ship-ch83360', 'in delivery', 'BERLIN');
INSERT INTO shipment_statuses_stream (shipment_id, status, warehouse) VALUES ('ship-ch83360', 'in delivery', 'FRANKFURT');
INSERT INTO shipment_statuses_stream (shipment_id, status, warehouse) VALUES ('ship-ch83360', 'delivered', '@customer');
ksql> select * from shipment_statuses_stream emit changes;
```
Close that terminal and continue with the previous one.
Now do the equivalent as a table (the topic behind it is compacted, so it has unlimited retention)
```bash
ksql> CREATE TABLE shipment_statuses_table AS SELECT