Skip to content

Commit

Permalink
updated
Browse files Browse the repository at this point in the history
  • Loading branch information
povconfluent committed Mar 30, 2022
1 parent de94f8c commit ac9db14
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 89 deletions.
2 changes: 2 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ services:
MONGO_INITDB_ROOT_USERNAME: mongo-user
MONGO_INITDB_ROOT_PASSWORD: mongo-pw
MONGO_REPLICA_SET_NAME: my-replica-set
volumes:
- ./mongodb/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro
command: --replSet my-replica-set --bind_ip_all

postgres:
Expand Down
10 changes: 10 additions & 0 deletions docker/mongodb/mongo-init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
mongo -u mongo-user -p mongo-pw admin <<EOF
rs.initiate();
use config;
db.createRole({ role: "dbz-role", privileges: [{resource: { db: "config", collection: "system.sessions" }, actions: [ "find", "update", "insert", "remove" ]}], roles: [ { role: "dbOwner", db: "config" },{ role: "dbAdmin", db: "config" }, { role: "readWrite", db: "config" }]});
use admin;
db.createUser({"user" : "dbz-user","pwd": "dbz-pw","roles" : [{ "role" : "root","db" : "admin" },{"role" : "readWrite", "db" : "logistics" }, { "role" : "dbz-role", "db" : "config"}]});
use logistics;
db.createCollection("orders");
db.createCollection("shipments");
EOF
11 changes: 11 additions & 0 deletions docker/postgres/db-init-scripts/00-inital.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,15 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" -d "$POSTGRES_DB" <<-EOSQL
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES (1,'Waylen','Tubble','[email protected]','Male',403646, 465);
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES (2,'Joell','Wilshin','[email protected]','Female',109825, 705);
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES (3,'Ilaire','Latus','[email protected]','Male',407964, 750);
CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT, age INT);
INSERT INTO users (id, name, age) VALUES ('1', 'fred', 34);
INSERT INTO users (id, name, age) VALUES ('2', 'sue', 25);
INSERT INTO users (id, name, age) VALUES ('3', 'bill', 51);
INSERT INTO users (id, name, age) VALUES ('4', 'ramon', 36);
INSERT INTO users (id, name, age) VALUES ('5', 'juan', 28);
INSERT INTO users (id, name, age) VALUES ('6', 'federico', 56);
INSERT INTO users (id, name, age) VALUES ('7', 'luis', 37);
INSERT INTO users (id, name, age) VALUES ('8', 'pedro', 27);
INSERT INTO users (id, name, age) VALUES ('9', 'pablo', 59);
INSERT INTO users (id, name, age) VALUES ('10', 'peter', 59);
EOSQL
108 changes: 19 additions & 89 deletions labs/05_usecase_realtime_inventory_ETL.md
Original file line number Diff line number Diff line change
@@ -1,81 +1,13 @@
Initial configuration
config postgres database
```bash
docker exec -it postgres /bin/bash
psql -U postgres-user customers
CREATE TABLE customers (id TEXT PRIMARY KEY, name TEXT, age INT);
INSERT INTO customers (id, name, age) VALUES ('1', 'fred', 34);
INSERT INTO customers (id, name, age) VALUES ('2', 'sue', 25);
INSERT INTO customers (id, name, age) VALUES ('3', 'bill', 51);
INSERT INTO customers (id, name, age) VALUES ('4', 'ramon', 36);
INSERT INTO customers (id, name, age) VALUES ('5', 'juan', 28);
INSERT INTO customers (id, name, age) VALUES ('6', 'federico', 56);
INSERT INTO customers (id, name, age) VALUES ('7', 'luis', 37);
INSERT INTO customers (id, name, age) VALUES ('8', 'pedro', 27);
INSERT INTO customers (id, name, age) VALUES ('9', 'pablo', 59);
INSERT INTO customers (id, name, age) VALUES ('10', 'peter', 59);
exit
exit
```
config mongodb database
**Open a new terminal using ssh ec2-user@publicip**
Check collections in mongoDB
```bash
docker exec -it mongo /bin/bash

mongo -u $MONGO_INITDB_ROOT_USERNAME -p mongo-pw admin

rs.initiate()

use config

db.createRole({
role: "dbz-role",
privileges: [
{
resource: { db: "config", collection: "system.sessions" },
actions: [ "find", "update", "insert", "remove" ]
}
],
roles: [
{ role: "dbOwner", db: "config" },
{ role: "dbAdmin", db: "config" },
{ role: "readWrite", db: "config" }
]
})

use admin

db.createUser({
"user" : "dbz-user",
"pwd": "dbz-pw",
"roles" : [
{
"role" : "root",
"db" : "admin"
},
{
"role" : "readWrite",
"db" : "logistics"
},
{
"role" : "dbz-role",
"db" : "config"
}
]
})

use logistics

db.createCollection("orders")

db.createCollection("shipments")

exit

exit
```
**Open a new terminal using ssh ec2-user@publicip**
show collections
```
Lets start loading Orders into our collections
```bash
cd /home/ec2-user/ksqldbWorkshop-main/docker
python loadOrders.py
Expand All @@ -93,17 +25,17 @@ Lets start running ksqldb commands
docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
ksql> SET 'auto.offset.reset' = 'earliest';
```
Create source connector for Postgres, it has customer information. you can do it from control center if you wish, give it a try.
Create source connector for Postgres, it has user information. you can do it from control center if you wish, give it a try.
```bash
CREATE SOURCE CONNECTOR customers_reader WITH (
CREATE SOURCE CONNECTOR users_reader WITH (
'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
'database.hostname' = 'postgres',
'database.port' = '5432',
'database.user' = 'postgres-user',
'database.password' = 'postgres-pw',
'database.dbname' = 'customers',
'database.server.name' = 'customers',
'table.whitelist' = 'public.customers',
'table.whitelist' = 'public.users',
'transforms' = 'unwrap',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones' = 'false',
Expand All @@ -129,12 +61,10 @@ CREATE SOURCE CONNECTOR logistics_reader WITH (
```
Check connect in control center if both connectors are up and running.

**Go to control center , terminate the query and drop the table customers** If not an error will appear
Create a stream for all data comming in **Why am I doing this?**
```bash
CREATE STREAM customers WITH (
kafka_topic = 'customers.public.customers',
CREATE STREAM users WITH (
kafka_topic = 'customers.public.users',
value_format = 'avro'
);
```
Expand All @@ -156,32 +86,32 @@ CREATE STREAM shipments WITH (
```
Check ksqldb flow in control center , the ideal world is having a ksqldb app per use case.

Lets create a materilized view of our customers
Lets create a materilized view of our users
```bash
CREATE TABLE customers_by_key AS
CREATE TABLE users_by_key AS
SELECT id,
latest_by_offset(name) AS name,
latest_by_offset(age) AS age
FROM customers
FROM users
GROUP BY id
EMIT CHANGES;
```
```bash
ksql> select * from customers_by_key emit changes;
ksql> select * from users_by_key emit changes;
```
**Open a new ssh terminal** We are going to check how pull queries maintain latest value of an specific key
```bash
docker exec -it postgres /bin/bash
psql -U postgres-user customers
update customers set age=99 where name='ramon';
psql -U postgres-user users
update users set age=99 where name='ramon';

exit
exit
```
check what happend in the ksqldb terminal, now lets see latest value of the pull query
```bash
ksql> select * from customers_by_key where id='4';
ksql> select * from users_by_key where id='4';
```
lets do a left join to get who is ordering what
```bash
Expand All @@ -193,7 +123,7 @@ CREATE STREAM enriched_orders AS
c.name AS customer_name,
c.age AS customer_age
FROM orders AS o
LEFT JOIN customers_by_key c
LEFT JOIN users_by_key c
ON o.customer_id = c.id
EMIT CHANGES;
```
Expand Down Expand Up @@ -221,7 +151,7 @@ Lets sink the data to ElasticSearch for full text query searches
CREATE SINK CONNECTOR enriched_writer WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elastic:9200',
'type.name' = 'kafka-connect',
'type.name' = 'kafkaconnect',
'topics' = 'shipped_orders'
);
exit
Expand Down

0 comments on commit ac9db14

Please sign in to comment.