From bdc06cfcd23e9f2ae2c8dc392c67ed22e763b2d2 Mon Sep 17 00:00:00 2001 From: jr-marquez Date: Thu, 22 Apr 2021 00:07:20 +0200 Subject: [PATCH] yeskaf --- labs/mqtt_demo.adoc | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/labs/mqtt_demo.adoc b/labs/mqtt_demo.adoc index 57fa879..8fccb31 100644 --- a/labs/mqtt_demo.adoc +++ b/labs/mqtt_demo.adoc @@ -11,8 +11,8 @@ cd /home/ec2-user/ksqldbWorkshop-main/docker === Data -* Option 1 : load sample data from local -+ +load sample data from local + [source,bash] ---- kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/dummy_data.kcat @@ -221,12 +221,6 @@ We will need to create an index pattern in kibana * Go to the Kibana dashboard - :5601 - - -image::images/mqtt_kafka_07a.png[] - -But who is `rmoff`, and does he mind us having access to all this information about him? - Check out the source data in MySQL: [source,sql] @@ -253,6 +247,7 @@ SELECT USERID, EMAIL, SHARE_LOCATION_OPTIN FROM USERS; | hugh | hugh@example.com | 0 | +--------+------------------+----------------------+ ---- +exit mysql Ingest the data into ksqlDB @@ -291,17 +286,18 @@ Declare the KSQL table on the topic populated from the database: [source,sql] ---- -CREATE STREAM USERS_STREAM_CDC WITH (KAFKA_TOPIC='workshop.demo.USERS-cdc', VALUE_FORMAT='AVRO'); -create stream USERS_STREAM with (partitions=1) as select after->USERID as USERID, - after->EMAIL as EMAIL, - after->SHARE_LOCATION_OPTIN as SHARE_LOCATION_OPTIN, - after->PRIVACY_LOCATION_LAT as PRIVACY_LOCATION_LAT, - after->PRIVACY_LOCATION_LON as PRIVACY_LOCATION_LON, - after->PRIVACY_ZONE_KM as PRIVACY_ZONE_KM, - after->CREATE_TS as CREATE_TS, - after->UPDATE_TS as UPDATE_TS +CREATE STREAM USERS_STREAM_CDC WITH (KAFKA_TOPIC='mysql2-workshop.demo.USERS', VALUE_FORMAT='AVRO'); + +create stream USERS_STREAM with (partitions=1) as select USERID, + EMAIL, + SHARE_LOCATION_OPTIN, + PRIVACY_LOCATION_LAT, + PRIVACY_LOCATION_LON, + PRIVACY_ZONE_KM, + CREATE_TS, + UPDATE_TS from USERS_STREAM_CDC -partition by after->USERID; +partition by USERID; CREATE TABLE USERS ( @@ -324,7 +320,7 @@ Examine the data: ---- SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS, - R.WHO, + R.WHOSIMPLE, U.EMAIL, U.SHARE_LOCATION_OPTIN, R.LAT, @@ -338,6 +334,7 @@ Open a new ssh terminal and run datagen [source,bash] ---- +cd /home/ec2-user/ksqldbWorkshop-main/docker ./run_datagen.sh ---- @@ -404,21 +401,23 @@ We can apply this logic in the SQL as part of the streaming application: [source,sql] ---- +SET 'auto.offset.reset' = 'earliest'; + CREATE STREAM PHONE_LOCATION_OPTIN AS SELECT WHOSIMPLE, EVENT_TIME_EPOCH_MS_TS, CASE WHEN U.SHARE_LOCATION_OPTIN = 1 THEN CASE - WHEN GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') > PRIVACY_ZONE_KM + WHEN GEO_DISTANCE (LAT,LON,U.PRIVACY_LOCATION_LAT,U.PRIVACY_LOCATION_LON,'KM') > U.PRIVACY_ZONE_KM THEN LOCATION ELSE '' END WHEN U.SHARE_LOCATION_OPTIN = 0 THEN '' ELSE '' END AS LOCATION, - GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE, - PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM, + GEO_DISTANCE (LAT,LON,U.PRIVACY_LOCATION_LAT,U.PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE, + U.PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM, BATTERY_PCT, BATTERY_STATUS, U.EMAIL AS EMAIL