Data Pipeline - friction log

Issue 01 - amending KSQL pipelines

The pipeline has five dependent streams. I need to make a change to stream #2, to add some conditions to a CASE expression. There’ll be no change to the schema.

  • I can’t do this without dropping all dependent queries

  • I can’t drop all dependent queries automatically because there’s no CASCADE option, and the query names are not deterministic, so I can’t simply script it either

  • The only way is manually, or to trash everything and rebuild (https://rmoff.net/2019/03/25/terminate-all-ksql-queries/); a script along these lines is sketched after this list

  • Alternatively, restart the KSQL server with a new app id

  • Once I drop and recreate a stream it’s going to either (a) reprocess everything (SET 'auto.offset.reset' = 'earliest';) or (b) skip any messages received between the stream being dropped and recreated (SET 'auto.offset.reset' = 'latest';).
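The linked post amounts to scripting exactly this via the REST API. A minimal sketch, assuming the KSQL server is on localhost:8088 and jq is available (response field names may differ across KSQL versions):

    #!/usr/bin/env bash
    # Rough sketch, after the approach in the post linked above: discover the
    # non-deterministic query IDs at runtime and terminate each one.
    KSQL_URL=${KSQL_URL:-http://localhost:8088}

    curl -s -X POST "${KSQL_URL}/ksql" \
         -H "Content-Type: application/vnd.ksql.v1+json" \
         -d '{"ksql":"SHOW QUERIES;","streamsProperties":{}}' |
      jq -r '.[0].queries[].id' |
      while read -r QUERY_ID; do
        echo "Terminating ${QUERY_ID}"
        curl -s -X POST "${KSQL_URL}/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json" \
             -d "{\"ksql\":\"TERMINATE ${QUERY_ID};\",\"streamsProperties\":{}}"
      done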

Issue 02 - composite keys

I want to take advantage of idempotent updates in Elasticsearch, so that records re-written by KSQL to a topic (e.g. when redefining a pipeline as above) are not duplicated in Elasticsearch but instead updated.

This means using key.ignore=false in Kafka Connect, and making sure the message key is set correctly.

Can this be done in KSQL alone with PARTITION BY? Or by using KSQL to create the composite key and then a series of SMTs in Kafka Connect to set it as the message key?

It can be done purely in KSQL with PARTITION BY:

    SELECT […], TMA.TRAIN_ID + CAST(TMA.ACTUAL_TIMESTAMP AS VARCHAR) + TMA.LOC_STANOX AS MSG_KEY
      FROM […]
    PARTITION BY MSG_KEY;

Or with KSQL building the key column, and SMTs in Kafka Connect then setting it as the message key:

    SELECT […], TMA.TRAIN_ID + CAST(TMA.ACTUAL_TIMESTAMP AS VARCHAR) + TMA.LOC_STANOX AS MSG_KEY

"transforms": "ValueToKey,extractKey", "transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey", "transforms.ValueToKey.fields":"MSG_KEY", "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field":"MSG_KEY",

Issue 02a - Composite keys in KSQL

I want to specify multiple columns for a key in a TABLE but can only specify one - so I have to add an intermediate step that builds the composite key before declaring the table (sketched below).
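A sketch of that intermediate step, piped through the KSQL CLI (stream and column names are borrowed from the example above; syntax per KSQL 5.x):

    # Hypothetical two-step workaround: build a single composite key column in
    # an intermediate stream, then declare the table against the re-keyed topic.
    ksql http://localhost:8088 <<'EOF'
    CREATE STREAM TRAIN_MOVEMENTS_KEYED AS
      SELECT TRAIN_ID,
             ACTUAL_TIMESTAMP,
             LOC_STANOX,
             TRAIN_ID + CAST(ACTUAL_TIMESTAMP AS VARCHAR) + LOC_STANOX AS MSG_KEY
        FROM TRAIN_MOVEMENTS
      PARTITION BY MSG_KEY;

    CREATE TABLE TRAIN_MOVEMENTS_TBL
      WITH (KAFKA_TOPIC='TRAIN_MOVEMENTS_KEYED',
            VALUE_FORMAT='AVRO',
            KEY='MSG_KEY');
    EOF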

Issue 03 - Scripting deployment

When using KSQL to re-partition a topic so that a table can be declared on it, the CREATE TABLE will fail if it runs too soon, because the schema doesn’t yet exist in the Schema Registry.
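One workaround in a deployment script is to poll the Schema Registry until the subject exists before issuing the CREATE TABLE. A sketch, assuming the registry on localhost:8081 and a value subject named after the re-partitioned topic:

    # Hypothetical guard: wait for the re-partitioned topic's value schema
    # to be registered before declaring the table on it.
    SUBJECT="TRAIN_MOVEMENTS_KEYED-value"
    until curl -s -o /dev/null -w '%{http_code}' \
            "http://localhost:8081/subjects/${SUBJECT}/versions/latest" | grep -q 200; do
      echo "Waiting for subject ${SUBJECT} to appear in the Schema Registry"
      sleep 2
    done
    # ...now safe to run the CREATE TABLE (e.g. via the KSQL CLI or REST API)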

Issue 04 - Cleaning the environment is extremely cumbersome

  1. Bring down the service (a script wrapping all three steps is sketched after this list):

    docker-compose stop ksql-server
    ./data/ksql/delete_intermediate_topics.sh
  2. Increment KSQL_KSQL_SERVICE_ID in docker-compose.yml

  3. Bring the service back up (up is necessary rather than start, as the latter just resumes the existing container, whereas up re-parses docker-compose.yml and applies any changes before restarting the container)

    docker-compose up -d ksql-server

    Check the metastore is clean:

    SHOW STREAMS;
    SHOW TABLES;
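All of which is crying out for a wrapper script. A rough sketch (the sed pattern and paths are assumptions about this repo’s layout):

    #!/usr/bin/env bash
    # Hypothetical reset script combining the steps above.
    NEW_SERVICE_ID=${1:?Usage: reset_ksql.sh <new-service-id>}

    docker-compose stop ksql-server
    ./data/ksql/delete_intermediate_topics.sh

    # Bump the app id so KSQL starts with a clean metastore (the sed pattern
    # is a guess at how the variable appears in docker-compose.yml)
    sed -i.bak "s/\(KSQL_KSQL_SERVICE_ID:\).*/\1 ${NEW_SERVICE_ID}/" docker-compose.yml

    # 'up' (not 'start') so the changed docker-compose.yml is re-read
    docker-compose up -d ksql-server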

Issue 05 - Which topics are live? How big are they?

Once streams are built and data is flowing, it can be hard to keep track of what is what, particularly in a dev environment where there are lots of versions of streams etc.

When I do SHOW STREAMS I really want to know:

  • When was the last message received? → which streams are stale (not populated) or dead (a problem with the source query, e.g. serialisation of the source records)

  • How many messages are there on it? → is the number going up? Does it match my mental model of how much data there should be? (both checks can be approximated with kafkacat; see the sketch after this list)
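In the absence of this in SHOW STREAMS, kafkacat against the backing topic gives a rough approximation (broker address and topic name here are placeholders):

    # Hypothetical check: timestamp of the most recent message on the topic.
    # -o -1 starts one message back from the end of each partition, -e exits
    # at the end, %T prints the message timestamp (epoch ms).
    kafkacat -b localhost:9092 -t TRAIN_MOVEMENTS -o -1 -e -q -f '%T\n'

    # Hypothetical check: total message count, by consuming from the start
    # and counting (fine for dev-sized topics, too slow for big ones).
    kafkacat -b localhost:9092 -t TRAIN_MOVEMENTS -o beginning -e -q | wc -l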

Issue 06 - What’s the current status of my KSQL queries?

I did docker-compose stop and then docker-compose start, and I’m not seeing any new data. I think my KSQL queries are dead, but I have no way to see this. We should add this info to SHOW QUERIES.
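One stopgap: each persistent query runs as a Kafka Streams app whose consumer group can be inspected directly. A sketch, assuming the usual _confluent-ksql-<service id>query_<query id> group-naming convention (which may vary by version); no active members, or ever-growing lag, suggests the query is dead:

    # Hypothetical: inspect the consumer group behind a KSQL query.
    # The group name below is illustrative.
    kafka-consumer-groups --bootstrap-server localhost:9092 \
      --describe \
      --group _confluent-ksql-my_service_idquery_CSAS_TRAIN_MOVEMENTS_0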

Issue 07 - What queries do I have running?

SHOW QUERIES shows all my queries and the query text. For anything except the smallest query this results in screenfuls of text, making it pretty unusable. SHOW QUERIES should show just the query id, the Kafka topic, and the beginning of the query string (truncated at one line), plus the query status and ideally the source/sink object names.
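Until then, the REST API plus jq can fake a compact listing. A sketch, using the 5.x response field names (id, sinks, queryString), which may differ in other versions:

    # Hypothetical compact SHOW QUERIES: id, sink(s), truncated query text.
    curl -s -X POST http://localhost:8088/ksql \
         -H "Content-Type: application/vnd.ksql.v1+json" \
         -d '{"ksql":"SHOW QUERIES;","streamsProperties":{}}' |
      jq -r '.[0].queries[] | "\(.id)\t\(.sinks)\t\(.queryString[0:60])"'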