---
title: Tutorial 2 - Reading from Pravega
---
In this tutorial, we'll use `stream_pravega_to_console.py`.

The beginning of this script is the same as `stream_generated_data_to_pravega.py`. Now we are reading from a Pravega stream source.

```python
(spark
    .readStream
    .format("pravega")
    .option("controller", controller)
    .option("allow_create_scope", allowCreateScope)
    .option("scope", scope)
    .option("stream", "streamprocessing1")
```
The first time that this Spark application runs, we can choose where in the stream to begin reading. We can choose `earliest` or `latest`. If a previous execution of this Spark application saved a checkpoint in the checkpoint directory, then this option is ignored and the application will resume from exactly where it left off.

```python
    .option("start_stream_cut", "earliest")
    .load()
```
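To make that precedence concrete, here is a minimal, hypothetical helper in plain Python (no Spark required; the function name and the `"checkpoint"` return value are illustrative, not part of the connector's API) sketching the decision Spark makes on startup:

```python
import os

def effective_start_position(checkpoint_dir: str, start_stream_cut: str) -> str:
    # Illustrative only: if a prior run left checkpoint data behind,
    # Spark resumes from those saved offsets and ignores start_stream_cut.
    if os.path.isdir(checkpoint_dir) and os.listdir(checkpoint_dir):
        return "checkpoint"
    # First run (no checkpoint yet): start_stream_cut takes effect.
    return start_stream_cut
```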
-
When reading from a Pravega stream, the following columns will be available:

| Column name  | Data Type | Description |
|--------------|-----------|-------------|
| `event`      | binary    | The serialized event. If a string was written, this will be a UTF-8 string. |
| `scope`      | string    | The name of the Pravega scope. |
| `stream`     | string    | The name of the Pravega stream. |
| `segment_id` | long      | The ID of the Pravega segment containing this event. |
| `offset`     | long      | The byte offset in the Pravega segment that contains this event. |
Since we wrote a string event, we need to cast the binary `event` column from UTF-8 bytes to a Spark string.

```python
    .selectExpr("cast(event as string)", "scope", "stream", "segment_id", "offset")
```
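To see what this cast does, here is a small plain-Python sketch of the same UTF-8 decoding (no Spark needed; the sample string is made up for illustration):

```python
# The connector delivers the 'event' column as raw bytes.
raw_event = "hello from Pravega".encode("utf-8")

# cast(event as string) interprets those bytes as UTF-8 text,
# just like decoding them in Python.
decoded = raw_event.decode("utf-8")
```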
Next, we write the output to the console. We will see the result in the Spark driver log.

```python
    .writeStream
    .trigger(processingTime="3 seconds")
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
```
Stateful operations in Spark must periodically write checkpoints, which can be used to recover from failures. The checkpoint directory identified by the environment variable `CHECKPOINT_DIR` should be used for this purpose. It should be highly available until the Spark application is deleted. This should be used even for Spark applications which do not use Pravega.

```python
    .option("checkpointLocation", checkPointLocation)
```
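One plausible way a script might derive `checkPointLocation` from that environment variable (a sketch only; the fallback path is an assumption for illustration, not taken from the source):

```python
import os

# CHECKPOINT_DIR should point at durable, highly available storage.
# The local fallback below is purely illustrative.
checkPointLocation = os.environ.get("CHECKPOINT_DIR", "/tmp/spark-checkpoints")
```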
import CheckpointDir from '../snippets/spark-connectors/checkpoint-dir.md';
To run this application, refer to the steps in Tutorial 1.