Angled Dream: Sossity SDK
Angled-Dream is the Sossity SDK. It serves several primary purposes:
- Serve as an abstraction on top of Cloud Dataflow stream processing.
- Manage PubSub inputs, outputs, and error handling.
- Provide a way to load Pipelines (a Cloud Dataflow task connected to PubSubs) at run-time.
- Provide a universal interface for Pipelines, so they can be run outside CDF (e.g., for the Simulator).
Taxonomy note: a Pipeline is a series of transformations in a single Cloud Dataflow job. A Flow is a series of Pipelines connected by PubSub queues.
Note: all of this information applies to both Java and Python -- because the Python example is still run inside the JVM using Jython.
A Java sample project is here: https://github.com/22Acacia/sossity-pipeline-java-sample
A Python sample project is here: https://github.com/22Acacia/sossity-pipeline-python-sample
Angled-Dream is a dependency for all pipelines run on Sossity.
Add the repository to `pom.xml`:

```xml
<repositories>
  <repository>
    <id>jitpack.io</id>
    <url>https://jitpack.io</url>
  </repository>
</repositories>
```
And the dependency:

```xml
<dependency>
  <groupId>com.github.22Acacia</groupId>
  <artifactId>angled-dream</artifactId>
  <version>-SNAPSHOT</version>
</dependency>
```
Note: there is a bug in IntelliJ that shows this dependency as broken in `pom.xml`, with a red squiggly line. It is not. To fix this, run `mvn package` from the command line.
AD relies heavily on command-line configuration in `Main.java` (see the README). All arguments necessary for data inputs and outputs, as well as machine configuration and more, are handled via the CLI. These arguments are implemented as interfaces and automatically parsed by Google's internal dependency injection handler. An example is `PipelineComposerOptions.java`.
After parsing options, `Main.java` constructs a `Pipeline`, configures its input, applies the data transformations, and configures one or more outputs. You can think of a Pipeline as performing `f(x) -> y`.
A Pipeline describes a workflow -- it does not execute the workflow. This makes local JVM-attached debugging difficult for complex workflows and pipelines, because you then need to execute the pipeline locally. This is partly why Angled-Dream encourages a simpler way of constructing pipelines, and why we built the Sossity simulator. For more information on low-level testing of pipelines, see https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
The important line of code is:

```java
PCollectionTuple t = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic())).apply(new MultiTransform());
```
A `PCollectionTuple` is a set of data, separated by Tags.
Cloud Dataflow uses Tags as a way to categorize output. See https://cloud.google.com/dataflow/model/multiple-pcollections. Most output is consumed using the `mainOutput` tag and then written to all of the `outputTopics`. Errors are consumed with the `errorOutput` tag and written to the error queue. See the Errors section for more information.
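A hedged sketch of this routing, using the Dataflow 1.x SDK types named in this document. The tag instances, class, and method here are illustrative assumptions -- in the real code, the tags must be the same instances `MultiTransform` emits to, which live inside Angled-Dream:

```java
import java.util.List;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.TupleTag;

public class RoutingSketch {
  // Stand-ins for the mainOutput/errorOutput tags described above.
  static final TupleTag<String> MAIN_OUTPUT = new TupleTag<String>() {};
  static final TupleTag<String> ERROR_OUTPUT = new TupleTag<String>() {};

  static void route(Pipeline pipeline, String inputTopic,
                    List<String> outputTopics, String errorTopic) {
    PCollectionTuple t = pipeline
        .apply(PubsubIO.Read.topic(inputTopic))
        .apply(new MultiTransform()); // Angled-Dream's transform, as in the line above

    // Successful records are duplicated to every configured output topic...
    for (int i = 0; i < outputTopics.size(); i++) {
      t.get(MAIN_OUTPUT).apply("WriteOutput" + i, PubsubIO.Write.topic(outputTopics.get(i)));
    }
    // ...while errors are routed to the error queue.
    t.get(ERROR_OUTPUT).apply(PubsubIO.Write.topic(errorTopic));
  }
}
```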
If the output is a BigQuery table instead of another PubSub, Angled-Dream processes the options (such as the table schema) to create a Cloud Dataflow task that writes to BigQuery. Even though a BigQuery output is a sink in Sossity, internally it is a Cloud Dataflow task.
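As a rough sketch of what such a generated task might look like in the Dataflow 1.x SDK (the single-column schema and field name are invented for illustration and do not reflect Angled-Dream's actual option handling):

```java
import java.util.Collections;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class BigQuerySinkSketch {
  static void writeToBigQuery(PCollection<String> mainOutput, String tableSpec) {
    // Hypothetical one-column schema, for illustration only.
    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("payload").setType("STRING")));

    mainOutput
        .apply(ParDo.of(new DoFn<String, TableRow>() {
          @Override
          public void processElement(ProcessContext c) {
            // Wrap the raw JSON string in a single column.
            c.output(new TableRow().set("payload", c.element()));
          }
        }))
        .apply(BigQueryIO.Write.to(tableSpec).withSchema(schema));
  }
}
```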
Finally, the pipeline is pushed off to Google for processing with `pipeline.run()`.
Angled-Dream simplifies Cloud Dataflow development by using only two primary classes for pipeline transforms: `AbstractTransform` and `AbstractTransformComposer`.
See reference code: Sossity Pipeline Sample
`AbstractTransform` extends the CDF class `DoFn<String,String>`, which describes a single transformation on a String that outputs a String. Inside `processElement`, the `transform` method is applied and its result is emitted to the next stage of the pipeline.
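As a rough sketch of that pattern (the real Angled-Dream implementation may differ), the delegation looks something like this:

```java
// Inside DoFn.processElement, the user-supplied transform is applied and its
// result handed to the next stage. transform() is the method described in
// this document; its exact signature here is an assumption.
@Override
public void processElement(ProcessContext c) throws Exception {
  String result = transform(c.element()); // apply the user transformation
  c.output(result);                       // emit to the next stage
}
```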
It is simple to implement a transformation and build a pipeline (see the sketch after this list):
- Include Angled-Dream in your maven `pom.xml`.
- Create a class which extends `AbstractTransform`. Annotate it with `@AutoService(AbstractTransform.class)` -- this is extremely important, because Sossity works by searching jar files for `AbstractTransform` services.
- Implement the required method `transform`. This takes input and produces output for the next stage of the pipeline.
- Create a class which extends `AbstractTransformComposer`. If you have multiple transformations for a single pipeline (from external libraries or internal code), this is where you determine the order in which `AbstractTransform`s are applied.
- Annotate it with `@AutoService(AbstractTransformComposer.class)` -- this is so Sossity and Angled-Dream can find it at run-time.
- Implement `getOrderedTransforms` and return a `List<AbstractTransform>`, which is an in-order list of the transformations to be applied.
- Make sure the above classes are in a unique package (like `com.22acacia.clickstreamtransform`).
- Edit `pom.xml` to have the desired `artifactID`, `groupID`, and `version`, along with any other maven dependencies or plugins.
- Angled-Dream catches any thrown exceptions and outputs them to the `errorPipeline`.
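Putting those steps together, here is a minimal sketch of a one-transform pipeline. The package, class names, and the transform body are invented for illustration, and the Angled-Dream import paths and the exact `transform` signature are assumptions based on the description above -- check the sample projects linked earlier for the authoritative versions.

```java
package com.example.clickstreamtransform; // any unique package, per the steps above

import com.google.auto.service.AutoService;

// Import path for the Angled-Dream base class is an assumption.
import com.acacia.angleddream.common.AbstractTransform;

// A trivial transform, invented for illustration: upper-cases each message.
@AutoService(AbstractTransform.class)
public class UppercaseTransform extends AbstractTransform {
  @Override
  public String transform(String input) {
    return input.toUpperCase();
  }
}
```

And the composer, in a separate file:

```java
package com.example.clickstreamtransform;

import java.util.Arrays;
import java.util.List;

import com.google.auto.service.AutoService;

// Import paths are assumptions, as above.
import com.acacia.angleddream.common.AbstractTransform;
import com.acacia.angleddream.common.AbstractTransformComposer;

// Returns the in-order list of transforms; with one transform it is a singleton.
@AutoService(AbstractTransformComposer.class)
public class ClickstreamComposer extends AbstractTransformComposer {
  @Override
  public List<AbstractTransform> getOrderedTransforms() {
    return Arrays.asList(new UppercaseTransform());
  }
}
```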
Angled-Dream expects Strings (usually JSON) as inputs and outputs. This is because many tools and APIs could subscribe to a pipeline, and they need a standardized form of communication. A possible future improvement is a schema registry. Data is produced and consumed to PubSub in Base64 encoding.
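A minimal sketch of that Base64 framing using only the JDK (how Angled-Dream performs the encoding internally is not shown in this document):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64Framing {
  // Encode a JSON payload before publishing it to PubSub.
  static String encodePayload(String json) {
    return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
  }

  // Decode a payload consumed from PubSub back into JSON.
  static String decodePayload(String encoded) {
    return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
  }
}
```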
From the CLI, AD expects a single input PubSub. A pipeline can have multiple inputs (file or streaming) and can handle multiple output PubSubs. This keeps routing logic simple (at the cost of duplicate data -- this overhead is relatively low). It also enables any other pipeline to subscribe to an existing Pipeline and trust that it is getting the full data stream.
A Pipeline can also output to BigQuery.
If any Exception (user-generated or internal) is thrown, AD catches it and outputs it to the `errorPipeline` Tag.
The error JSON format is: `{resource_hash, resource:{original data}, id, timestamp, error, errortimestamp}`
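For illustration only, the fields might be assembled like this (the field names are taken from the format above; everything else is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of assembling the error fields; Angled-Dream's real
// error-construction code is not shown in this document.
public class ErrorRecordSketch {
  static Map<String, Object> buildErrorRecord(String resourceHash, String originalJson,
                                              String id, long timestamp, Exception e) {
    Map<String, Object> error = new LinkedHashMap<>();
    error.put("resource_hash", resourceHash);                // hash of the original message
    error.put("resource", originalJson);                     // the original data
    error.put("id", id);                                     // message id
    error.put("timestamp", timestamp);                       // original message timestamp
    error.put("error", e.toString());                        // the caught exception
    error.put("errortimestamp", System.currentTimeMillis()); // when the error occurred
    return error;
  }
}
```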
Angled-Dream treats errors as first-class data: they are output to queues like any other data in the system, and can be processed by pipelines themselves. Internally, the error pipeline name is derived from the pipeline name, such as `sink3bts-to-sink3bts-error`.
Unconsumed errors remain in a Pipeline for 7 days, as per the PubSub specifications. You can use Sossity to create a pipeline for this (see Sossity documentation).
Sossity automatically creates a file Sink for all error queues, so you can examine the errors at will, or use a batch job to re-process the data.
To create a package for local simulation/testing, execute `mvn package`. This uses the standard Maven components for compiling and bundling the pipeline. It creates a file in `target/` with the name `artifactid-version-standalone.jar` -- you will always want to use the `standalone` jar, since it has all the dependencies inside the .jar file. Then use this jar file name in the Sossity config.
All deployment is handled by Sossity. Reference its README.
AD and Cloud Dataflow find all dependencies using Java classpaths -- this is how transformations are handled at run-time. All dependencies are bundled in .jar files and provided on the command line (internally, using `java -cp`). See the AD README for more information.
AD can also run in standalone mode, for error repair or testing. You'll need to run it from the CLI, and then cancel the job using the Google Console or the `gcloud` CLI tool. For more information, see the Angled-Dream README.