A PoC showing what can be done with streaming and batch sources of data, Apache Kafka and Confluent, and various data stores and tools. It can run locally or on Confluent Cloud.
- Register for an account at https://datafeeds.networkrail.co.uk/
- Set your username and password in /data/credentials.properties and /data/set_credentials_env.sh
- Make sure you’ve allocated Docker a bunch of memory. Like, at least 8GB. If you don’t then you’ll see containers appearing to randomly die and you’ll get frustrated 😕
- Check how much memory Docker has using this:

  docker system info | grep Memory
Launch the stack:
docker-compose up -d
Check health:
docker-compose ps
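If you'd rather script the wait than eyeball it, here's a minimal sketch that polls until no container reports a failed state (it assumes the docker-compose v1 status strings shown by docker-compose ps):

# Poll until no container reports Exit or Restarting
while docker-compose ps | grep -Eq 'Exit|Restarting'; do
  echo "$(date) Waiting for containers to settle..."
  sleep 10
done
echo "All containers up"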
Launch the ksqlDB CLI:
$ docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.15.0, Server v0.15.0 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
N.B. A lot of the code is complete but not documented below. The canonical version is the code; the docs below may not be accurate or complete. The code is sensibly named and laid out, though, so it should be easy to follow.
Deploy the pipelines:

./deploy.sh
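deploy.sh isn't reproduced here. As a hedged sketch, a deploy script like this one typically submits each ksqlDB statement file to the server's /ksql REST endpoint; the ksql/*.ksql layout below is illustrative, not necessarily the repo's actual layout, and localhost:8088 assumes the server port is mapped to the host:

# Submit each statement file to the ksqlDB REST API
for f in ksql/*.ksql; do
  curl -s -X POST http://localhost:8088/ksql \
       -H "Content-Type: application/vnd.ksql.v1+json" \
       -d "$(jq -n --arg ksql "$(cat "$f")" '{ksql: $ksql, streamsProperties: {}}')"
done

With the pipelines deployed, query the data from the ksqlDB CLI: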
SET 'auto.offset.reset' = 'earliest';
SELECT SCHEDULE_KEY,
TRAIN_STATUS,
POWER_TYPE,
SEATING_CLASSES,
ORIGIN_DESCRIPTION,
ORIGIN_PUBLIC_DEPARTURE_TIME,
DESTINATION_DESCRIPTION,
DESTINATION_PUBLIC_ARRIVAL_TIME
FROM SCHEDULE_00
WHERE ORIGIN_PUBLIC_DEPARTURE_TIME IS NOT NULL
EMIT CHANGES
LIMIT 1;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|SCHEDULE_KEY |TRAIN_STATUS |POWER_TYPE |SEATING_CLASSES |ORIGIN_DESCRIPTION |ORIGIN_PUBLIC_DEPARTU|DESTINATION_DESCRIPTI|DESTINATION_PUBLIC_AR|
| | | | | |RE_TIME |ON |RIVAL_TIME |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|N30592/2021-05-01/N |STP Bus |null |null |Oxenholme |1240 |Carlisle |1343 |
SELECT TIPLOC, NAME, DESCRIPTION, CRS, STANOX, LAT_LON FROM LOCATIONS WHERE TIPLOC='LEEDS';
+--------+------+------------+-----+--------+------------------------------------+
|TIPLOC |NAME |DESCRIPTION |CRS |STANOX |LAT_LON |
+--------+------+------------+-----+--------+------------------------------------+
|LEEDS | |Leeds |LDS |17132 |{lat=53.79516409, lon=-1.549093312} |
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') as ACTUAL_TIMESTAMP,
EVENT_TYPE,
MVT_DESCRIPTION,
PLATFORM,
VARIATION,
TOC,
TRAIN_ID,
MVT_LAT_LON
FROM TRAIN_MOVEMENTS
EMIT CHANGES;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ACTUAL_TIMESTAMP |EVENT_TYPE |MVT_DESCRIPTION |PLATFORM |VARIATION |TOC |TRAIN_ID |MVT_LAT_LON |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|2021-03-23 21:03:51 |ARRIVAL |Flixton | |1 MINS EARLY |Arriva Trains Norther|332O781Z23 |{lat=53.44395983, lon|
| | | | | |n | |=-2.382366187} |
|2021-03-23 21:03:21 |ARRIVAL |null |Platform 1 |1 MINS EARLY |Arriva Trains Norther|092H731Z23 |null |
| | | | | |n | | |
|2021-03-23 21:04:01 |ARRIVAL |Down Passenger Loop |Platform 5 |ON TIME |Arriva Trains Norther|361N841Z23 |null |
| | | | | |n | | |
|2021-03-23 21:04:01 |ARRIVAL |Kidsgrove |Platform 2 |1 MINS EARLY |Arriva Trains Norther|432H841Z23 |{lat=53.08566846, lon|
| | | | | |n | |=-2.24481102} |
|2021-03-23 21:04:06 |ARRIVAL |Finsbury Park Sig K38| |4 MINS LATE |London North Eastern |541N34MZ23 |null |
| | |1 | | |Railway | | |
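Push queries like this can also be issued against ksqlDB's REST API instead of the CLI. A minimal sketch — localhost:8088 again assumes the server port is mapped to the host, and the column list is illustrative:

curl -s -X POST http://localhost:8088/query \
     -H "Content-Type: application/vnd.ksql.v1+json" \
     -d '{
           "ksql": "SELECT TRAIN_ID, TOC, VARIATION FROM TRAIN_MOVEMENTS EMIT CHANGES LIMIT 5;",
           "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" }
         }'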
A hacky way to keep the connector running by restarting it after network glitches etc.:

while true; do
  ./data/ingest/movements/check_latest_timestamp_mac.sh
  ./data/ingest/movements/restart_failed_connector_tasks.sh
  sleep 300
done
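restart_failed_connector_tasks.sh isn't reproduced here, but as a hedged sketch, a script like it can find FAILED tasks through the Kafka Connect REST API and restart each one (localhost:8083 is an assumption based on the usual Connect port mapping):

# List every connector's task states, keep the FAILED ones, and POST a restart for each
curl -s "http://localhost:8083/connectors?expand=status" | \
  jq -r '.[].status | .name as $c | .tasks[] | select(.state=="FAILED") |
         "/connectors/\($c)/tasks/\(.id)/restart"' | \
  while read -r path; do
    curl -s -X POST "http://localhost:8083${path}"
  done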
Regarding activations:
Most trains are called automatically (auto-call) before the train is due to run, either 1 or 2 hours depending on the train’s class. The TRUST mainframe runs an internal process every 30 seconds throughout the day, causing potentially two lots of train activation messages to be received every minute.
Therefore, at the point at which you start the pipeline, there may be movement messages for trains whose activation message was sent before the pipeline started. Those movements won't be linked to schedules, because activations provide the conduit between the two.
You can keep track of how many movements couldn't be matched to an activation, per TOC:

CREATE OR REPLACE TABLE MVT_ACTIVATION_CT AS
  SELECT TOC,
         SUM(CASE WHEN SCHEDULE_KEY='no_schedule_activation_found' THEN 1 ELSE 0 END) AS NO_ACTIVATION_FOUND,
         SUM(CASE WHEN SCHEDULE_KEY='no_schedule_activation_found' THEN 0 ELSE 1 END) AS ACTIVATION_FOUND,
         COUNT(*) AS CT,
         COUNT_DISTINCT(TRAIN_ID) AS UNIQUE_TRAINS
    FROM TRAIN_MOVEMENTS_01
   WHERE SCHEDULE_KEY='no_schedule_activation_found'
   GROUP BY TOC
   EMIT CHANGES;
select * from mvt_activation_ct emit changes;

+------------------------------+--------------------+-----------------+------+
|TOC                           |NO_ACTIVATION_FOUND |ACTIVATION_FOUND |CT    |
+------------------------------+--------------------+-----------------+------+
|East Midlands Trains          |673                 |0                |673   |
|London North Eastern Railway  |274                 |0                |274   |
|TransPennine Express          |384                 |0                |384   |
|Arriva Trains Northern        |2355                |0                |2355  |
Once all pipelines are up and running, execute ./data/configure_topics.sh
to set the retention period to 26 weeks on each topic.
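configure_topics.sh isn't shown here. As a hedged sketch, the per-topic change presumably amounts to something like the following, assuming the broker container is named kafka and with illustrative topic names (26 weeks = 26 × 7 × 24 × 60 × 60 × 1000 ms = 15724800000 ms):

# Set retention.ms on each topic to 26 weeks' worth of milliseconds
for topic in TRAIN_MOVEMENTS_01 SCHEDULE_02; do
  docker exec kafka kafka-configs --bootstrap-server kafka:9092 \
    --alter --entity-type topics --entity-name "$topic" \
    --add-config retention.ms=15724800000
done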
Set up the Elasticsearch sink connectors:
./data/egress/elasticsearch/00_create_template.sh
./data/egress/elasticsearch/01_create_sinks.sh
./data/egress/elasticsearch/02_set_kibana_config.sh
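01_create_sinks.sh isn't reproduced here; as a hedged sketch, the kind of call it likely makes via the Kafka Connect REST API, using the Elasticsearch sink connector, looks like this (hostnames and ports are assumptions based on the Docker stack; the connector name comes from the status listing below, and the topic name is inferred from it):

curl -s -X PUT http://localhost:8083/connectors/sink-elastic-schedule_02-v01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
           "connection.url" : "http://elasticsearch:9200",
           "topics"         : "SCHEDULE_02",
           "key.ignore"     : "false",
           "schema.ignore"  : "false"
         }'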
Status
./data/egress/elasticsearch/list_indices_stats.sh
Connectors
----------
sink-elastic-schedule_02-v01 | RUNNING | RUNNING
sink-elastic-train_cancellations_02-v01 | RUNNING | RUNNING
sink-elastic-train_cancellations_activations_schedule_00-v01 | RUNNING | RUNNING
sink-elastic-train_movements_01-v01 | RUNNING | RUNNING
sink-elastic-train_movements_activations_schedule_00-v01 | RUNNING | RUNNING
Indices and doc count
---------------------
train_movements_01 0
train_movements_activations_schedule_00 0
train_cancellations_activations_schedule_00 0
train_cancellations_02 0
schedule_02 42529
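To query Elasticsearch directly rather than via the helper script, something like this returns the same doc counts (port 9200 assumed mapped to the host):

curl -s "http://localhost:9200/_cat/indices/train*,schedule*?h=index,docs.count"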
- Explore the data in Kibana’s Discover view
- Use Kibana’s Management → Saved Objects → Import option to import the /data/egress/elasticsearch/kibana_objects.json file
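If you'd rather script the import, Kibana's saved objects API can do the same thing; a minimal sketch, assuming Kibana is reachable on localhost:5601:

curl -s -X POST "http://localhost:5601/api/saved_objects/_import" \
     -H "kbn-xsrf: true" \
     --form file=@data/egress/elasticsearch/kibana_objects.json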
Set up the Postgres sink connector:

./data/egress/postgres/00_create_sink.sh
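00_create_sink.sh isn't shown here; a hedged sketch of the sort of JDBC sink configuration it likely creates (the connector name, credentials, and connection details are all assumptions based on the Docker stack; the topic matches the table queried below):

curl -s -X PUT http://localhost:8083/connectors/sink-postgres-train_movements-v01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url"     : "jdbc:postgresql://postgres:5432/postgres",
           "connection.user"    : "postgres",
           "connection.password": "postgres",
           "topics"             : "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
           "auto.create"        : "true",
           "insert.mode"        : "insert"
         }'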
$ docker-compose exec postgres bash -c 'echo "select count(*) from \"TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00\";" | psql -U $POSTGRES_USER $POSTGRES_DB'
count
-------
450
(1 row)
SELECT "ACTUAL_TIMESTAMP", to_timestamp("ACTUAL_TIMESTAMP"/1000) FROM "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00" ORDER BY "ACTUAL_TIMESTAMP" DESC LIMIT 5;