Skip to content

PDP 30 (ByteStream API)

co-jo edited this page Oct 26, 2020 · 1 revision

Motivation

Today we build all of our APIs independently. Ideally we would be able to take a layered approach so that we don't have to construct every possible way to use Pravega's constructs but rather expose them at a lower level so they are easier to build infrastructure on.

Goal

The Goal is to provide a way to stream bytes in and out of a segment without any framing or demarcation of events.

The byte oriented API provides a way to write data to Pravega without writing events. It presents an interface of a InputStream and an OutputStream. Data written in this way is not framed or interpreted by Pravega. So there are no length headers or event boundries. As such byte offsets into the stream are meaningful and directly exposed.

This API is very low level, and does not provide a lot of value on it's own, as one can always write bytes to a normal Pravega stream or directly to the Tier-2 store without involving Pravega at all. It's value comes in being combined with other Pravega constructs, because it can act a building block for more sophisticated components.

non-goals

This is not attempting to tackle stream algebra or other forms of data manipulation. It does not propose any particular way to deal with more than a single segment. As such scaling and transactions are out of scope.

Method

The implementation would proceed in three steps.

  1. The Serialization of Event inside of an Append object would be moved up into the constructor of append when it is invoked by the client when passing a PendingEvent.

This enables the wire protocol to send appends that don't have any wrapping headers without changing the protocol itself.

  1. Making changes to the internal implementation need to add support for the new APIs.

  2. Add the new APIs and their impls and tests.

API changes

Two new APIs would be added on the ClientFactory. For writing data

    @Beta
    ByteStreamWriter createByteStreamWriter(String streamName, long segmentId);

For reading data

    @Beta
    ByteStreamReader createByteStreamReaders(String streamName, long segmentId);

Each of these is presumed to work with an existing segment, and returns new classes listed below.

It is an open question if we want to pass long for the segmentId or if we should use some wrapper like an explicit Segment object.

Wire protocol changes

None.

Compatibility

New APIs would be marked Beta. No changes should be needed to existing APIs.

Client changes

The client would add a new reader and writer Interface. As well as change where the serialization of Event occurs relative to Append. Currently an Append is constructed from a PendingEvent and passed to CommandEncoder which extracts the data from the Append and wraps it in an Event to write that earlier. The change would be to move this logic higher up in the call stack so that it happens above the SegmentOutputStream level. One way to do this would be to do this at the time the PendingEvent is constructed by placing an Event inside of it rather than a raw byte buffer and have two different constructors, one which directly takes the byteBuffer. It is this later API that would be used by the new class ByteStreamWriterImpl.

Changes on the writer side

A new abstract class would be created ByteStreamWriter and it would have a corresponding implementation class. (Unfortunately it needs to be an abstract class and not an interface because OutputStream is a class)

/**
 * Allows for writing raw bytes directly to a segment. This is intended as low level building block
 * for creating higher level components and is not appropriate for most applications to use
 * directly.
 * 
 * This class does not frame, attach headers, or otherwise modify the bytes written to it in any
 * way. So unlike {@link EventStreamWriter} or {@link RevisionedStreamClient} the data written
 * cannot be split apart when read. As such, any bytes written by this API can ONLY be read using
 * {@link ByteStreamReader}. Similarly, unless some sort of framing is added it is probably an error
 * to have multiple ByteStreamWriters write to the same segment as this will result in interleaved
 * data.
 * 
 * The methods on this class are non-blocking unless otherwise specified. As such data passed to a
 * {@link #write(byte[])} call cannot be assumed to be persisted until a flush has been called.
 * 
 * It is safe to invoke methods on this class from multiple threads but doing so will not result in
 * an increase in performance.
 */
public abstract class ByteStreamWriter extends OutputStream {  
    
    /**
     * Similar to {@link #write(byte[], int, int)}
     * 
     * Writes the provided data to the segment. The data is buffered internally to avoid blocking.
     * As such it cannot be assumed to be durably stored until a flush completes.
     * 
     * It is intended that this method not block, but it may in the event that the server becomes
     * disconnected for sufficiently long or is sufficiently slow that that backlog of data to be
     * written becomes a memory issue. If this behavior is undesirable the method
     * {@link #setThrowBeforeBlocking(boolean)} can be used to make this call throw an exception
     * instead of blocking.
     */
    public abstract void write(ByteBuffer src) throws IOException;
    
    /**
     * @see java.io.OutputStream#write(byte[], int, int)
     * 
     * Writes the provided data to the segment. The data is buffered internally to avoid blocking.
     * As such it cannot be assumed to be durably stored until a flush completes.
     * 
     * It is intended that this method not block, but it may in the event that the server becomes
     * disconnected for sufficiently long or is sufficiently slow that that backlog of data to be
     * written becomes a memory issue. If this behavior is undesirable the method
     * {@link #setThrowBeforeBlocking(boolean)} can be used to make this call throw an exception
     * instead of blocking.
     */
    @Override
    public abstract void write(byte b[], int off, int len) throws IOException;
    
    /**
     * @param shouldThrow True if {@link #write(byte[])} should throw an exception rather than block
     *            in the event that data is backing up.
     */
    abstract void setThrowBeforeBlocking(boolean shouldThrow);

    /**
     * Flushes the buffer and closes the writer.
     * If there is data to flush, this is a blocking method.
     * 
     * @see java.io.OutputStream#close()
     */
    @Override
    public abstract void close();
    
    /**
     * Blocks until all data written has been durably persisted.
     * @see java.io.OutputStream#flush()
     */
    @Override
    public abstract void flush() throws IOException;
    
    /**
     * Closes the writer similar to {@link #close()} but also seals it so that no future writes can
     * ever be made.
     */
    public abstract void closeAndSeal();

