Skip to content

Commit

Permalink
yeskaf
Browse files Browse the repository at this point in the history
  • Loading branch information
jr-marquez committed Apr 21, 2021
1 parent a347ef2 commit bdc06cf
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions labs/mqtt_demo.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ cd /home/ec2-user/ksqldbWorkshop-main/docker

=== Data

* Option 1 : load sample data from local
+
load sample data from local

[source,bash]
----
kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/dummy_data.kcat
Expand Down Expand Up @@ -221,12 +221,6 @@ We will need to create an index pattern in kibana

* Go to the Kibana dashboard - <your lab ip>:5601



image::images/mqtt_kafka_07a.png[]

But who is `rmoff`, and does he mind us having access to all this information about him?

Check out the source data in MySQL:

[source,sql]
Expand All @@ -253,6 +247,7 @@ SELECT USERID, EMAIL, SHARE_LOCATION_OPTIN FROM USERS;
| hugh | [email protected] | 0 |
+--------+------------------+----------------------+
----
exit mysql

Ingest the data into ksqlDB

Expand Down Expand Up @@ -291,17 +286,18 @@ Declare the KSQL table on the topic populated from the database:
[source,sql]
----
CREATE STREAM USERS_STREAM_CDC WITH (KAFKA_TOPIC='workshop.demo.USERS-cdc', VALUE_FORMAT='AVRO');
create stream USERS_STREAM with (partitions=1) as select after->USERID as USERID,
after->EMAIL as EMAIL,
after->SHARE_LOCATION_OPTIN as SHARE_LOCATION_OPTIN,
after->PRIVACY_LOCATION_LAT as PRIVACY_LOCATION_LAT,
after->PRIVACY_LOCATION_LON as PRIVACY_LOCATION_LON,
after->PRIVACY_ZONE_KM as PRIVACY_ZONE_KM,
after->CREATE_TS as CREATE_TS,
after->UPDATE_TS as UPDATE_TS
CREATE STREAM USERS_STREAM_CDC WITH (KAFKA_TOPIC='mysql2-workshop.demo.USERS', VALUE_FORMAT='AVRO');
create stream USERS_STREAM with (partitions=1) as select USERID,
EMAIL,
SHARE_LOCATION_OPTIN,
PRIVACY_LOCATION_LAT,
PRIVACY_LOCATION_LON,
PRIVACY_ZONE_KM,
CREATE_TS,
UPDATE_TS
from USERS_STREAM_CDC
partition by after->USERID;
partition by USERID;
CREATE TABLE USERS (
Expand All @@ -324,7 +320,7 @@ Examine the data:
----
SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
R.WHO,
R.WHOSIMPLE,
U.EMAIL,
U.SHARE_LOCATION_OPTIN,
R.LAT,
Expand All @@ -338,6 +334,7 @@ Open a new ssh terminal and run datagen

[source,bash]
----
cd /home/ec2-user/ksqldbWorkshop-main/docker
./run_datagen.sh
----

Expand Down Expand Up @@ -404,21 +401,23 @@ We can apply this logic in the SQL as part of the streaming application:

[source,sql]
----
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PHONE_LOCATION_OPTIN AS
SELECT WHOSIMPLE,
EVENT_TIME_EPOCH_MS_TS,
CASE
WHEN U.SHARE_LOCATION_OPTIN = 1 THEN
CASE
WHEN GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') > PRIVACY_ZONE_KM
WHEN GEO_DISTANCE (LAT,LON,U.PRIVACY_LOCATION_LAT,U.PRIVACY_LOCATION_LON,'KM') > U.PRIVACY_ZONE_KM
THEN LOCATION
ELSE '<Private>'
END
WHEN U.SHARE_LOCATION_OPTIN = 0 THEN '<Opted out>'
ELSE '<No user record>'
END AS LOCATION,
GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE,
PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM,
GEO_DISTANCE (LAT,LON,U.PRIVACY_LOCATION_LAT,U.PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE,
U.PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM,
BATTERY_PCT,
BATTERY_STATUS,
U.EMAIL AS EMAIL
Expand Down

0 comments on commit bdc06cf

Please sign in to comment.