Skip to content

Latest commit

 

History

History
57 lines (54 loc) · 2.58 KB

02_usecase_finserv_2.md

File metadata and controls

57 lines (54 loc) · 2.58 KB

Stock pricing with UDF

Consider a topic of stock price events that you want to calculate the volume-weighted average price (VWAP) for each event, publishing the result to a new topic. There is no built-in function for VWAP, so we'll write a custom KSQL UDF that performs the calculation. see official tutorial

In our workshop the UDF is already prepared for you and linked into KSQLDB. Please see the tutorial description and check proceedings. Start this lab:

docker exec -it workshop-ksqldb-cli ksql http://ksqldb-server:8088
ksql> SHOW FUNCTIONS;
ksql> DESCRIBE FUNCTION VWAP;
CREATE STREAM raw_quotes
    (ticker varchar key, 
    bid int, 
    ask int, 
    bidqty int, 
    askqty int)
    WITH (kafka_topic='stockquotes', value_format='avro', partitions=1);
ksql> INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZTEST', 15, 25, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVV',   25, 35, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVZZT', 35, 45, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZXZZT', 45, 55, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZTEST', 10, 20, 50, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVV',   30, 40, 100, 50);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVZZT', 30, 40, 50, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZXZZT', 50, 60, 100, 50);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZTEST', 15, 20, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVV',   25, 35, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZVZZT', 35, 45, 100, 100);
INSERT INTO raw_quotes (ticker, bid, ask, bidqty, askqty) VALUES ('ZXZZT', 45, 55, 100, 100);
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM raw_quotes EMIT CHANGES LIMIT 12;
ksql> SELECT ticker, vwap(bid, bidqty, ask, askqty) AS vwap FROM raw_quotes EMIT CHANGES LIMIT 12;
CREATE STREAM vwap WITH (kafka_topic = 'vwap', partitions = 1) AS
    SELECT ticker,
           vwap(bid, bidqty, ask, askqty) AS vwap
    FROM raw_quotes
    EMIT CHANGES;
ksql> PRINT 'vwap' FROM BEGINNING LIMIT 12;
ksql> exit;

End of Lab 2

go back to Agenda