Tutorial 3 - Writing to Pravega using Java
In this tutorial, we will create a Java version of the Python application stream_generated_data_to_pravega.py, which was described in Tutorial 1.
Java 11: A Java 11 JDK is required. Refer to Prepare Development Environment.
Gradle: This tutorial uses Gradle to build the Java package. If you are using Ubuntu, you may use this command to install it.
sudo apt-get install gradle
Create the main Java class file src/main/java/GeneratedDataToPravega.java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

public class GeneratedDataToPravega {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Connection settings come from environment variables, with local defaults.
        String scope = System.getenv().getOrDefault("PRAVEGA_SCOPE", "examples");
        // Only allow the connector to create the scope when PROJECT_NAME is not set.
        boolean allowCreateScope = !System.getenv().containsKey("PROJECT_NAME");
        String controller = System.getenv().getOrDefault("PRAVEGA_CONTROLLER_URI", "tcp://");
        String checkpointLocation = System.getenv().getOrDefault("CHECKPOINT_DIR", "/tmp/spark-checkpoints-GeneratedDataToPravega");

        spark
            .readStream()
            // The built-in "rate" source generates rows with "timestamp" and "value" columns.
            .format("rate")
            .load()
            // The Pravega sink writes the "event" column, using "routing_key" as the routing key.
            .selectExpr("cast(timestamp as string) as event", "cast(value as string) as routing_key")
            .writeStream()
            // Start a micro-batch every 3 seconds.
            .trigger(Trigger.ProcessingTime("3 seconds"))
            .outputMode("append")
            .format("pravega")
            .option("allow_create_scope", allowCreateScope)
            .option("controller", controller)
            .option("scope", scope)
            .option("stream", "streamprocessing1")
            .option("checkpointLocation", checkpointLocation)
            .start()
            .awaitTermination();
    }
}
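Everything except the stream name is resolved from environment variables at the top of main. As a minimal sketch, that logic can be pulled into a plain Java helper and exercised without starting Spark. PravegaWriterConfig and fromEnv are hypothetical names, not part of the tutorial or the Pravega connector; the defaults are copied from the class above, and the controller URI is left out because it is read the same way.

```java
import java.util.Map;

// Hypothetical helper (not part of the tutorial's code): mirrors the
// environment-variable handling in GeneratedDataToPravega.main so it can be
// checked with a plain Map instead of the real process environment.
final class PravegaWriterConfig {
    final String scope;
    final boolean allowCreateScope;
    final String checkpointLocation;

    private PravegaWriterConfig(String scope, boolean allowCreateScope, String checkpointLocation) {
        this.scope = scope;
        this.allowCreateScope = allowCreateScope;
        this.checkpointLocation = checkpointLocation;
    }

    static PravegaWriterConfig fromEnv(Map<String, String> env) {
        // Same default scope as the tutorial's main method.
        String scope = env.getOrDefault("PRAVEGA_SCOPE", "examples");
        // Scope creation is allowed only when PROJECT_NAME is absent.
        boolean allowCreateScope = !env.containsKey("PROJECT_NAME");
        // Same default checkpoint directory as the tutorial's main method.
        String checkpointLocation = env.getOrDefault(
                "CHECKPOINT_DIR", "/tmp/spark-checkpoints-GeneratedDataToPravega");
        return new PravegaWriterConfig(scope, allowCreateScope, checkpointLocation);
    }
}
```

Calling PravegaWriterConfig.fromEnv(System.getenv()) reproduces what main does. Setting PROJECT_NAME turns off scope creation, while the scope name stays at its default of examples unless PRAVEGA_SCOPE is also set.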
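Create the Gradle build file build.gradle

apply plugin: "java"

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
// The JAR is written to build/libs/my-spark-app.jar.
archivesBaseName = "my-spark-app"

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    // spark-submit supplies Spark at runtime, so it is only needed for compilation.
    compileOnly group: "org.apache.spark", name: "spark-sql_2.12", version: "3.0.1"
}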
Compile your Java source code to produce a JAR file:

gradle build

This will build the file build/libs/my-spark-app.jar.
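Before running spark-submit, it can help to confirm that the JAR actually contains the class named by --class. The following is a minimal sketch using only java.util.jar; JarCheck and containsClass are hypothetical names, and the default path assumes the build.gradle settings used in this tutorial.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper: checks whether a compiled class is present in a JAR.
final class JarCheck {
    static boolean containsClass(Path jar, String className) throws IOException {
        // Class files are stored under their package path with a .class suffix.
        // GeneratedDataToPravega is in the default package, so it sits at the root.
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location of the JAR produced by "gradle build".
        Path jar = Paths.get(args.length > 0 ? args[0] : "build/libs/my-spark-app.jar");
        System.out.println(jar + " contains GeneratedDataToPravega: "
                + containsClass(jar, "GeneratedDataToPravega"));
    }
}
```

With Java 11, the file can be run directly from the project directory with java JarCheck.java, optionally passing a different JAR path as the first argument.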
Follow these steps to run this application locally and write to your local development installation of Pravega.
Run spark-submit.
spark-submit \
  --master 'local[2]' \
  --driver-memory 4g \
  --executor-memory 4g \
  --total-executor-cores 1 \
  --packages io.pravega:pravega-connectors-spark-3.0_2.12:0.9.0 \
  --class GeneratedDataToPravega \
  build/libs/my-spark-app.jar
This job will continue to run and write events until stopped.
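The connector passed to --packages uses the Maven groupId:artifactId:version form, and its artifactId ends in a Scala binary version (_2.12) that should match the Scala build of Spark, which build.gradle pins through spark-sql_2.12. A minimal sketch of that consistency check follows; ScalaSuffix is a hypothetical helper, not part of Spark, Gradle, or the Pravega connector.

```java
// Hypothetical helper: extracts the Scala binary version suffix (e.g. "2.12")
// from an artifactId such as "spark-sql_2.12" or from a full
// groupId:artifactId:version coordinate.
final class ScalaSuffix {
    static String of(String artifactOrCoordinate) {
        String[] parts = artifactOrCoordinate.split(":");
        // A coordinate has the artifactId in the second position;
        // a bare artifactId has only one part.
        String artifactId = parts.length >= 2 ? parts[1] : parts[0];
        int underscore = artifactId.lastIndexOf('_');
        if (underscore < 0) {
            throw new IllegalArgumentException("No Scala suffix in " + artifactId);
        }
        return artifactId.substring(underscore + 1);
    }

    // True when both artifacts were built for the same Scala binary version.
    static boolean compatible(String sparkArtifact, String connectorCoordinate) {
        return of(sparkArtifact).equals(of(connectorCoordinate));
    }
}
```

For the versions used in this tutorial, ScalaSuffix.compatible("spark-sql_2.12", "io.pravega:pravega-connectors-spark-3.0_2.12:0.9.0") returns true, because both suffixes are 2.12.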
