From ac9db14a19223306a5843f7692281cfbaba1f60f Mon Sep 17 00:00:00 2001 From: povconfluent Date: Wed, 30 Mar 2022 09:37:15 +0200 Subject: [PATCH] updated --- docker/docker-compose.yml | 2 + docker/mongodb/mongo-init.sh | 10 ++ docker/postgres/db-init-scripts/00-inital.sh | 11 ++ labs/05_usecase_realtime_inventory_ETL.md | 108 ++++--------------- 4 files changed, 42 insertions(+), 89 deletions(-) create mode 100755 docker/mongodb/mongo-init.sh diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index e930559..7bdcee8 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -263,6 +263,8 @@ services: MONGO_INITDB_ROOT_USERNAME: mongo-user MONGO_INITDB_ROOT_PASSWORD: mongo-pw MONGO_REPLICA_SET_NAME: my-replica-set + volumes: + - ./mongodb/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro command: --replSet my-replica-set --bind_ip_all postgres: diff --git a/docker/mongodb/mongo-init.sh b/docker/mongodb/mongo-init.sh new file mode 100755 index 0000000..3fee0d8 --- /dev/null +++ b/docker/mongodb/mongo-init.sh @@ -0,0 +1,10 @@ +mongo -u mongo-user -p mongo-pw admin < SET 'auto.offset.reset' = 'earliest'; ``` -Create source connector for Postgres, it has customer information. you can do it from control center if you wish, give it a try. +Create source connector for Postgres, it has user information. you can do it from control center if you wish, give it a try. ```bash -CREATE SOURCE CONNECTOR customers_reader WITH ( +CREATE SOURCE CONNECTOR users_reader WITH ( 'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector', 'database.hostname' = 'postgres', 'database.port' = '5432', @@ -103,7 +35,7 @@ CREATE SOURCE CONNECTOR customers_reader WITH ( 'database.password' = 'postgres-pw', 'database.dbname' = 'customers', 'database.server.name' = 'customers', - 'table.whitelist' = 'public.customers', + 'table.whitelist' = 'public.users', 'transforms' = 'unwrap', 'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState', 'transforms.unwrap.drop.tombstones' = 'false', @@ -129,12 +61,10 @@ CREATE SOURCE CONNECTOR logistics_reader WITH ( ``` Check connect in control center if both connectors are up and running. -**Go to control center , terminate the query and drop the table customers** If not an error will appear - Create a stream for all data comming in **Why am I doing this?** ```bash -CREATE STREAM customers WITH ( - kafka_topic = 'customers.public.customers', +CREATE STREAM users WITH ( + kafka_topic = 'customers.public.users', value_format = 'avro' ); ``` @@ -156,32 +86,32 @@ CREATE STREAM shipments WITH ( ``` Check ksqldb flow in control center , the ideal world is having a ksqldb app per use case. -Lets create a materilized view of our customers +Lets create a materilized view of our users ```bash -CREATE TABLE customers_by_key AS +CREATE TABLE users_by_key AS SELECT id, latest_by_offset(name) AS name, latest_by_offset(age) AS age - FROM customers + FROM users GROUP BY id EMIT CHANGES; ``` ```bash -ksql> select * from customers_by_key emit changes; +ksql> select * from users_by_key emit changes; ``` **Open a new ssh terminal** We are going to check how pull queries maintain latest value of an specific key ```bash docker exec -it postgres /bin/bash -psql -U postgres-user customers -update customers set age=99 where name='ramon'; +psql -U postgres-user users +update users set age=99 where name='ramon'; exit exit ``` check what happend in the ksqldb terminal, now lets see latest value of the pull query ```bash -ksql> select * from customers_by_key where id='4'; +ksql> select * from users_by_key where id='4'; ``` lets do a left join to get who is ordering what ```bash @@ -193,7 +123,7 @@ CREATE STREAM enriched_orders AS c.name AS customer_name, c.age AS customer_age FROM orders AS o - LEFT JOIN customers_by_key c + LEFT JOIN users_by_key c ON o.customer_id = c.id EMIT CHANGES; ``` @@ -221,7 +151,7 @@ Lets sink the data to ElasticSearch for full text query searches CREATE SINK CONNECTOR enriched_writer WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'connection.url' = 'http://elastic:9200', - 'type.name' = 'kafka-connect', + 'type.name' = 'kafkaconnect', 'topics' = 'shipped_orders' ); exit