Inputs bring external data into an Envelope pipeline. The data brought in from an input can then be used to derive new data and to write out to external outputs.
Inputs can operate as a batch or a stream. When at least one input in an Envelope pipeline is a stream, the pipeline will run in Spark Streaming mode.
An input maps back to Spark by creating a DataFrame for a batch input and a DStream for a stream input. Envelope assigns the DataFrame of a batch input, or the DataFrame of each micro-batch of a stream input, to the data of the step that contains the input.
The process of turning a DStream micro-batch into a DataFrame is known as translation. Every stream input must specify a translator to convert the raw stream messages into structured rows for the DataFrame.
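As a minimal sketch of how an input sits in a pipeline, the configuration below defines a single step whose data is produced by a batch input. The step name, path, and the assumption of the usual steps block structure are illustrative, not prescriptive.

steps {
  example_step {
    input {
      type = filesystem
      path = "/example/data"
      format = parquet
    }
  }
}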
There are five inputs provided out of the box by Envelope. Custom inputs can be developed and provided to an Envelope pipeline to access other data sources.
There are four provided batch inputs: filesystem, hive, jdbc, and kudu.
There is one provided streaming input: kafka.
The filesystem input reads data from Hadoop-compatible file systems, such as HDFS and S3A, at the path given by the path configuration, and interprets the files as one of five formats.
For some of the supported file formats a schema must or can be given. There are two ways to specify a schema in the input configuration (both approaches are sketched in the example after this list):
- Provide the lists of field names and field types (in the same field order) to field.names and field.types.
- Provide an Avro schema (note this does not imply the data needs to be Avro formatted), either directly to avro-schema.literal or indirectly by giving the path to an Avro schema file to avro-schema.file.
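For illustration, the two schema approaches might look like the following for a filesystem input reading CSV files. The path, field names, and schema file location are hypothetical.

input {
  type = filesystem
  path = "/example/data"
  format = csv
  # First approach: list the field names and types in order
  field.names = [id,name,score]
  field.types = [string,string,int]
  # Second approach (instead of the two lists above): reference an Avro schema file
  # avro-schema.file = "/example/schemas/example.avsc"
}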
This input can read the contents of the path in five different formats (specified by the format configuration):
- parquet will read the path as Parquet files. For this format the schema of the data is retrieved from the files so it is not specified in the configuration. This format uses Spark’s DataFrameReader#parquet functionality.
- json will read the path as JSON files. A schema can be provided, or if no schema is provided then it will be inferred from the data. This format uses Spark’s DataFrameReader#json functionality.
- csv will read the path as delimited text files. A schema can be provided, or if no schema is provided then it will be inferred from the data. See the configurations documentation for the many options for this format. This format uses Spark’s DataFrameReader#csv functionality.
- input-format will read the path using the given Hadoop InputFormat class and the given Envelope translator. This format allows input formats that already exist, or have been custom developed for the pipeline, to be plugged in without making an entirely new input. The InputFormat class is used to define how records will be retrieved from the path. The translator is then used to translate the unstructured records into typed fields so that the data can be represented as a DataFrame. This format uses Spark’s SparkContext#newAPIHadoopFile functionality.
- text will read the path as text files with a record per line and then translate the lines to typed fields using the given Envelope translator (a sketch of this format follows this list). This format uses Spark’s DataFrameReader#text functionality.
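As a sketch of the text format, the input below pairs a path of line-oriented files with a delimited translator so that each line becomes a typed row. The path and translator settings are illustrative, and the translator configuration mirrors the delimited translator shown in the Kafka example below.

input {
  type = filesystem
  path = "/example/lines"
  format = text
  translator {
    type = delimited
    delimiter = ","
    field.names = [name,score,time]
    field.types = [string,int,long]
  }
}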
The hive input reads a table from the Hive metastore, which includes tables created by Envelope’s Hive output and by Impala. This input uses Spark’s DataFrameReader#table functionality.
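For example, a hive input typically only needs the table to read; the table name here is a placeholder.

input {
  type = hive
  table = "default.example_table"
}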
The jdbc input reads the contents of a table at a given JDBC url. A username and password can also be provided. This input uses Spark’s DataFrameReader#jdbc functionality.
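A jdbc input configuration might look like the following sketch; the JDBC URL, table, and credentials are placeholders.

input {
  type = jdbc
  url = "jdbc:postgresql://examplehost:5432/exampledb"
  table = example_table
  username = exampleuser
  password = examplepassword
}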
The kudu input reads a table (specified by table.name) from Kudu. The Kudu masters are specified with connection.
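For example, with placeholder master addresses and table name:

input {
  type = kudu
  connection = "master1:7051,master2:7051,master3:7051"
  table.name = "impala::default.example_table"
}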
The kafka input reads a Kafka topic as a stream. Envelope will use the given translator to turn each micro-batch into a DataFrame. The Kafka brokers are specified with brokers and the topic with topic.
The group ID (group.id) is the unique identifier of the input across all executions of the pipeline. It is used to identify multiple runs of the same pipeline even when other pipelines might be reading from the same topic. If no group ID is provided a random UUID will be generated for each pipeline execution, and so the progress across runs will not be maintained.
The encoding determines how Envelope should read the messages from the topic. Use string to read the messages as strings and bytearray to read the messages as raw binary. The encoding must match the given translator.
To enable Spark Streaming’s windowing support, which allows each micro-batch to contain a window of its previous micro-batches, set window.enable to true and set window.milliseconds to the duration of the window in milliseconds.
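For example, assuming the configuration names above, adding the following to a kafka input would give each micro-batch a 60-second window over the stream:

window.enable = true
window.milliseconds = 60000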
The Kafka input supports offset management, which stores the latest processed offset of each partition of each topic of each group ID. When the pipeline starts, these offsets are retrieved so that the stream resumes from where it last successfully processed, even when data has arrived in the topic between runs.
To enable offset management set offset.manage to true, and then provide an output to store the offsets in with offset.output.
The output must support random upsert mutations (i.e. implement RandomOutput and support the UPSERT mutation type) and must contain the four fields group_id (string), topic (string), partition (int) and offset (long). If the output requires the key fields to be specified (e.g. HBase, ZooKeeper) then provide the fields group_id, topic, partition.
input {
  type = kafka
  brokers = "broker1:9092,..."
  topic = topicname
  group.id = applicationname
  encoding = string
  translator {
    type = delimited
    delimiter = ","
    field.names = [name,score,time]
    field.types = [string,int,long]
  }
  offsets {
    manage = true
    output {
      type = kudu
      connection = "master1:7051,..."
      table.name = "impala::default.offsets"
    }
  }
}
In cases where Envelope does not provide an input for a required data source, a custom input can be developed and referenced in the Envelope pipeline.
To create a batch input implement the BatchInput interface, or to create a stream input implement the StreamInput interface. With the implemented class compiled into its own jar file, the input can be referenced in the pipeline by using the fully qualified class name (or alias; see below) as the input type, and it can be provided to the Envelope application using the --jars argument when calling spark2-submit.
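As a sketch, a custom input would then be referenced by its fully qualified class name as the type. The class name below is hypothetical, and any further configurations are whatever the custom input reads for itself.

input {
  type = com.example.envelope.CustomInput
  # plus any configurations defined by the custom input
}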
To use an alias in configuration files, Envelope needs to be able to find your class. First, your class will need to implement the ProvidesAlias interface. Next, place the implementation’s fully qualified class name in a META-INF/services/com.cloudera.labs.envelope.input.Input file on the class path; the usual method is to package the file with your JAR.
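Continuing the hypothetical example above, if the services file on the class path contains the line com.example.envelope.CustomInput and that class provides the alias custominput through ProvidesAlias, the input can then be referenced more concisely:

input {
  type = custominput
}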