
PDP 43 (Large Events)

co-jo edited this page Jul 9, 2021 · 3 revisions

Status: POSTPONED.

See PDP-43A for an alternative.

Discussion and comments in Issue 5056.

Table of Contents:

  1. Motivation
  2. Requirements
  3. Other Considered Approaches
  4. Design
  5. Client
  6. Segment Store Changes
  7. Wire Protocol Changes

Motivation

The current maximum Stream Event size is 8MB. This is adequate for the vast majority of Streaming use cases that Pravega can encounter, but a few require more. Video streaming devices, for example, may require more than 8MB per atomic write. The reasons are many: an uncompressed 4K frame has just under 8.3M pixels, each of which takes more than 4 bytes, and a video compressor may end up writing multiple frames at once, with the total write size easily exceeding 100MB. There are other use cases out there, but this one is most relatable to the world of Event Streaming.

Requirements

After PDP 43 is implemented, Pravega should:

  1. Increase the maximum Event size from 8MB to 1024MB.
    • Future increases may be allowed in subsequent releases, but 1024MB is a good starting point.
    • It is OK to add new APIs for such Events, as long as backwards compatibility is preserved for existing code (and Events less than 8MB).
  2. Large Events should match the ingress/egress performance of writing/reading the same amount of data using the classic (8MB max) API.
    • Example. Appending a 1GB Event should have comparable throughput to appending 128 8MB Events in short sequence from the same Client Application/Writer. Similarly on the read side.
  3. There should not be any added memory pressure on the client. The Pravega Client should not buffer the whole event in the host application memory nor force the calling code to do so.
    • This is not possible with the current Client API, so it is acceptable to provide a slightly modified API to support large events. The current API must remain backwards compatible, with no breaking contract changes.

Out of scope:

  • Large appends will not be supported across scaling boundaries. If an Append is initiated on Segment A, and Segment A is then sealed (as a result of a split/merge), then the Append should be aborted and retried by the calling code (the user).
    • Cross-scaling support may be added in a future Pravega release.
  • Large appends will not be supported on Transactions. This may be done in a future release.

Other Considered Approaches

Brute force

This approach will take the entire Large Event from the user code and send it (as one Append) to the Server. Seemingly simple, this approach is actually infeasible due to a number of factors:

  • We cannot buffer the whole Large Append on the Client Side (breaks Requirement 3)
  • While we may be able to buffer one such Large Append on the Server Side, this approach will not scale - a few concurrent Large Appends will quickly overrun the memory capacity of the Segment Store.
    • The Segment Store buffers an Append until all of its components are received (the Client may choose to split that append into smaller segments)
  • Transmission problems. The maximum frame size we can send over the Wire Protocol is 16MB. We are therefore forced to split the append into smaller segments and send them individually. Any disconnect will force us to restart the whole process.

Using existing Stream Transactions

This approach would work in the following way:

  1. When a Large Event is about to be appended, a Transaction is opened (using the Controller).
  2. We split the Event into smaller chunks (8MB max each) and append them to the Transaction using the same Routing Key as the Large Append.
  3. When finished, we commit the Transaction.

Pros:

  • Uses existing infrastructure.
  • Low cost of development.
  • Is not affected by scaling events.
  • Transactions are atomic and their contents (per Routing Key) are added as a contiguous block to the target Segment (so the Large Event could be reconstructed).

Cons:

  • Each individual append is an Event in itself, with Envelope Header included. Readers will need special handling to differentiate this from regular Events and compose the Large Event.
  • The application code itself would be responsible for breaking up the Large Event into smaller chunks and then recomposing it.
  • Stream Transactions incur noticeable costs in terms of resources and latency (due to their distributed nature). They are not appropriate for the high frequency required for Large Events and would significantly drag down system performance if used.
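
As a rough illustration of step 2 of this approach, the sketch below computes the chunk boundaries the application would have to produce for a given Event size. It is only a sketch: EventChunker and chunkLengths are hypothetical names, and the Transaction open/commit calls are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Pravega): chunk boundaries for a Large Event. */
final class EventChunker {
    /** The 8MB per-Event limit quoted in this document. */
    static final long MAX_CHUNK_BYTES = 8L * 1024 * 1024;

    /** Returns the length of each chunk, in order, for an Event of the given size. */
    static List<Long> chunkLengths(long eventBytes, long maxChunkBytes) {
        if (eventBytes < 0 || maxChunkBytes <= 0) {
            throw new IllegalArgumentException("invalid sizes");
        }
        List<Long> lengths = new ArrayList<>();
        for (long offset = 0; offset < eventBytes; offset += maxChunkBytes) {
            // The last chunk may be shorter than the maximum.
            lengths.add(Math.min(maxChunkBytes, eventBytes - offset));
        }
        return lengths;
    }

    public static void main(String[] args) {
        long oneGigabyte = 1024L * 1024 * 1024;
        System.out.println(chunkLengths(oneGigabyte, MAX_CHUNK_BYTES).size() + " chunks");
    }
}
```

Each of these chunks would become an Event of its own inside the Transaction, which is exactly the first drawback listed above: Readers would have to recognize and reassemble them.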

Design

We propose using a new concept called Micro Transactions.

This idea is inspired by the regular Stream Transaction workflow, but with certain improvements that eliminate the major pain points. We build upon the fact that a Large Append will go to a single Segment (and we know that Segment since we know the Routing Key). As such, we do not need to create an entire Stream Transaction using the Controller; the transaction can be localized at the Stream Segment level. In addition, by making certain tweaks to the internals of the Client, we can ensure that, even if we split the Large Event into smaller chunks, those chunks will not appear as individual Events within the Stream.

This approach would work as follows.

Segment Store-managed Transactions (aka, Micro Transactions)

  • These are per-segment transactions. The Segment Store creates and manages them at the request of the Client. Each Micro Transaction is implemented by a Transient Segment.
  • Workflow
    1. Create Transient Segment for Segment S (request received via Wire Protocol).
    2. Segment Store generates name TS as a function of S (e.g., by appending some UUID to the name). It registers TS in the in-memory metadata (not in the Container Segment Metadata).
      • In-memory is sufficient (since this implies a record of it is in Tier 1). Since we do not expect this segment to be around for long, there is no need to store it in the Container Segment Metadata (avoid the overhead).
    3. TS behaves like any other Segment. We can write to it, read from it, have attributes, and be merged into others.
      • TS will have a maximum of 4 Extended Attributes (down from Unlimited for regular Segments). Since a single Writer Id is expected to write to it, this should be sufficient.
      • TS will be tiered down to LTS similarly to any other Segment.
      • TS has a short lifespan. Any Transient Segment has an Idle Expiration Time. If no append has been made to a Transient Segment within this system-wide elapsed time, the Transient Segment TS will be automatically deleted by the Segment Store. This can be piggybacked onto the Metadata Cleanup job that runs periodically. The idle time tracking will be reset upon a Segment Container failover - to give writers enough time to reconnect and resume writing.
    4. When the Writer is done writing data to TS, it can do a (new) Merge Segment with Header (MSH) operation on TS->S.
      • A MSH is just like a Merge Segment operation, but it allows the calling code to atomically insert a header immediately before the start of the merged Segment (we'll see how this is used later on).
      • Ex. Segment S has contents 01234, and TS has contents 789. We do a MSH(TS, S, 56) and after it is complete, S will be 0123456789.
    5. TS can be deleted at the request of the Writer (if the Large Append is aborted) or automatically if it becomes idle.
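
The workflow above can be sketched with a toy in-memory model. This is not the Segment Store implementation: TransientSegmentSketch, its method names and the "#transient." naming scheme are assumptions made purely for illustration. The demo reproduces the MSH example from step 4.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy, single-threaded model of Transient Segments (illustration only). */
final class TransientSegmentSketch {
    private final Map<String, ByteArrayOutputStream> segments = new HashMap<>();

    void createSegment(String name) {
        segments.put(name, new ByteArrayOutputStream());
    }

    /** Step 2: derive the name TS from S plus a UUID (the exact scheme is an assumption). */
    String createTransientSegment(String parent) {
        require(parent);
        String name = parent + "#transient." + UUID.randomUUID();
        segments.put(name, new ByteArrayOutputStream());
        return name;
    }

    void append(String name, byte[] data) {
        require(name).write(data, 0, data.length);
    }

    /** Step 4: write the header, then the source contents, into the target; drop the source. */
    void mergeWithHeader(String source, String target, byte[] header) {
        ByteArrayOutputStream src = require(source);
        ByteArrayOutputStream dst = require(target);
        byte[] body = src.toByteArray();
        dst.write(header, 0, header.length);
        dst.write(body, 0, body.length);
        segments.remove(source);
    }

    /** Step 5: explicit delete (abort). */
    void delete(String name) {
        segments.remove(name);
    }

    boolean exists(String name) {
        return segments.containsKey(name);
    }

    byte[] read(String name) {
        return require(name).toByteArray();
    }

    private ByteArrayOutputStream require(String name) {
        ByteArrayOutputStream s = segments.get(name);
        if (s == null) {
            throw new IllegalArgumentException("No such segment: " + name);
        }
        return s;
    }

    /** Reproduces the example above: S = 01234, TS = 789, header = 56. */
    static String demo() {
        TransientSegmentSketch store = new TransientSegmentSketch();
        store.createSegment("S");
        store.append("S", "01234".getBytes(StandardCharsets.US_ASCII));
        String ts = store.createTransientSegment("S");
        store.append(ts, "789".getBytes(StandardCharsets.US_ASCII));
        store.mergeWithHeader(ts, "S", "56".getBytes(StandardCharsets.US_ASCII));
        return new String(store.read("S"), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 0123456789
    }
}
```

In this toy model the merge is trivially atomic because nothing else runs concurrently; in the Segment Store, atomicity has to be provided by the operation itself.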

Writer Side

We need a new API to support this. The current Serializer-based API requires us to have all the Event's data at hand, which violates Requirement 3. Let's call the new method EventStreamWriter.beginAppend(routingKey); it will return a SegmentedEventStreamWriter (SESW) object. The SESW is a ByteStreamWriter that has explicit abort and commit methods. The SESW has the added benefit of not adding Event Envelopes (so whatever the application writes, it writes as-is). This is a behavior inherited from the ByteStreamWriter.

  1. EventStreamWriter receives a Large Append to process for segment S.
    • A Transient Segment TS is created for Segment S and a SESW is returned that writes solely to TS.
  2. The application code uses the SESW to upload the Large Append to TS. The SESW exposes an OutputStream API which should make this a familiar programming interface for anyone used to dealing with writing large amounts of data.
  3. If the application decides to abort the Large Append, it can do one of the following:
    • SESW.abort()
    • Do nothing (Transient Segment will be auto-deleted once idle).
  4. When the application is done with the Large Append it can invoke SESW.commit (TBD: Can we do this with just flush/close?). This will cause the following to happen:
    • The accumulated length of the Large Append is L (the SESW can keep track of that)
    • A MergeSegmentWithHeader request is made to the Segment Store, instructing it to merge the Transient Segment that we have just written to into S, with the Header being the regular Event Header corresponding to L (i.e., add the Event Envelope).
  5. The Writer will need to handle disconnects/reconnects. It should use offset-conditional appends or attribute-conditional appends on the Transient Segment to keep track of what has been written so far and what not. It should buffer all unacknowledged data for seamless retransmission.
    • The ByteStreamWriter may already do that, but for clarity, this needs to be done here.
  6. The Writer will abort the Large Append if the target segment is Sealed (for whatever reason). This situation is out of scope for this proposal.
  7. TBD. How to handle ordering guarantees for Writers (i.e., we have a number of other regular appends on the same EventStreamWriter while we perform this Large Append - how do we guarantee order?)
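
To make step 5 more concrete, here is a minimal sketch of resuming an upload with offset-conditional appends. The Segment class is an in-memory stand-in and all names are hypothetical; the only point is that an append is accepted only at the Segment's current length, so a blind retry after a lost acknowledgement cannot duplicate data.

```java
import java.io.ByteArrayOutputStream;

/** Illustration only: offset-conditional appends against an in-memory segment. */
final class ConditionalAppendSketch {

    /** Toy segment that accepts an append only if it lands at the current length. */
    static final class Segment {
        private final ByteArrayOutputStream data = new ByteArrayOutputStream();

        long length() {
            return data.size();
        }

        boolean appendAt(long expectedOffset, byte[] buf, int off, int len) {
            if (expectedOffset != data.size()) {
                return false; // condition failed: nothing is written
            }
            data.write(buf, off, len);
            return true;
        }

        byte[] contents() {
            return data.toByteArray();
        }
    }

    /**
     * Called after a reconnect. Asks the segment how much it already holds and
     * retransmits only the remainder of the payload, chunk by chunk.
     * Returns the final segment length.
     */
    static long resume(Segment segment, byte[] payload, int chunkSize) {
        long offset = segment.length();
        if (offset > payload.length) {
            throw new IllegalStateException("Segment holds more data than was written");
        }
        while (offset < payload.length) {
            int len = (int) Math.min(chunkSize, payload.length - offset);
            if (!segment.appendAt(offset, payload, (int) offset, len)) {
                throw new IllegalStateException("Unexpected offset " + offset);
            }
            offset += len;
        }
        return offset;
    }
}
```

This sketch keeps the whole payload in a byte array for brevity; a real Writer would only retain the unacknowledged tail, as described in step 5.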

Reader Side

We need a modification on the Reader side as well. The EventStreamReader has a method called EventRead<T> readNextEvent(Timeout) which produces the next Event to be consumed by this reader. We want to keep this consumption pattern. However, the EventRead currently encapsulates the whole Event data (it requires it to be stored in memory and run through the Serializer to create a Java object for the application). This is not ideal as it would break Requirement 3.

We suggest the following reading workflow (modifications are highlighted):

  1. EventStreamReader.readNextEvent() works as before: it returns an EventRead object.
  2. While fetching the Event, the Reader will look at the Event header and inspect the Length of the Event. If the Length is less than or equal to 8MB, this is a regular (Small) Event and nothing changes. Otherwise this is a Large Event and we'll need to go through a modified workflow.
  3. If a Small Event, then EventRead will work exactly as today. No changes.
  4. If a Large Event:
    • The Reader may pre-buffer the first 8MB (since it may have already read them as part of reading the header).
    • EventRead.getEvent will throw an EventTooLargeException when invoked. This exception will instruct the caller to use the API below.
    • EventRead.beginGetEvent will return a ByteStreamReader object that is targeting the Event's Segment and is bounded to the length of the Event (it may not read beyond the Event's boundary).
      • This call may also be used to read regular events.
      • Care must be taken here: getEvent() may have prefetched part of the Event, but beginGetEvent must still return the Event's data in full, starting from its beginning. Also, once an Event has been fully served (either via getEvent or beginGetEvent), that Event should no longer be available via either API.
      • getEvent should not allow advancing the reader while beginGetEvent has an open ByteStreamReader (i.e., no concurrent reads from the same reader).
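
The Large Event branch above can be sketched as follows. EventReadSketch is a hypothetical stand-in for EventRead that operates on an in-memory byte array rather than a Segment; the offset and length are assumed to have been taken from the Event header already. A plain java.io.ByteArrayInputStream plays the role of the bounded ByteStreamReader.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Illustration only: size-based dispatch between getEvent and beginGetEvent. */
final class EventReadSketch {
    static final class EventTooLargeException extends RuntimeException {
        EventTooLargeException(String message) {
            super(message);
        }
    }

    private final byte[] segment;      // stand-in for the Segment's contents
    private final int offset;          // where the Event payload starts
    private final int length;          // Event length, as read from the header
    private final int maxRegularBytes; // 8MB in the proposal; configurable here for testing

    EventReadSketch(byte[] segment, int offset, int length, int maxRegularBytes) {
        this.segment = segment;
        this.offset = offset;
        this.length = length;
        this.maxRegularBytes = maxRegularBytes;
    }

    /** Regular path: materializes the whole Event, refusing Large Events. */
    byte[] getEvent() {
        if (length > maxRegularBytes) {
            throw new EventTooLargeException(
                    "Event has " + length + " bytes; use beginGetEvent() instead");
        }
        return Arrays.copyOfRange(segment, offset, offset + length);
    }

    /** Streaming path: a stream that cannot read past the Event's boundary. */
    InputStream beginGetEvent() {
        return new ByteArrayInputStream(segment, offset, length);
    }

    /** Reads a stream to its end with a small buffer and returns the byte count. */
    static int drain(InputStream in) {
        byte[] buf = new byte[4];
        int total = 0;
        try {
            for (int n = in.read(buf); n != -1; n = in.read(buf)) {
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}
```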

Event Serialization

Using the approach above will deviate from our pattern of accepting a Java Object and using a Serializer to convert that to a ByteBuffer (on the writer side) and then using the same Serializer to convert it back into a Java Object on the reader side. Attempting to use the Serializer in its current form would cause us to violate Requirement 3 (buffering data at Writer/Reader side), which is why we suggested integrating the ByteStreamWriter/ByteStreamReader into the EventStreamWriter and EventStreamReader.

Given the sheer size of the Events we are dealing with, it is hard to imagine how a plain Java Object could be serialized to such a large value. However, there might be cases where each such Large Event may have some tiny metadata (relatively speaking) that is associated with it. In this case, we may recommend encoding this data along with the rest of the object using the ByteStreamWriter. Since this essentially extends OutputStream, the user can easily wrap that in a DataOutputStream which will allow encoding typed data.
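
As an illustration of the DataOutputStream suggestion, the sketch below writes a small metadata prefix followed by the payload. A ByteArrayOutputStream stands in for the ByteStreamWriter, and the field layout (camera id, timestamp, payload length) is a made-up example rather than a proposed format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Illustration only: typed metadata encoded ahead of a large payload. */
final class MetadataEncodingSketch {

    static byte[] encode(String cameraId, long timestampMillis, byte[] frameData) {
        // Stand-in for the OutputStream handed out by the Client.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(sink)) {
            out.writeUTF(cameraId);          // 2-byte length prefix + modified UTF-8 bytes
            out.writeLong(timestampMillis);  // 8 bytes
            out.writeInt(frameData.length);  // 4 bytes
            out.write(frameData);            // in real use this part would be streamed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Reads the fields back in the same order and summarizes them. */
    static String decodeSummary(byte[] encoded) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            String cameraId = in.readUTF();
            long timestampMillis = in.readLong();
            byte[] frame = new byte[in.readInt()];
            in.readFully(frame);
            return cameraId + "@" + timestampMillis + ":" + frame.length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The reader side mirrors this by wrapping its InputStream in a DataInputStream and reading the fields in the order they were written.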

Client

From an API perspective, on the Client we will be creating a separate API for writing large events and reusing existing APIs (with certain changes) for reading. The Client API will be fully backwards compatible, with no changes required to any applications that are currently written against the API.

On the Writer side:

  • EventStreamWriter. No changes; this will work exactly as it does now. It will continue to require a Serializer and accept up to 8MB Events.
  • LargeEventStreamWriter. A sibling to EventStreamWriter, this can be used to write Events of any size without the need for a Serializer.
    • EventWriter streamEvent(String routingKey) - returns an EventWriter object that can be used to "stream" the contents of an Event in.
    • Multiple EventWriter objects can be opened concurrently.
  • class EventWriter extends OutputStream
    • void commit() - commits Transient Segment and closes the instance.
    • void close() - if commit() has not been invoked, deletes Transient Segment and closes the instance.
    • void flush() - flushes any buffered data onto the wire to the Transient Segment.
    • void write(ByteBuffer) - similar to write(byte[], int, int) but accepts a ByteBuffer
    • Uses ByteStreamWriter underneath. We do not want to inherit from ByteStreamWriter, since it exposes additional methods that we do not want the application to use.
    • Optimization: Only create the Transient Segment on the first invocation to flush(). If the total amount of accumulated data is less than 8MB (buffered on the client), invoking commit will convert this to a regular Append on the target segment (and thus preclude the need for a Transient Segment in the first place).
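
The EventWriter contract, including the small-Event optimization, might look roughly like the sketch below. It is a sketch under stated assumptions: RecordingBackend is a hypothetical stand-in for the connection to the Segment Store that merely records the calls it receives, and the threshold is a constructor argument (8MB in the proposal) so that the behavior is easy to exercise.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Illustration only: commit/close/flush semantics of the proposed EventWriter. */
final class EventWriterSketch extends OutputStream {

    /** Hypothetical Segment Store connection; it only records what it is asked to do. */
    static final class RecordingBackend {
        final List<String> calls = new ArrayList<>();
        void createTransientSegment() { calls.add("createTransientSegment"); }
        void appendToTransient(byte[] data) { calls.add("appendToTransient(" + data.length + ")"); }
        void mergeWithHeader(long eventLength) { calls.add("mergeWithHeader(" + eventLength + ")"); }
        void deleteTransient() { calls.add("deleteTransient"); }
        void regularAppend(byte[] data) { calls.add("regularAppend(" + data.length + ")"); }
    }

    private final RecordingBackend backend;
    private final int threshold;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private boolean transientCreated;
    private boolean closed;
    private long totalLength;

    EventWriterSketch(RecordingBackend backend, int threshold) {
        this.backend = backend;
        this.threshold = threshold;
    }

    @Override
    public void write(int b) {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        ensureOpen();
        buffer.write(b, off, len);
        totalLength += len;
        if (buffer.size() >= threshold) {
            flush(); // bound the client-side buffer (Requirement 3)
        }
    }

    /** Sends buffered data to the Transient Segment, creating it on first use. */
    @Override
    public void flush() {
        ensureOpen();
        if (buffer.size() == 0) {
            return;
        }
        if (!transientCreated) {
            backend.createTransientSegment();
            transientCreated = true;
        }
        backend.appendToTransient(buffer.toByteArray());
        buffer.reset();
    }

    /** Small Events become a regular Append; otherwise merge the Transient Segment. */
    void commit() {
        ensureOpen();
        if (!transientCreated) {
            backend.regularAppend(buffer.toByteArray());
        } else {
            flush();
            backend.mergeWithHeader(totalLength);
        }
        closed = true;
    }

    /** Without a prior commit(), closing discards the Transient Segment (if any). */
    @Override
    public void close() {
        if (!closed && transientCreated) {
            backend.deleteTransient();
        }
        closed = true;
    }

    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("EventWriter is closed");
        }
    }
}
```

One consequence of this design is that an explicit flush() forces the Transient Segment path even for small Events, since the data has already left the client by the time commit() is called.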

On the Reader side:

  • Define EventTooLargeException extends RuntimeException
    • We will allow the Serializer.deserialize to throw this if it has been programmed to accept a maximum event size. We will not throw it directly from the Client code (see below).
  • EventRead<> (returned as part of EventStreamReader.readNextEvent).
    • T getEvent() will work as before and attempt to fetch the whole Event and deserialize it. Since this does not pose any OOM danger to the Segment Store, we will allow Events of any size to be deserialized with this method. If the Event is too large to fit in the Client Application's memory, an OutOfMemoryError will be thrown by the runtime. In addition, the Serializer can be configured with a maximum deserialization limit, in which case it may choose to throw an EventTooLargeException if the Event data was indeed buffered in memory but it cannot be deserialized.
    • It is worth noting that EventRead.getEventPointer().getLength() can be used to get the length of an Event prior to invoking getEvent.
    • EventReader streamEvent() - will return an EventReader instance that can be used to read the raw Event data.
    • Note: some changes may be required in the EventStreamReaderImpl in case it pre-fetches the whole event when returning an EventRead. We may choose to buffer it out of the SegmentStore in 8MB increments.
  • EventStreamReader.fetchEvent(EventPointer p)
    • This method is not currently supported for large events and we are not planning to add equivalent support for them. See EventRead.getEvent above.
  • class EventReader extends InputStream
    • Uses ByteStreamReader underneath that begins at EventPointer.getSegmentOffset and is bounded to at most EventPointer.getLength() bytes. We do not want to inherit from ByteStreamReader, since it exposes additional methods that we do not want the application to use.
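
A Serializer with a maximum deserialization limit, as described under EventTooLargeException above, could be sketched like this. To keep the example self-contained, it declares a local Serializer interface with the same two-method shape (serialize/deserialize over ByteBuffer) instead of importing the Client's; BoundedStringSerializer and its maxBytes parameter are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustration only: a Serializer that refuses oversized Events. */
final class BoundedSerializerSketch {

    /** Local stand-in for the Client's Serializer interface. */
    interface Serializer<T> {
        ByteBuffer serialize(T value);
        T deserialize(ByteBuffer serializedValue);
    }

    static final class EventTooLargeException extends RuntimeException {
        EventTooLargeException(String message) {
            super(message);
        }
    }

    static final class BoundedStringSerializer implements Serializer<String> {
        private final int maxBytes;

        BoundedStringSerializer(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        @Override
        public ByteBuffer serialize(String value) {
            return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String deserialize(ByteBuffer serializedValue) {
            // The data is already in memory at this point; we only refuse to decode it.
            if (serializedValue.remaining() > maxBytes) {
                throw new EventTooLargeException("Event has " + serializedValue.remaining()
                        + " bytes; limit is " + maxBytes);
            }
            return StandardCharsets.UTF_8.decode(serializedValue).toString();
        }
    }
}
```

An application that catches this exception can fall back to streamEvent() on the same EventRead, or check EventRead.getEventPointer().getLength() up front to avoid the attempt altogether.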

Segment Store Changes

Support for Transient Segments:

  • CompletableFuture<String> SegmentStore.createTransientSegment(String parentSegmentName)
    • Same as CreateSegment but fewer arguments allowed (no initial attributes and Segment-Store assigned LTS rolling policy)
    • Returns the name of the newly created Transient Segment.
    • This can be used for the following APIs (inherited from regular segments): append, read, mergeSegment.
    • The following APIs will be disallowed: seal, truncate, updateAttributes.
  • CompletableFuture<Void> SegmentStore.mergeSegment(String sourceSegmentName, String parentSegmentName, BufferView header)
    • Performs a Merge-With-Header. See the Design section above for details.
  • Internal Segment Metadata
    • Add a boolean field isTransient
    • Keep track of last used time (wall clock). This is reset with every append or when a container failover happens.
  • Metadata Cleaner
    • Delete Transient Segments that have been idle.
  • Segment Store Configuration
    • Define idle timeout.
  • MergeWithHeaderOperation
    • New operation that allows a Merge Segment with Header
    • This will be a hybrid of the StreamSegmentAppendOperation and MergeSegmentOperation but we need a new one to ensure atomicity.
    • Internally it may be split in 2: a CachedStreamSegmentAppendOperation (for the header) and a MergeSegmentOperation (for the merge) - as long as the two are atomically accepted into the system.
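
The idle tracking and Metadata Cleaner items above can be sketched as follows. This is a simplified model with hypothetical names: the clock is passed in explicitly so that the behavior is deterministic, and "deleting" a Transient Segment simply means dropping it from the map.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only: idle expiration of Transient Segments. */
final class TransientSegmentCleanerSketch {
    // Segment name -> last used time (wall clock, in milliseconds).
    private final Map<String, Long> lastUsedMillis = new TreeMap<>();
    private final long idleTimeoutMillis;

    TransientSegmentCleanerSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Every append (and the creation itself) refreshes the last used time. */
    void recordAppend(String transientSegment, long nowMillis) {
        lastUsedMillis.put(transientSegment, nowMillis);
    }

    /** A container failover resets idle tracking so writers have time to reconnect. */
    void onContainerFailover(long nowMillis) {
        lastUsedMillis.replaceAll((name, lastUsed) -> nowMillis);
    }

    /** Periodic cleanup pass: removes and returns the segments idle for the full timeout. */
    List<String> cleanup(long nowMillis) {
        List<String> deleted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                deleted.add(entry.getKey());
                it.remove();
            }
        }
        return deleted;
    }
}
```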

Wire Protocol Changes

(TBD on final wire command naming)

  • CreateSegmentTransaction/CreateTransientSegment
    • Requests the creation of a Transient Segment (wired up into SegmentStore.createTransientSegment)
  • MergeSegmentTransaction
    • Requests the merge of a Transient Segment into its target Segment with a specified header (wired up into SegmentStore.mergeSegment(Source, Target, Header)).