Commit `4d64c01` (1 parent: `7a6206b`), showing 4 changed files with 190 additions and 21 deletions.
New file: an Avro schema describing randomly generated offer events.

```json
{
  "namespace": "ksql",
  "name": "payment_aml_status",
  "type": "record",
  "fields": [
    {"name": "OFFER_ID", "type": {
      "type": "int",
      "arg.properties": {
        "iteration": { "start": 1, "restart": 5 }
      }
    }},
    {"name": "OFFER_NAME", "type": {
      "type": "string",
      "arg.properties": {
        "options": [
          "new_savings",
          "new_checking",
          "new_home_loan",
          "new_auto_loan",
          "no_offer"
        ]
      }
    }},
    {"name": "OFFER_URL", "type": {
      "type": "string",
      "arg.properties": {
        "options": [
          "http://google.com.br/magnis/dis/parturient.json",
          "https://earthlink.net/in/ante.js",
          "https://webs.com/in/ante.jpg",
          "http://squidoo.com/venenatis/non/sodales/sed/tincidunt/eu.js",
          "https://ezinearticles.com/ipsum/primis/in/faucibus/orci/luctus.html"
        ]
      }
    }}
  ]
}
```
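The `arg.properties` hints drive Confluent's Datagen source connector: `iteration` cycles `OFFER_ID` starting at 1 and restarting at 5, while `options` picks one of the listed values at random. As a minimal sketch (not part of the lab steps below, which seed the topic with manual INSERTs instead), such a schema could feed the `OFFERS_STREAM` topic like this, assuming the file is mounted at `/datagen/offers.avro` inside the Connect worker; the path and connector name are illustrative:

```bash
ksql> CREATE SOURCE CONNECTOR offers_datagen WITH (
    'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector',
    'kafka.topic' = 'OFFERS_STREAM',
    'schema.filename' = '/datagen/offers.avro',
    'schema.keyfield' = 'OFFER_ID',
    'max.interval' = '1000'
);
```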
New file: a Postgres bootstrap script that creates and seeds the `customers` table.

```bash
#!/usr/bin/env bash

set -e
export POSTGRES_USER="postgres-user"
export POSTGRES_DB="customers"

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" -d "$POSTGRES_DB" <<-EOSQL
  CREATE TABLE IF NOT EXISTS customers ( CUSTOMER_ID INT NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(26), LAST_NAME VARCHAR(26), EMAIL VARCHAR(26), GENDER VARCHAR(26), INCOME INT, FICO INT );
  INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES (1,'Waylen','Tubble','[email protected]','Male',403646, 465);
  INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES (2,'Joell','Wilshin','[email protected]','Female',109825, 705);
  INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES (3,'Ilaire','Latus','[email protected]','Male',407964, 750);
EOSQL
```
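To double-check the seed data, you can query Postgres directly. A quick sanity check, assuming the Postgres container is named `postgres` (adjust to your compose service name):

```bash
docker exec -it postgres psql -U postgres-user -d customers -c 'SELECT customer_id, first_name, income, fico FROM customers;'
```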
Changed file: the lab README, rewritten for the new scenario.
# Create personalized banking promotions
Consumers often face never-ending generic marketing messages that are not tailored to their needs, resulting in poor customer conversion rates. A better approach, known as the 'Next Best Offer', leverages predictive analytics to analyze a customer's spending habits and activities and create more targeted promotions. This recipe demonstrates how ksqlDB can take customer banking information, apply a predictive scoring model, and improve customer conversions through personalized marketing efforts.
Start a ksqlDB CLI session:
```bash
docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
```
Create a Debezium source connector to capture changes from the Postgres `customers` table:
```bash
ksql> CREATE SOURCE CONNECTOR customers WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'postgres',
    'database.port' = '5432',
    'database.user' = 'postgres-user',
    'database.password' = 'postgres-pw',
    'database.dbname' = 'customers',
    'database.server.name' = 'customers',
    'table.whitelist' = 'public.customers',
    'transforms' = 'unwrap',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite'
);
```
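Before moving on, you can confirm the connector is running with standard ksqlDB commands:

```bash
ksql> SHOW CONNECTORS;
ksql> DESCRIBE CONNECTOR customers;
```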
Check that we are capturing changes (CDC) from the Postgres database:
```bash
ksql> print 'customers.public.customers' from beginning;

ksql> SET 'auto.offset.reset' = 'earliest';

ksql> CREATE STREAM customers WITH (
    kafka_topic = 'customers.public.customers',
    value_format = 'avro'
);

ksql> select * from customers emit changes;
```
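Because the CREATE STREAM statement lists no columns, ksqlDB infers them from the Avro schema the connector registered in Schema Registry. You can inspect what it inferred:

```bash
ksql> DESCRIBE customers;
```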
Let's create an `offers` table:
```bash
ksql> CREATE TABLE offers (
    OFFER_ID INTEGER PRIMARY KEY,
    OFFER_NAME VARCHAR,
    OFFER_URL VARCHAR
) WITH (
    KAFKA_TOPIC = 'OFFERS_STREAM',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1
);
```
Now we are going to insert some data into the underlying topic:
```bash
ksql> INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (1,'new_savings','http://google.com.br/magnis/dis/parturient.json');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (2,'new_checking','https://earthlink.net/in/ante.js');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (3,'new_home_loan','https://webs.com/in/ante.jpg');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (4,'new_auto_loan','http://squidoo.com/venenatis/non/sodales/sed/tincidunt/eu.js');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (5,'no_offer','https://ezinearticles.com/ipsum/primis/in/faucibus/orci/luctus.html');
```
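With `auto.offset.reset` still set to `earliest`, a push query should replay all five offers (the LIMIT clause ends the query after five rows):

```bash
ksql> SELECT * FROM offers EMIT CHANGES LIMIT 5;
```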
Now we are going to create a customer activity stream:
```bash
ksql> CREATE STREAM customer_activity_stream (
    CUSTOMER_ID INTEGER KEY,
    ACTIVITY_ID INTEGER,
    IP_ADDRESS VARCHAR,
    ACTIVITY_TYPE VARCHAR,
    PROPENSITY_TO_BUY DOUBLE
) WITH (
    KAFKA_TOPIC = 'CUSTOMER_ACTIVITY_STREAM',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1
);
```
And we are going to insert some activity events:
```bash
ksql> INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 1,'121.219.110.170','branch_visit',0.4);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 2,'210.232.55.188','deposit',0.56);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 3,'84.197.123.173','web_open',0.33);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 4,'70.149.233.32','deposit',0.41);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 5,'221.234.209.67','deposit',0.44);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 6,'102.187.28.148','web_open',0.33);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 7,'135.37.250.250','mobile_open',0.97);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 8,'122.157.243.25','deposit',0.83);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 9,'114.215.212.181','deposit',0.86);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 10,'248.248.0.78','new_account',0.14);
```
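Optionally, replay the activity events to confirm the inserts landed:

```bash
ksql> SELECT customer_id, activity_type, propensity_to_buy FROM customer_activity_stream EMIT CHANGES LIMIT 10;
```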
Let's create our application logic. The CASE expression scores each activity event against the customer's income and FICO score to choose an offer:
```bash
ksql> CREATE STREAM next_best_offer
WITH (
    KAFKA_TOPIC = 'NEXT_BEST_OFFER',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1
) AS
SELECT
    cask.CUSTOMER_ID AS CUSTOMER_ID,
    cask.ACTIVITY_ID,
    cask.PROPENSITY_TO_BUY,
    cask.ACTIVITY_TYPE,
    ct.INCOME,
    ct.FICO,
    CASE
        WHEN ct.INCOME > 100000 AND ct.FICO < 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 1
        WHEN ct.INCOME < 50000 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 2
        WHEN ct.INCOME >= 50000 AND ct.FICO >= 600 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 3
        WHEN ct.INCOME > 100000 AND ct.FICO >= 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 4
        ELSE 5
    END AS OFFER_ID
FROM customer_activity_stream cask
INNER JOIN customers ct WITHIN 1 HOURS ON cask.CUSTOMER_ID = ct.CUSTOMER_ID;
```
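To see how the scoring plays out, trace one customer: customer 1 has INCOME 403646 and FICO 465, so his `branch_visit` with propensity 0.4 matches the first WHEN and gets OFFER_ID 1 (new_savings), while his `mobile_open` with propensity 0.97 fails every `< 0.9` test and falls through to the ELSE, OFFER_ID 5 (no_offer). Note also that CASE evaluates its WHEN branches in order, so the third branch already captures the high-income, high-FICO customers that the fourth branch targets. You can watch the assignments as they happen:

```bash
ksql> SELECT CUSTOMER_ID, ACTIVITY_TYPE, PROPENSITY_TO_BUY, OFFER_ID FROM next_best_offer EMIT CHANGES;
```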
Join the scored stream against the `offers` table to look up each offer's name and URL:
```bash
ksql> CREATE STREAM next_best_offer_lookup
WITH (
    KAFKA_TOPIC = 'NEXT_BEST_OFFER_LOOKUP',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1
) AS
SELECT
    nbo.CUSTOMER_ID,
    nbo.ACTIVITY_ID,
    nbo.OFFER_ID,
    nbo.PROPENSITY_TO_BUY,
    nbo.ACTIVITY_TYPE,
    nbo.INCOME,
    nbo.FICO,
    ot.OFFER_NAME,
    ot.OFFER_URL
FROM next_best_offer nbo
INNER JOIN offers ot
    ON nbo.OFFER_ID = ot.OFFER_ID;
```
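Unlike the stream-stream join above, this stream-table join needs no WITHIN window: each `next_best_offer` event is simply enriched with the matching row of the `offers` table, keyed by OFFER_ID. Inspect the resulting schema:

```bash
ksql> DESCRIBE next_best_offer_lookup;
```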
Keep watching the stream:
```bash
ksql> select * from next_best_offer_lookup emit changes;
```
Open a new terminal and insert more activity:
```bash
docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088

ksql> INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 8,'122.157.243.25','deposit',0.99);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 9,'1.215.212.181','deposit',0.78);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 10,'248.248.0.77','new_account',0.14);
```
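In the watching terminal you should see the new events scored immediately: customer 2's deposit with propensity 0.99 fails every `< 0.9` test and falls to OFFER_ID 5 (no_offer), while customer 3 (INCOME 407964, FICO 750) matches the third WHEN with propensity 0.78 and gets OFFER_ID 3 (new_home_loan).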
End Lab 3 | ||
[go back to Agenda](https://github.com/jr-marquez/ksqldbWorkshop/blob/main/README.md#hands-on-agenda-and-labs)