Implementing a custom dataflow stage
Naiad programs are described as dataflow graphs, where the nodes in the graph (Naiad calls them stages) correspond to computation, and the edges describe the flow of data. A large collection of prefabricated stages exists, and while most programmers will likely rely on these, it is also useful to understand how stages are defined in case the programmer needs to introduce custom computations.
There are two steps to implementing a custom dataflow stage: one needs to define a dataflow vertex containing the associated state and logic, and one needs to tell Naiad how to connect inputs and outputs to this logic so that input records trigger the correct computation. We will go through each of these parts, demonstrating not only how to perform these steps from the ground up, but also how Naiad has wrapped up some common patterns to make life easier for the programmer.
Custom vertices are defined by inheriting from the Vertex<T> class, which wraps the common logic for all vertices that may be brought into existence in a Naiad stage. The generic parameter T denotes the type of Naiad timestamp the vertex uses, which will often be a generic parameter for your custom vertex, too. We'll get to the specific features of the Vertex class later. An example vertex with one input and one output might look like the following:
using System.Collections.Generic; // for List<T>

// Single input, single output Naiad vertex. Each vertex must specify a time type.
public class ExampleVertex<TRecord, TTime> : Vertex<TTime>
    where TTime : Time<TTime>
{
    // each received message is simply forwarded to any listeners.
    public void OnReceive(Message<TRecord, TTime> message)
    {
        foreach (var listener in this.Listeners)
            listener.Send(message);
    }

    // list of output recipients, each of which requires all sent records.
    public readonly List<SendWire<TRecord, TTime>> Listeners;

    // each vertex requires its index in the stage, and the stage itself.
    public ExampleVertex(int index, Stage<TTime> stage)
        : base(index, stage)
    {
        this.Listeners = new List<SendWire<TRecord, TTime>>();
    }
}
This vertex isn't especially interesting yet, and it isn't even clear how its list of listeners is populated. However, assuming that we sort that out, the vertex's behavior is specified: for each received message, it sends that message to every listener in its list.
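To see the forwarding behavior in isolation, here is a toy model that does not use Naiad at all. Every name in it (ToyForwardingVertex, ToyForwardingDemo) is hypothetical, and plain Action<TRecord> delegates stand in for SendWire; the listener list is simply filled by hand. It is a sketch of the behavior described above, not of how Naiad implements it.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for ExampleVertex; this is NOT a Naiad type.
public class ToyForwardingVertex<TRecord>
{
    // Stand-ins for the output recipients: plain delegates instead of SendWire.
    public readonly List<Action<TRecord>> Listeners = new List<Action<TRecord>>();

    // Forward each received record to every registered listener, in registration order.
    public void OnReceive(TRecord record)
    {
        foreach (var listener in this.Listeners)
            listener(record);
    }
}

public static class ToyForwardingDemo
{
    // Registers two listeners by hand, delivers one record, and returns what they saw.
    public static List<string> Run()
    {
        var log = new List<string>();
        var vertex = new ToyForwardingVertex<int>();

        vertex.Listeners.Add(r => log.Add("first saw " + r));
        vertex.Listeners.Add(r => log.Add("second saw " + r));

        vertex.OnReceive(7);
        return log;
    }
}
```

Calling ToyForwardingDemo.Run() returns a log with two entries, one per listener, showing that a single received record reaches every listener in the list.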
To complete this example, we have to assemble a stage consisting of many of these vertices, and describe how the inputs and outputs are connected. The standard pattern is to first create a new stage from a vertex factory, then add any number of inputs (defined by a message callback and partitioning requirement), and finally add any number of outputs (defined by a registration callback and partitioning guarantee). The example looks like:
// requires: using System; using System.Linq.Expressions;

// constructs a stage of our ExampleVertex, and returns the associated output stream.
public static Stream<TRecord, TTime> MakeStage(Stream<TRecord, TTime> stream,
                                               Expression<Func<TRecord, int>> inputPartitionBy,
                                               Expression<Func<TRecord, int>> outputPartitionBy,
                                               string name)
{
    // first we define the innards of the stage, supplying a context, a vertex factory, and a friendly name.
    var stage = Foundry.NewStage(stream.Context, (index, parent) => new ExampleVertex<TRecord, TTime>(index, parent), name);

    // each new input requires a source of data, a (message, vertex) callback, and a partitioning requirement.
    var input = stage.NewInput(stream, (message, vertex) => vertex.OnReceive(message), inputPartitionBy);

    // each new output requires a (listener, vertex) callback, and a partitioning guarantee.
    var output = stage.NewOutput((listener, vertex) => vertex.Listeners.Add(listener), outputPartitionBy);

    return output;
}
In more detail, the Foundry.NewStage method is what Naiad uses to assemble a new stage. It takes a Context from which the stage understands its place in the dataflow graph, a factory capable of producing a vertex for each vertex index and stage, and a friendly name used to describe the stage. This stage will contain many vertices constructed by the factory, but does not yet have any incoming or outgoing edges. To add a new input, the NewInput method needs a source of data, a callback for each vertex supplied as an Action on messages and vertices, and a partitioning requirement. Naiad will make sure that all records passed along the input are partitioned according to the requirement, in that two records for which the partitioning function evaluates to the same value will arrive at the same vertex instance. Once all inputs are added, we can add outputs using NewOutput, which requires a registration callback (essentially what the vertex should do when another vertex expresses interest in its output) and a partitioning guarantee. Naiad uses the guarantee to pipeline communication when the interested consumer has a requirement matching the guarantee.
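The partitioning property (records whose partitioning function evaluates to the same value arrive at the same vertex instance) can be illustrated without Naiad. The sketch below is a hypothetical stand-in, not Naiad's routing code: it assumes a fixed number of vertices and routes each record by taking its partitioning value modulo that count, which is one simple rule that satisfies the property. The names PartitioningSketch, Route, and Partition are invented for illustration, and a plain Func is used where Naiad takes an Expression.

```csharp
using System;
using System.Collections.Generic;

public static class PartitioningSketch
{
    // Hypothetical routing rule (an assumption, not Naiad's implementation):
    // map the partitioning value onto one of vertexCount vertex indices.
    public static int Route<TRecord>(TRecord record, Func<TRecord, int> partitionBy, int vertexCount)
    {
        int key = partitionBy(record);
        // C#'s % can yield a negative remainder for negative keys; normalize to [0, vertexCount).
        return ((key % vertexCount) + vertexCount) % vertexCount;
    }

    // Groups records by the vertex index they would be routed to.
    public static Dictionary<int, List<TRecord>> Partition<TRecord>(
        IEnumerable<TRecord> records, Func<TRecord, int> partitionBy, int vertexCount)
    {
        var byVertex = new Dictionary<int, List<TRecord>>();
        foreach (var record in records)
        {
            int index = Route(record, partitionBy, vertexCount);
            List<TRecord> bucket;
            if (!byVertex.TryGetValue(index, out bucket))
            {
                bucket = new List<TRecord>();
                byVertex[index] = bucket;
            }
            bucket.Add(record);
        }
        return byVertex;
    }
}
```

For example, partitioning the strings "apple", "avocado", and "banana" by their first character across four vertices places the first two in the same bucket, because they share a partitioning value. Under this rule, two records with equal values always share a vertex, while records with different values may or may not.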