Consider a topic of stock price events that you want to calculate the volume-weighted average price (VWAP) for each event, publishing the result to a new topic. There is no built-in function for VWAP, so we'll write a custom KSQL UDF that performs the calculation. see official tutorial
In our workshop the UDF is already prepared for and linked into KSQLDB. Please see the tutorial description and check proceedings. Start this lab:
docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
ksql> SHOW FUNCTIONS;
ksql> DESCRIBE FUNCTION VWAP;
ksql> CREATE STREAM raw_quotes(ticker varchar, bid int, ask int, bidqty int, askqty int)
WITH (kafka_topic='stockquotes', value_format='avro', key='ticker', partitions=1);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZTEST', 15, 25, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVV', 25, 35, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVZZT', 35, 45, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZXZZT', 45, 55, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZTEST', 10, 20, 50, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVV', 30, 40, 100, 50);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVZZT', 30, 40, 50, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZXZZT', 50, 60, 100, 50);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZTEST', 15, 20, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVV', 25, 35, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVZZT', 35, 45, 100, 100);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZXZZT', 45, 55, 100, 100);
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT ticker, vwap(bid, bidqty, ask, askqty) AS vwap FROM raw_quotes EMIT CHANGES LIMIT 12;
ksql> CREATE STREAM vwap WITH (kafka_topic = 'vwap', partitions = 1) AS
SELECT ticker,
vwap(bid, bidqty, ask, askqty) AS vwap
FROM raw_quotes
EMIT CHANGES;
ksql> PRINT 'vwap' FROM BEGINNING LIMIT 12;
End of Lab 2