    /**
     * Similar to {@link #flush()} but does not block the calling thread, and instead returns a
     * future that will be completed when the flush is done.
     */
    public abstract CompletableFuture<Void> flushAsync();
    
    /**
     * This makes a synchronous RPC call to the server to obtain the total number of bytes written
     * to the segment in its history. This is the sum total of the bytes written in all calls to
     * {@link #write(byte[])} that have been flushed.
     */
    public abstract long fetchPersistedOffset();
    
}

Most of these methods extend the Java's existing OutputStream class. The reason for this choice is that it provides compatibility with a LOT of existing functions. What's more it is a simple interface and does not preclude us from writing asynchronously as writes are not guaranteed until flush() or close() return successfully.

Changes on the reader side

A new abstract class ByteStreamReader would be added an a corresponding implementation provided. (Unfortunately it needs to be an abstract class and not an interface because InputStream is a class)

/**
 * Allows for reading raw bytes from a segment. This class is designed such that it can be used with
 * or without blocking. To avoid blocking use the {@link #onDataAvailable()} method to make sure to
 * only call {@link #read(byte[])} when there is data {@link #available()}.
 *
 * It is safe to invoke methods on this class from multiple threads, but doing so will not increase
 * performance.
 */
public abstract class ByteStreamReader extends InputStream implements AsynchronousChannel, AutoCloseable {

    /**
     * Returns the current byte offset in the segment.
     * This call does not block.
     */
    abstract long getOffset();
    
    /**
     * Jumps to the provided offset. Future read calls will read from this position.
     * This makes a synchronous RPC to the server to validate the offset provided. 
     * 
     * @param offset The offset to jump to.
     * @throws InvalidOffsetException If the offset provided does not exist in the segment.
     */
    abstract void jumpToOffset(long offset) throws InvalidOffsetException;
    
    /**
     * Returns the number of bytes that can be read without blocking.
     * @see java.io.InputStream#available()
     */
    @Override
    public abstract int available();
    
    /**
     * This make a RPC to the server to fetch the highest offset in the segment.
     */
    public abstract long fetchTailOffset();
    
    /**
     * Don't call this. It is very wasteful.
     */
    @Override
    public abstract int read() throws IOException;
    
    /**
     * will only block if {@link #available()} is 0.
     */
    @Override
    public abstract int read(byte b[]) throws IOException;
    
    /**
     * If {@link #available()} is non-zero will read bytes out of a in-memory buffer into the
     * provided array.
     * If {@link #available()} is zero will wait for additional data to arrive and
     * then fill the provided array. This method will only block if {@link #available()} is 0.
     * 
     * @return The number of bytes copied into the provided buffer. Or -1 if the stream is sealed
     *         and there are no more bytes to read.
     * @see java.io.InputStream#read(byte[], int, int)
     */
    @Override
    public abstract int read(byte b[], int off, int len) throws IOException;
    
    /**
     * This method skips forward by the provided number of bytes. This method is non-blocking but
     * may not be able to skip n bytes.
     * 
     * @see java.io.InputStream#skip(long) in such a case it will return the number of bytes it
     *      skipped.
     * It may be preferable to call {@link #jumpToOffset(long)} for large jumps are that does
     * not have this property.
     */
    @Override
    public abstract long skip(long n);
    
    /**
     * @see java.io.InputStream#close()
     */
    @Override
    public abstract void close();
    
    /**
     * @see java.io.InputStream#mark(int)
     */
    @Override
    public abstract void mark(int readlimit);
    
    /**
     * @see java.io.InputStream#reset()
     */
    @Override
    public abstract void reset() throws IOException;
    
    /**
     * Returns true.
     * @see java.io.InputStream#markSupported()
     */
    @Override
    public abstract boolean markSupported();
    
    /**
     * Returns a future that will be completed when there is data available to be read. The Integer
     * in the result will be the number of bytes {@link #available()} or -1 if the reader has
     * reached the end of a sealed segment.
     */
    public abstract CompletableFuture<Integer> onDataAvailable();
    
}

Similar to ByteStreamWriter this is chosen to implement InputStream as a LOT of code uses and expects this interface. However unlike OutputStream this interface cannot be implemented in a purely non-blocking way. So a method onDataAvailable() is added. This method works in conjunction with the existing method available() which tells applications how many bytes can be read without blocking. The problem with just using available() is that if the value returned is zero applications don't have a way to be notified when the value becomes non-zero. onDataAvailable() is intended to solve this, by allowing the application to pass in a future that will be completed when data becomes available for read. (Or immediately if there already is some.)

Besides this one method the only thing not directly derived from the InputStream interface are getOffset() and jumpToOffset() which simply provide a way to move in the stream. It is actually an open question if these are needed as the interface already provides skip(), mark(), and reset() which were intended for this purpose. However they aren't very well designed as skip() does not allow for backwards traversal, so if the application did not mark() a position or marked one after the point they want to go back to there is no way to get back to that position without recreating the object. It is also commonly the case that mark() is not supported hence the markSupported() method. So many developers as simply not accustomed to using it.

Server side changes

None.

Transactions

Explicitly out of scope.

Open questions

  • Do we need jumpToOffset() on the reader. See above.
  • Should we take any special measures to prevent users from writing data to a stream that is being used by a normal reader? If they attempt to do so and are not writing event headers (a length) then the receiving reader will crash and not be able to recover.
  • Should we move the API to construct these objects off of the ClientFactory like we did for batch reads and writes until the API is stabilized.
  • Should users be referring to the segment by it's long when creating the reader / writer or do we want to use a Segment class.
Clone this wiki locally