Dec 23 2020 - Using Spark Streaming in Azure Databricks

This Azure Databricks repository is a set of blog posts, offered as an Advent of 2020 present to readers for easier onboarding to Azure Databricks!

Series of Azure Databricks posts:

Yesterday we took a closer look into the nuts and bolts of DataFrames using Spark SQL and the power of using SQL to query data.

Today we will take a glimpse into streaming with the Spark Core API in Azure Databricks.

Spark Streaming can analyse not only batches of data but also streams of data in near real time. This enables powerful interactive and analytical applications across both hot and cold data (streaming data and historical data). Spark Streaming is a fault-tolerant system: thanks to the lineage of operations, Spark always remembers where it stopped, and if a worker fails, another worker can recreate all the data transformations from the partitioned RDD (assuming that all the RDD transformations are deterministic).
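The lineage idea can be illustrated with a toy sketch in plain Python. This is not Spark code: the `LineagePartition` class and its methods are hypothetical names invented for illustration. It only shows why deterministic transformations allow a lost result to be rebuilt from the source data.

```python
from typing import Callable, List


class LineagePartition:
    """Toy stand-in for one partition: source data plus recorded transformations."""

    def __init__(self, source: List[int]):
        self.source = list(source)   # durable input, e.g. a file block
        self.lineage: List[Callable[[List[int]], List[int]]] = []
        self._cached = None          # result currently held by a "worker"

    def transform(self, fn):
        """Record a transformation instead of applying it eagerly."""
        self.lineage.append(fn)
        self._cached = None
        return self

    def compute(self):
        """Replay the recorded lineage over the source if no result is cached."""
        if self._cached is None:
            data = self.source
            for fn in self.lineage:
                data = fn(data)
            self._cached = data
        return self._cached

    def lose_worker(self):
        """Simulate a worker failure: the computed result is gone."""
        self._cached = None


part = LineagePartition([1, 2, 3, 4])
part.transform(lambda xs: [x * 10 for x in xs])
part.transform(lambda xs: [x for x in xs if x > 10])

before = part.compute()
part.lose_worker()
after = part.compute()  # rebuilt purely from source + lineage
print(before == after)
```

If one of the recorded functions were non-deterministic (for example, if it used random numbers), the rebuilt result could differ from the lost one, which is why the determinism assumption matters.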

Spark Streaming has native connectors to many data sources, such as HDFS, Kafka, S3, Kinesis and even Twitter.

Start your Workspace in Azure Databricks. Create a new notebook, name it Day23_streaming and use the default language: Python. If you decide to use Event Hubs for reading data, or to read from HDFS or other places, Scala might be a slightly better choice.

If you will be using the Spark context, import SparkContext and StreamingContext as well; otherwise just import the pyspark.sql namespace.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

We will be using the demo data from databricks-datasets folder:

%fs ls /databricks-datasets/structured-streaming/events/

You can check the structure of one file by using:

%fs head /databricks-datasets/structured-streaming/events/file-0.json

To initialise the stream, you need:

  • inputPath (where your files will be arriving)
  • The schema of the input files
  • A readStream call with the schema, the input path and additional options (such as picking one file at a time)
  • An aggregate function (a count of events in this particular case) applied to the result of readStream
inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as stream of one at a time
    .json(inputPath)
)

streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)
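To make the aggregation above concrete, here is a minimal plain-Python sketch of what grouping by action and a 1-hour tumbling window computes. It does not use Spark; the function names and the sample events are hypothetical, and it assumes UTC timestamps and windows aligned to the full hour, with the start inclusive and the end exclusive.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def hour_window(ts):
    """Return the (start, end) of the 1-hour tumbling window containing ts."""
    start = ts.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)


def count_by_action_and_window(events):
    """Count events per (action, window) pair, like groupBy(...).count()."""
    counts = Counter()
    for ts, action in events:
        counts[(action, hour_window(ts))] += 1
    return counts


# Hypothetical sample events, not taken from the demo dataset
utc = timezone.utc
events = [
    (datetime(2016, 7, 26, 2, 45, 7, tzinfo=utc), "Open"),
    (datetime(2016, 7, 26, 2, 59, 59, tzinfo=utc), "Close"),
    (datetime(2016, 7, 26, 3, 0, 0, tzinfo=utc), "Open"),
    (datetime(2016, 7, 26, 2, 10, 0, tzinfo=utc), "Open"),
]

counts = count_by_action_and_window(events)
for (action, (start, end)), n in sorted(counts.items()):
    print(action, start.isoformat(), end.isoformat(), n)
```

An event stamped exactly 03:00:00 lands in the window that starts at 03:00, not in the one that ends there.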

You start a streaming computation by defining a sink and starting it. In this case, to query the counts interactively, set the complete set of 1-hour counts to be stored in an in-memory table.

Run the following command to examine the outcome of a query.

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)
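The combination of one file per trigger and the "complete" output mode can be pictured with a small plain-Python simulation. This is only a sketch under simplifying assumptions: the `run_complete_mode` function and the sample batches are hypothetical, windows are ignored, and each inner list stands for the actions found in one input file.

```python
from collections import Counter


def run_complete_mode(files):
    """Yield the full result table after each micro-batch (one file per trigger)."""
    running = Counter()
    for batch in files:        # one file per micro-batch, as with maxFilesPerTrigger = 1
        running.update(batch)  # fold the new events into the running counts
        yield dict(running)    # "complete": emit every row, not only the changed ones


# Hypothetical contents of three input files
files = [["Open", "Open", "Close"], ["Open"], ["Close", "Close"]]

snapshots = list(run_complete_mode(files))
for i, table in enumerate(snapshots):
    print(i, table)
```

Each snapshot contains all counts seen so far, which is why the in-memory table can be queried at any moment and always shows a full result.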

Once the cluster is running, you can do a variety of analyses. The key component is the ".start" method: it launches the streaming computation, so that Spark keeps processing the data as it arrives.
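A started streaming query keeps running in the background until it is stopped, so it is worth stopping it when you finish experimenting. The sketch below relies on the isActive property and the stop() method of the query handle returned by start(). The helper name `stop_if_active` and the `FakeQuery` class are hypothetical; the fake is there only so that the example runs without a cluster.

```python
def stop_if_active(query):
    """Stop a streaming query if it is still running; return True if it was stopped."""
    if query.isActive:
        query.stop()
        return True
    return False


class FakeQuery:
    """Hypothetical stand-in for a streaming query handle (no Spark needed)."""

    def __init__(self):
        self.isActive = True

    def stop(self):
        self.isActive = False


fake = FakeQuery()
first = stop_if_active(fake)   # the query was running, so it gets stopped
second = stop_if_active(fake)  # already stopped, nothing to do
print(first, second)
```

In the notebook, the same helper could be called as stop_if_active(query) on the query object created above.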

You can also further shape the data by using Spark SQL:

%sql 
SELECT 
 action
,date_format(window.end, "MMM-dd HH:mm") as time
,count 
FROM counts 
ORDER BY time, action

Tomorrow we will explore Spark's own MLlib package for Machine Learning using Azure Databricks.

The complete set of code and SQL notebooks (including HTML) will be available at the GitHub repository.

Happy Coding and Stay Healthy!