
PDP 34 (Simplified Tier 2)

co-jo edited this page Jul 9, 2021 · 3 revisions

Status: COMPLETE & MERGED.

Summary

This PDP discusses a change to our Tier 2 implementation that would remove the need for uncommon features and require only simplified operations to be supported by any Tier 2 Storage that Pravega is bound to.

Once implemented, this proposal will allow Pravega to be mounted on a wider variety of Tier 2 systems, including, but not limited to: Amazon S3, Google Cloud, Microsoft ADL, etc. It will also eliminate the need for any sort of Tier 2 fencing mechanisms and instead rely on Tier 1 write fencing (which is already built-in).

Shortcomings of the current implementation

  1. Too many verbs. The current implementation of Tier 2 requires a lot of verbs, often with complex semantics. These are: Create File, Delete File, Append-with-offset (conditional, atomic append to a file given its length matches a known value), Read, Concat, Seal and Unseal. None of the current Tier 2 bindings in Pravega support all of these, and for some of them we had to do some gymnastics to make them work.
  2. Fencing is required. The same Tier 2 file may be modified by multiple instances of the same Segment Container (after failovers). This could lead to data being corrupted in case of runaway instances, which is why we often need to implement unorthodox mechanisms to prevent that. Since some of them require use of some sort of native Tier 2 feature (such as a server-side atomic operation or lock), each adapter would need to implement its own mechanism, if even possible.
  3. RollingStorage amplifies the problem. RollingStorage was added to support head truncation. It rolls over files when they get too big so that earlier ones may be physically deleted if the Segment is truncated. This required the addition of an additional file (the Header file) which is frequently accessed by all Container Instances every time a rollover, open-write/read, create or delete happens. Furthermore, many of the operations implemented by RollingStorage are made up of two or more underlying Tier 2 requests, which cannot be made atomic. This makes RollingStorage unnecessarily complex with a lot of code required to make operations resumable upon a retry.

Key Design Changes

Proposal

Below is the summary of this proposal.

Step 1: Implement Unified Chunk Management Layer.

  • Chunk is a unit of storage. It is a file or object on storage device.
  • This layer provides abstraction at segment level by stitching together chunks stored on underlying storages.
  • Store segment layout information i.e. segment and chunk metadata using Table Segment
  • Define a simple interface for ChunkStorage
  • Enable implementing mechanism for concatenating smaller files/objects into bigger objects.

Step 2: Re-Implement ChunkStorage for existing bindings.

  • NFS
  • HDFS
  • Extended S3

Step 3 (to be implemented in future as a separate PDP) :

Implement new ChunkStorage for other blob cloud storage providers including Amazon S3 compatible storages, GCP and Azure

API Changes

  • There are no user level API changes.
  • There are no changes to how tier-2 is configured.

Internal Changes

Architecture Change

Image

Internal API changes

Internal Functionality changes

  • Storage related metadata
    • All storage related metadata is stored in table segments.
    • Ability to import and export this data in backward compatible RollingStorage layout will be implemented.
  • Consolidation of functionality
    • Functionality in RollingStorage will be implemented by ChunkedSegmentStorage.
    • AsyncStorageWrapper and SyncStorage will no longer be needed.
  • Changes to how segment store handles failover.
    • [See more details here.](PDP-34-(Simplified-Tier-2)#segment-store-failover)

Discarded Approaches

Patching existing code.

As explained above, the goal is to have immutable Tier 2 Chunks. A simple way to achieve this is to have RollingStorage create a new chunk (rollover) every time openWrite is invoked. However, that will solve only half the problem. In order to map offsets to Chunks, RollingStorage needs to store a Header Chunk, where new Data Chunk offsets and names are continuously appended. This is a very small and fragmented file (appends are less than 100 bytes in length), but the problem is that this Header Chunk is shared across multiple instances of the same Segment Container, so we are not really resolving anything.

Key Concepts

Chunk

A chunk is a basic unit of storage in tier-2. Conceptually it is a contiguous range of bytes. A segment is made up of sequence of non-overlapping chunks.

How chunks are stored:

  • A chunk is always stored contiguously on tier-2 as a single indivisible unit. It is never split into multiple tier-2 objects/files.
  • For current discussion, chunk is simply a separate file/object on tier-2

Structure of a persisted chunk:

  • For each chunk, the actual sequence of data bytes stored inside a storage object/file must match byte by byte to sequence of bytes that user provided.
  • The persisted data will not include additional metadata in addition to user supplied bytes of data. This metadata resides in separate object.

Tier-2 storage providers operate only at chunk level

  • We require tier-2 bindings to provide following operations -
    • Create (C)
    • Delete(D)
    • Write (W)
    • Open (O)
    • Read (R)
    • List (L)
    • Stats (S)
  • In addition, optionally the following additional operations might be provided.
    • Merge (M)
    • Truncate(T)
    • Make Writable/Read-only
  • Note on Write operation: the specific binding may optimize its internal implementation by utilizing additional functionality like append. Append is not necessary, and not included in API contract. It is entirely up to tier-2 implementation to optimize using whatever primitives underlying storage provides.
  • Any internal state for calls is stored in an implementation specific ChunkHandle.
    • (E.g. any etags returned by previous calls.)
    • Write Handles are cached – and reused.

The API contract is described later in the document.
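As a rough illustration of how small the required chunk-level contract is, the sketch below models the seven required operations (C, D, W, O, R, L, S) as a Java interface with a toy in-memory binding. The names `SimpleChunkStorage` and `InMemoryChunkStorage` and all method signatures are assumptions made for this example; they are not the actual ChunkStorage interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the minimal chunk-level contract; not the real Pravega API.
interface SimpleChunkStorage {
    void create(String chunkName);                            // Create (C)
    void delete(String chunkName);                            // Delete (D)
    int write(String chunkName, long offset, byte[] data);    // Write (W), returns bytes written
    boolean open(String chunkName);                           // Open (O), true if the chunk exists
    byte[] read(String chunkName, long offset, int length);   // Read (R)
    List<String> list(String prefix);                         // List (L)
    long stats(String chunkName);                             // Stats (S), returns chunk length
}

// In-memory binding used only to exercise the interface.
final class InMemoryChunkStorage implements SimpleChunkStorage {
    private final Map<String, byte[]> chunks = new TreeMap<>();

    public void create(String chunkName) {
        if (chunks.putIfAbsent(chunkName, new byte[0]) != null) {
            throw new IllegalStateException("chunk already exists: " + chunkName);
        }
    }

    public void delete(String chunkName) {
        chunks.remove(chunkName);
    }

    public int write(String chunkName, long offset, byte[] data) {
        byte[] current = require(chunkName);
        if (offset != current.length) {
            throw new IllegalArgumentException("write must start at the current end of the chunk");
        }
        byte[] updated = Arrays.copyOf(current, current.length + data.length);
        System.arraycopy(data, 0, updated, current.length, data.length);
        chunks.put(chunkName, updated);
        return data.length;
    }

    public boolean open(String chunkName) {
        return chunks.containsKey(chunkName);
    }

    public byte[] read(String chunkName, long offset, int length) {
        byte[] current = require(chunkName);
        return Arrays.copyOfRange(current, (int) offset, (int) offset + length);
    }

    public List<String> list(String prefix) {
        List<String> names = new ArrayList<>();
        for (String name : chunks.keySet()) {
            if (name.startsWith(prefix)) {
                names.add(name);
            }
        }
        return names;
    }

    public long stats(String chunkName) {
        return require(chunkName).length;
    }

    private byte[] require(String chunkName) {
        byte[] data = chunks.get(chunkName);
        if (data == null) {
            throw new IllegalArgumentException("no such chunk: " + chunkName);
        }
        return data;
    }
}
```

Note that the toy binding happens to accept writes at the end of an existing chunk; a binding without append support would instead create a new chunk for every write.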

Storage of chunks

Image

  • Each chunk is a unique file.
  • If storage provider does not support append (eg. vanilla S3), then each append/write is written to a separate chunk on tier-2.
  • If storage provider does support conditional appends (or otherwise concurrency safe appends), then multiple appends/writes can be stored in a single chunk on tier-2. Instead of creating a new chunk for each write, data is simply written/appended to an existing chunk.

Names are arbitrary.

  • Except possibly for the system segments, the actual chunk names used while persisting on tier-2 are arbitrary. There is no special meaning attached to the format of the name. The names could be UUIDs or may contain any combination of segment name, segment store epoch, offset etc. However the name of chunk is required to be globally unique.

Single Writer Guarantees.

  • It is guaranteed that two segment store instances will never write to the same underlying file. (This is done by imposing the restriction that a segment store instance never appends to a file/object created by another instance. Explained in detail later.)

Some size estimates/assumptions

It will be useful to have a rough estimate of number of chunks and segments for understanding design space. The intention here is to capture estimate of orders of magnitudes.

  • A chunk could be as small as one byte, but we prefer them to be really large. There is no upper limit on the size of the chunk.
  • The average size of chunk is expected to be low 10s of GB.
  • There can be up to 25K active segments per container with estimated average around 1K segments.
  • There is no upper limit on number of segments in a container.
  • A segment can have up to 100s of chunks. The average number of chunks is expected to be order of magnitude lower with aggressive and efficient merging.

Segment

A segment is made up of sequence of non-overlapping chunks.

  • Conceptually the segment metadata consists of a header describing various properties of a segment plus a linked list of chunk metadata records describing each chunk.

Below is how segment metadata records are organized. Image

Metadata is stored in table segment:

  • The segment metadata and layout information are stored using key-value tuples using pinned table segment (as opposed to single record containing all metadata including chunk layout data of the segment).

There are two ways to store metadata using key value store.

  • Using single key-value pair for all metadata
    • Complete metadata is stored in a serialized value field.
    • For any metadata update the entire value is re-written.
    • Pros: The number of keys is kept to minimum. This will improve the table segment index performance.
    • Con: Cannot support a large number of chunks, because the total number of bytes written is O(N^2) in the number of metadata updates.
  • Using multiple key-value pairs for metadata (Preferred for V1)
    • There is one KV pair for the header for each segment. In addition to segment properties it contains pointer to first/last chunk metadata records.
    • There is one KV pair for each chunk. In addition to chunk properties it contains pointer to previous/next chunks.
    • NOTE - The pointer here means a string that can be used as a key to retrieve the full record by calling get on KV store.
    • Pros: Only small part of metadata is updated. Storage overhead is reduced.
    • Con: Frequent metadata update. Although these can be small and buffered.
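The multiple key-value layout can be pictured as a linked list whose "pointers" are keys in the KV store. The sketch below is a minimal in-memory model under that assumption; the class and field names (`SegmentRecord`, `ChunkRecord`, `MetadataSketch`) are hypothetical and a plain `HashMap` stands in for the table segment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record shapes; field names are assumptions for illustration.
final class ChunkRecord {
    final String name;
    final long length;
    String nextChunk;   // "pointer": the key of the next chunk record, or null

    ChunkRecord(String name, long length) {
        this.name = name;
        this.length = length;
    }
}

final class SegmentRecord {
    final String name;
    String firstChunk;  // key of the first chunk record
    String lastChunk;   // key of the last chunk record
    long length;

    SegmentRecord(String name) {
        this.name = name;
    }
}

final class MetadataSketch {
    // Stand-in for the table segment: key -> record.
    final Map<String, Object> kv = new HashMap<>();

    void createSegment(String segment) {
        kv.put(segment, new SegmentRecord(segment));
    }

    // Adding a chunk touches only the header, the old last chunk, and the new chunk record.
    void addChunk(String segment, String chunkName, long chunkLength) {
        SegmentRecord header = (SegmentRecord) kv.get(segment);
        kv.put(chunkName, new ChunkRecord(chunkName, chunkLength));
        if (header.lastChunk == null) {
            header.firstChunk = chunkName;
        } else {
            ((ChunkRecord) kv.get(header.lastChunk)).nextChunk = chunkName;
        }
        header.lastChunk = chunkName;
        header.length += chunkLength;
    }

    // Walks the list by repeatedly calling get on the KV store.
    List<String> chunkNames(String segment) {
        List<String> names = new ArrayList<>();
        String key = ((SegmentRecord) kv.get(segment)).firstChunk;
        while (key != null) {
            names.add(key);
            key = ((ChunkRecord) kv.get(key)).nextChunk;
        }
        return names;
    }
}
```

Each update rewrites a constant number of small records regardless of how many chunks the segment already has, which is the property the single key-value option lacks.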

Metadata updates are atomic.

  • All metadata operations are performed using conditional updates and using a single transaction that updates multiple keys at once. This guarantees that concurrent operations will never leave the metadata in inconsistent state.
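To make the idea of a conditional multi-key update concrete, here is a minimal sketch in which every key carries a version and a commit succeeds only if all expected versions still match. `ConditionalTableSketch` is a hypothetical in-memory stand-in, not the table segment API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of conditional, all-or-nothing multi-key updates.
final class ConditionalTableSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    synchronized String get(String key) {
        return values.get(key);
    }

    // Keys that were never written are at version 0.
    synchronized long versionOf(String key) {
        return versions.getOrDefault(key, 0L);
    }

    // Commits all updates or none: every key must still be at the version the caller read.
    synchronized boolean commit(Map<String, Long> expectedVersions, Map<String, String> updates) {
        for (Map.Entry<String, Long> expected : expectedVersions.entrySet()) {
            if (versionOf(expected.getKey()) != expected.getValue()) {
                return false;   // someone else changed a key; nothing is applied
            }
        }
        for (Map.Entry<String, String> update : updates.entrySet()) {
            values.put(update.getKey(), update.getValue());
            versions.put(update.getKey(), versionOf(update.getKey()) + 1);
        }
        return true;
    }
}
```

A caller that loses the race simply re-reads the affected records and retries, so the header and chunk records are never left half-updated.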

At any time for a given segment, only one file/object is allowed to be open for writing.

  • Given the append only nature of segments, there can be only one file to which data is appended. When a new file is opened using OpenWrite the old file is considered "non-active".
  • In case of failover, the new segment store instance creates a new file and makes it active. The offset of the old file seen by the new instance at the time of opening a new file is recorded in the metadata.

Metadata updates are infrequent and updated lazily:

  • To avoid frequent updates to metadata, the metadata about all the chunks in the file/object is updated lazily only when required.
  • More specifically the length of actively written file is not updated with each write. It is updated only when the file is no more considered active.
  • The updates are required only in following cases.
    • Creating new file/object, so that name of the file can be recorded for new object and final offset can be updated for previous file.
    • When two segments are concatenated. (eg. Transactions)
    • When segments are truncated.
    • When segment is created, sealed, unsealed, deleted.

Metadata updates are fenced by tier-1:

  • Table segments internally use Tier-1. Therefore, a segment store instance is fenced in a failover scenario and it can no longer make change to segment metadata. This provides strong guarantees against data corruption by zombie segment stores. (For any operations that modify the key-value pairs, the table implementation writes change log to underlying tier-1. If the SS instance is fenced, then this operation will fail, thus preventing now "zombie" instance from making any metadata changes.)
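The effect of tier-1 fencing on metadata updates can be simulated with an epoch check. The sketch below is only a model of the behavior described above: `FencedMetadataSketch` and its methods are hypothetical, and the real mechanism is the tier-1 log rejecting writes from a fenced instance rather than an explicit epoch comparison in the table code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: only the current owner epoch may change metadata.
final class FencedMetadataSketch {
    private long ownerEpoch = 0;
    private final Map<String, String> table = new HashMap<>();

    // Called when a new segment store instance takes over the container.
    void fence(long newEpoch) {
        if (newEpoch <= ownerEpoch) {
            throw new IllegalArgumentException("epoch must increase");
        }
        ownerEpoch = newEpoch;
    }

    // Every metadata change goes through the (simulated) tier-1 log and fails once fenced.
    void update(long writerEpoch, String key, String value) {
        if (writerEpoch != ownerEpoch) {
            throw new IllegalStateException("writer with epoch " + writerEpoch + " is fenced out");
        }
        table.put(key, value);
    }

    String get(String key) {
        return table.get(key);
    }
}
```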

Metadata updates are efficient:

  • By splitting the data in multiple records, we avoid writing same data again and again but update only the records regarding modified (or last) chunk.
  • Problem of writing tiny metadata updates to tier-2 is also solved as table store will aggregate updates to metadata for number of segments into large writes to tier-2.(recall that table store saves data in system segments).

Metadata records are cached using write-through cache:

  • Given the “append only” semantics of segments, very few KV records are updated during segment metadata operations except for the last chunk.
  • The segment level operations like concat, seal, unseal are relatively infrequent.
  • Therefore, metadata records are excellent candidates for being cached. (eg. Guava cache)

Metadata offers points of extensibility:

  • All metadata persisted should contain a version.
  • In future, Segment metadata may contain some additional fields – Eg. Access Control Lists (ACL)
  • In future, Chunk metadata may contain some additional fields – Eg. CRC/Checksums,

Metadata can be exported to or imported from a tier-2 object:

  • This is useful for backward compatibility, backup and disaster recovery
  • A snapshot of segment metadata (with metadata for all its chunks) can be exported to tier-2 on demand. Current header file format will be supported.
  • A snapshot of segment metadata (with metadata for all its chunks) can be imported from existing tier-2 snapshot on demand.
  • Automatic import of current header file format will be implemented.

Key Operations

Segment Merge and other Layout change operations

Segment Merge and other Layout change operations are segment level operations that change the number and layout of the chunks that make up a segment. Segment layout is specified by a linked list of chunk records. (recall here that records are stored in table store as KV pair. However, they are conceptually linked list nodes and they “point” to other records using string key as a “pointer”)

Concatenation

  • When two segments are merged via concat operation, the data stored on the tier-2 is not changed. Only the layout (sequence) of chunks is changed which is purely metadata operation. This is a simple operation consisting of concatenating two linked list of chunk metadata records by appending one linked list to other. Only last chunk metadata record of the target segment needs to “point” to first chunk metadata record of the source. The properties of target segment is updated in its header record to reflect result of concat and header record for the source is deleted. This is purely metadata operation.
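A metadata-only concat amounts to stitching two linked lists together. The following sketch shows that under simplified assumptions: `ConcatSketch`, `Header`, and the `nextChunk` map are hypothetical stand-ins for the header and chunk records kept in the KV store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-ins for header and chunk records.
final class ConcatSketch {
    static final class Header {
        String first;   // key of first chunk record
        String last;    // key of last chunk record
        long length;
    }

    final Map<String, Header> headers = new HashMap<>();
    final Map<String, String> nextChunk = new HashMap<>();   // chunk key -> next chunk key

    // Metadata-only concat: no chunk data is read, copied, or renamed.
    void concat(String target, String source) {
        Header t = headers.get(target);
        Header s = headers.get(source);
        if (s.first != null) {
            if (t.last == null) {
                t.first = s.first;
            } else {
                nextChunk.put(t.last, s.first);   // last target chunk now "points" to first source chunk
            }
            t.last = s.last;
        }
        t.length += s.length;
        headers.remove(source);                   // the source header record is deleted
    }
}
```

In the real system these changes would be applied in a single transaction so that a failure cannot leave the two lists half-joined.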

Defrag

  • Multiple small chunks are copied to a big chunk using storage binding’s native capabilities (Eg, multipart upload for S3 or concat for HDFS).
    • The sublist corresponding to the copied chunks is replaced with a single new chunk record corresponding to the new large file/object. The metadata operation is like deleting nodes from the middle of the list.
    • It is assumed that both S3 and HDFS implement concat as mostly metadata operation on their side without moving data.
    • Therefore, once again it ends up being mostly a metadata operation for Pravega.
    • A special corner case of defrag is when series of chunks are in the same file consecutively and chunks can be merged in situ without modifying. This case can be optimized by inline merge while writing new chunk.

Truncate

  • The chunks at the head of the segment that are situated completely below the new start offset are deleted. Metadata operation is like deleting nodes from start of the list.
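The head-truncation rule can be sketched as follows. `TruncateSketch` is a hypothetical model in which a deque stands in for the linked list of chunk records; only chunks that lie entirely below the new start offset are dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of head truncation over an ordered list of chunks.
final class TruncateSketch {
    static final class Chunk {
        final String name;
        final long length;

        Chunk(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    final Deque<Chunk> chunks = new ArrayDeque<>();  // head of deque = first chunk of segment
    long firstChunkStartOffset = 0;                  // segment offset where the first chunk begins
    long startOffset = 0;                            // first valid segment offset

    // Returns the names of chunks that are now eligible for deletion.
    List<String> truncate(long newStartOffset) {
        List<String> deleted = new ArrayList<>();
        while (!chunks.isEmpty()
                && firstChunkStartOffset + chunks.peekFirst().length <= newStartOffset) {
            Chunk head = chunks.pollFirst();
            firstChunkStartOffset += head.length;
            deleted.add(head.name);
        }
        startOffset = newStartOffset;
        return deleted;
    }
}
```

A chunk that straddles the new start offset is kept; the segment simply remembers that valid data begins part-way into it.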

Rolling Storage

  • Whenever there is a need to roll over the segment or when a large write needs to be split into smaller writes, new chunks are added as needed and the metadata is updated.

Metadata-only Operations

(When upgrade is supported - If segment header record does not exist, then attempt is made to create and populate segment header and chunk records using files found on tier-2.)

Create

New segment header added to KV store. No tier-2 object created.

OpenRead/OpenWrite

  • If segment header record exists in the KV store then the layout information from chunk metadata is read and cached.

Exists

  • Presence of segment header record which is marked active, indicates whether segment exists or not.

Seal

  • This is purely a metadata operation on segment header.

Unseal

  • This is purely a metadata operation on segment header.

Delete

  • Mark segment as deleted in segment header.
  • All tier-2 objects and chunk metadata are deleted asynchronously via garbage collection.

Data Operations

Read

  • The list of chunk metadata is used to identify the chunks containing data.
  • Using chunk metadata, the corresponding tier-2 object/file is read. For efficient lookup, the chunk metadata is cached.
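One plausible way to cache the layout for reads is an ordered map from each chunk's starting segment offset to its name, so that locating the chunk for an offset is a floor lookup. The sketch below assumes that structure; `ReadIndexSketch` is a hypothetical name and the document does not prescribe this particular index.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical cached read index: segment offset where a chunk starts -> chunk name.
final class ReadIndexSketch {
    private final TreeMap<Long, String> chunkByStartOffset = new TreeMap<>();

    void addChunk(long startOffset, String chunkName) {
        chunkByStartOffset.put(startOffset, chunkName);
    }

    // Returns "chunkName@offsetInsideChunk" for a segment offset.
    // This sketch does not check the offset against the end of the segment.
    String locate(long segmentOffset) {
        Map.Entry<Long, String> entry = chunkByStartOffset.floorEntry(segmentOffset);
        if (entry == null) {
            throw new IllegalArgumentException("offset is before the first chunk");
        }
        return entry.getValue() + "@" + (segmentOffset - entry.getKey());
    }
}
```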

Write

  1. The list of chunk metadata is used to identify the last chunk if any.
  2. A new chunk is written to the tier-2 file/object. (to either an existing file or new depending on underlying storage provider.)
  3. The metadata for new chunk is added.
  4. If the new chunk can be merged inline or triggers a merge, it is merged. Relevant fields in the header and the affected chunk metadata are updated in a single transaction.

Data Operations in Burst Mode (Optional)

Read

  • Data need not be read sequentially one chunk after another. All the relevant chunks could be read in parallel. This is beneficial where underlying storage itself is highly distributed and replicated (Eg. HDFS, Cloud storages like S3) giving higher effective throughput.

  • Reading chunks in parallel

    • Just like a normal read, the list of chunk metadata is used to identify the chunks containing data.
    • Using chunk metadata, the corresponding tier-2 object/file is read in parallel. For efficient lookup, the chunk metadata is cached.
    • Also the same chunk can be read using multiple parallel requests to read non-overlapping subranges to fully utilize the bandwidth/threadpool available.
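A burst-mode read could be sketched with `CompletableFuture`: one asynchronous request per chunk, joined in layout order so the bytes come back in segment order. `ParallelReadSketch` and the `readChunk` callback are assumptions for this example; a real binding would issue asynchronous tier-2 reads instead of calling a function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of reading several chunks in parallel.
final class ParallelReadSketch {
    // readChunk is a hypothetical callback that fetches one chunk's bytes from tier-2.
    static byte[] readAll(List<String> chunkNames, Function<String, byte[]> readChunk) {
        List<CompletableFuture<byte[]>> futures = new ArrayList<>();
        for (String name : chunkNames) {
            // Each chunk is fetched on the common pool, independently of the others.
            futures.add(CompletableFuture.supplyAsync(() -> readChunk.apply(name)));
        }
        List<byte[]> parts = new ArrayList<>();
        int total = 0;
        for (CompletableFuture<byte[]> future : futures) {
            byte[] part = future.join();   // joining in list order preserves segment order
            parts.add(part);
            total += part.length;
        }
        byte[] result = new byte[total];
        int position = 0;
        for (byte[] part : parts) {
            System.arraycopy(part, 0, result, position, part.length);
            position += part.length;
        }
        return result;
    }
}
```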

Key Scenarios

Rolling Storage

  • All rolling storage functionality will be implemented by ChunkStorageManager.

Segment Store Failover

  1. New SS records the size of chunk that it sees.
  2. New SS seals the chunk at that offset (from previous step)
  3. Old SS can keep on writing even after this, but that doesn’t matter as we'll not read data after recorded offset.
  4. Old SS is fenced for tier-1 from making any metadata updates (all table segment updates go through tier-1).
  5. New SS starts a new chunk.
  6. New SS adds a metadata record for the new chunk.
  7. New SS replays the Write Ahead Log.
  8. New SS saves data to new chunk.
  9. If new SS fails, the process repeats
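The failover steps above can be modelled in a few lines. In this sketch the new instance fixes the length of the previous last chunk in metadata and starts its own chunk; anything a zombie appends to the old chunk afterwards is invisible to readers. `FailoverSketch`, its fields, and the chunk naming format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the failover sequence for a single segment.
final class FailoverSketch {
    final Map<String, Long> storedLength = new HashMap<>();    // what tier-2 actually holds
    final List<String> chunkNames = new ArrayList<>();         // segment layout (metadata)
    final Map<String, Long> recordedLength = new HashMap<>();  // lengths fixed in metadata

    // Steps 1, 2, 5 and 6: record the observed size of the last chunk, then start a new chunk.
    String takeOver(String segment, long newEpoch) {
        if (!chunkNames.isEmpty()) {
            String last = chunkNames.get(chunkNames.size() - 1);
            recordedLength.put(last, storedLength.get(last));
        }
        // The naming format is an assumption; it only needs to be unique per instance.
        String fresh = segment + ".E-" + newEpoch + ".C-" + chunkNames.size();
        storedLength.put(fresh, 0L);
        chunkNames.add(fresh);
        return fresh;
    }

    // Readers trust the recorded length, so bytes a zombie appends later are never visible.
    long readableLength(String chunk) {
        Long fixed = recordedLength.get(chunk);
        return fixed != null ? fixed : storedLength.get(chunk);
    }
}
```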

Key Concerns

Bootstrap

  1. During Segment Store startup container metadata and storage metadata table segments are added as pinned segments inside SLTS.
  2. During startup of the secondary services, chunks for metadata and storage metadata table segments are discovered and metadata about them is populated in storage metadata segment. (This is done in order to avoid circular dependency.)

Why SLTS needs System Journal?

Turtles all the way down

SLTS Storage System Segments.

SLTS stores metadata about all segments and their chunks in special metadata table segments. In this document we refer to them as "Storage System Segments". There are two such segments

  • Container Metadata Segment (and its attribute segment)
  • Storage Metadata Segment (and its attribute segment)

Storage System Segments are table segments

  • Note that Storage System Segments are table segments and as such the data in them is regularly flushed to LTS.
  • This means that sometimes, to read a KV pair, additional read requests are issued to fetch pages from those table segments.

Metadata about the Storage System Segments themselves creates a circular dependency.

  • Each Storage operation must use metadata stored in Storage System Segments to fulfill the read, write or any other requests on any segment in the system.
  • That also includes the reads and writes of the Storage System Segments themselves.
  • However, the metadata about the Storage System Segments cannot be stored in those segments themselves. It must be stored somewhere else. Otherwise it creates a circular dependency.

The solution : Log Structured Snapshotting Metadata Store

The solution is to pin metadata about Storage System Segments to memory and journal the changes to those records.

  • SLTS pins metadata about Storage System Segments to memory - which means it is never evicted from memory.
  • The metadata about the storage system segments is thus never written to those segments themselves.
  • In order to not lose changes when the segment store is restarted or the container is relocated, and to otherwise tolerate any such failures, the changes to in-memory metadata are written to journal files as change log records.

Journals are on a per-container basis.

  • Each container has a separate journal.

To recreate the in-memory state of Storage System Segment metadata, journal records are replayed at boot time.

  • During SLTS startup all journal records are applied sequentially to recreate the in-memory state.

For efficient replay of the records, snapshot records are created periodically.

  • The snapshot records record the state of Storage System Segments at a given time.
  • This means that if we start from a known snapshot, only the journal records created after that snapshot need to be applied.

Where do we store pointer to latest snapshot?

Choice

Snapshot info store.

  • The pointer to latest snapshot is stored in Snapshot info store.

Snapshot info store implementation.

  • The snapshot info is stored as core attributes of container metadata segment.
  • The core attributes themselves are saved via BookKeeper.

Key aspects of design.

  • Journal records are written sequentially.
  • New journal files are started when
    • there is a failure during write
    • the journal object reaches the size limit
    • a new snapshot is created
  • With no append mode each journal record is written to its own file.
  • With append mode new records are appended to the same active journal.
  • Snapshot records are created when either of the thresholds below is reached, whichever comes first
    • a fixed number of records have been written to the journal (default value is after every 100 records)
    • or a fixed time interval has passed since the last snapshot (default value is after every 5 minutes).
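The two snapshot thresholds can be expressed as a small policy object. This is a sketch only; `SnapshotPolicySketch` and its method names are hypothetical, and the caller passes in the current time so the logic is easy to test.

```java
// Hypothetical policy: snapshot after N journal records or after a time interval, whichever is first.
final class SnapshotPolicySketch {
    private final int maxRecords;           // the document's default is 100 records
    private final long maxIntervalMillis;   // the document's default is 5 minutes
    private int recordsSinceSnapshot = 0;
    private long lastSnapshotMillis;

    SnapshotPolicySketch(int maxRecords, long maxIntervalMillis, long nowMillis) {
        this.maxRecords = maxRecords;
        this.maxIntervalMillis = maxIntervalMillis;
        this.lastSnapshotMillis = nowMillis;
    }

    void onJournalRecordWritten() {
        recordsSinceSnapshot++;
    }

    boolean shouldSnapshot(long nowMillis) {
        return recordsSinceSnapshot >= maxRecords
                || nowMillis - lastSnapshotMillis >= maxIntervalMillis;
    }

    void onSnapshotCreated(long nowMillis) {
        recordsSinceSnapshot = 0;
        lastSnapshotMillis = nowMillis;
    }
}
```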

How the boot algorithm works.

  1. Find the latest valid snapshot by reading from Snapshot info store.
  2. Sequentially read and apply all the changes after latest snapshot. Starting from the last snapshot, read and apply each journal file sequentially to this snapshot
  3. Apply final truncation changes
  4. If applicable reconcile the length of last chunk for each storage system segment by comparing it
  5. Validate the changes and create new snapshot (Check that none of the invariants are broken and all data is consistent. Optionally, for each system segment check that all chunks actually exists on the LTS.)
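Step 2 of the boot algorithm, replaying journal records on top of a snapshot, might look roughly like this. The sketch handles only "addition of chunk" records and models the snapshot as a map from segment name to ordered chunk names; `JournalReplaySketch`, `ChunkAdded`, and the duplicate and consistency checks are assumptions about how such a replay could be written.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical replay of "chunk added" journal records onto a snapshot.
final class JournalReplaySketch {
    static final class ChunkAdded {
        final String segment;
        final String newChunk;
        final long offset;               // carried for completeness; not used by this sketch
        final String previousLastChunk;

        ChunkAdded(String segment, String newChunk, long offset, String previousLastChunk) {
            this.segment = segment;
            this.newChunk = newChunk;
            this.offset = offset;
            this.previousLastChunk = previousLastChunk;
        }
    }

    // snapshot: segment name -> ordered chunk names. The input is not modified.
    static Map<String, List<String>> replay(Map<String, List<String>> snapshot, List<ChunkAdded> journal) {
        Map<String, List<String>> state = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : snapshot.entrySet()) {
            state.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        for (ChunkAdded record : journal) {
            List<String> chunks = state.computeIfAbsent(record.segment, k -> new ArrayList<>());
            String currentLast = chunks.isEmpty() ? null : chunks.get(chunks.size() - 1);
            if (record.newChunk.equals(currentLast)) {
                continue;   // duplicate of a record that was already applied
            }
            if (!Objects.equals(currentLast, record.previousLastChunk)) {
                throw new IllegalStateException("journal does not match state for " + record.segment);
            }
            chunks.add(record.newChunk);
        }
        return state;
    }
}
```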

Types of changes journaled

Journal Records

Addition of chunk

Following info is included in the journal record

  • Name of the segment.
  • Name of the new chunk added.
  • Offset at which the chunk is added.
  • Name of the previous last chunk.

Truncation of segment

Following info is included in the journal record

  • Name of the segment.
  • Name of the new first chunk.
  • Offset inside the chunk at which valid data starts
  • New start offset for segment

Snapshot record

Each snapshot record is saved in a separate file/object. Following info is included in the record

  • Epoch
  • Last journal file before snapshot is created.
  • For each system segment
    • Segment Record
    • List of chunk records

Snapshot info record

The snapshot info (checkpoint) record is simply the pointer to the latest/most recent snapshot record for a given time interval. It is stored in the snapshot info store described above. Following info is included in the record

  • epoch
  • id of the checkpoint record.

Key assumptions.

  • Each instance writes only to its own journal files.
  • The epoch is a monotonically increasing number. (Newer instances have a higher epoch.)
  • The time interval between two snapshots is several orders of magnitude larger than the possible clock skew. (See https://www.eecis.udel.edu/~mills/ntp/html/discipline.html and the note on clock skew below.)
  • The time interval between two snapshots is an order of magnitude larger than the time taken by a container to boot.
  • Chunk naming scheme is such that
    • No two journal chunks can have same name
    • names include container id
    • names include epoch of instance writing the journal.
    • names include monotonically increasing numbers for.
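A naming scheme with these ingredients could be as simple as the following. The exact format string is an assumption made for illustration; only the ingredients (container id, epoch, and a monotonically increasing number) come from the list above.

```java
// Hypothetical journal chunk naming; the format is illustrative only.
final class JournalNameSketch {
    static String journalChunkName(int containerId, long epoch, long index) {
        // Distinct (containerId, epoch, index) triples always yield distinct names.
        return String.format("journal.container-%d.epoch-%d.index-%d", containerId, epoch, index);
    }
}
```

Because each instance has its own epoch and increments its own index, two instances can never produce the same journal chunk name.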

Note on clock skew

According to this document https://www.eecis.udel.edu/~mills/ntp/html/discipline.html

If left running continuously, an NTP client on a fast LAN in a home or office environment can maintain synchronization nominally within one millisecond.

To minimize clock skew and have a smoothly functioning K8s cluster, most K8s systems have NTP installed on the host node. (verify)

Failure scenarios.

Failure during reads.

  • The read operation is retried a fixed number of times (with a fixed interval between attempts).

Journal files created by a zombie instance.

  • It is possible for a zombie instance to continue to write to old journal files.
  • The algorithm is able to detect such zombie records by detecting that the valid owner has a larger epoch number and an alternative history of changes.
  • In such cases the history is reset to delete zombie changes and only valid changes are applied.

Partially written records.

  • Any partially written records are ignored.
  • After each write failure, a new journal is started.

Duplicate records.

  • Duplicate records can be created during a retry, when the original request succeeds but the writer encounters a timeout and retries.
  • Upon encountering a previously applied record again, such duplicate records are ignored.

Failure during write of snapshot and checkpoint.

  • A checkpoint is attempted a fixed number of times. Each attempt gets a unique predictable name.
  • A snapshot is not considered successful if the snapshot file cannot be read back and validated.

Backup and Restore of the segments.

Backup

  • List all segments
  • For each segment metadata is exported to a file (using well-known file convention)

Restore

  • Import segment metadata previously exported to a file.

Compatibility

We wish to be backwards compatible with RollingStorage based Tier 2 layouts.

Backward compatibility
  • ChunkStorageManager will implement a mechanism to import existing RollingStorage header files into Segment Metadata Store. When ChunkStorageManager can not find any record it will attempt to import the RollingStorage header files.
  • In addition, ChunkStorageManager will implement a mechanism to export existing Segment Metadata Store records in RollingStorage header files on demand.

Migration and Upgrade

Breaking changes

The simplified tier-2 has several breaking changes.

Tier-2 Layout Changes

File changes

| File | What it is used for | Before | Format change |
|---|---|---|---|
| Segment header | Individual files per segment. Stores layout of chunks | Append only list of chunks. | Deleted |
| Segment chunk file | Stores actual user data | No change | No change |
| Segment attributes | Serialized change log to attributes key value pairs | No change | No change |
| Storage Metadata (for each container) | Shared for all segments. Data is stored in a table segment. | Does not exist | Added two types of records viz. SegmentMetadata, ChunkMetadata |

Data stored in header file with old version

Data is serialized using custom serializer that uses string key value pairs. https://github.com/pravega/pravega/blob/c7ac009970787df2633163644691aa3d72fc1a4e/segmentstore/storage/src/main/java/io/pravega/segmentstore/storage/rolling/HandleSerializer.java

Following data is serialized.

  • Rolling Policy Max Size

  • List of chunks, where each chunk has

    • StartOffset

    • ChunkName

Data stored in new version

The data is serialized/deserialized using VersionedSerializer that supports backward and forward compatibility.

Please refer to data structure section for new metadata records.

Naming convention changes

We intend to keep same naming convention as today.
Caveat -

  • Currently we implement concat by copying source data to the target, in process deleting old chunks and creating new chunks as required. However with newer version there is an option of “metadata only concat” in which the chunks or data don’t change but only the segment metadata about the layout changes. In this specific case the naming convention breaks as list of metadata for each chunks of source segments is appended into such list for target without actually renaming chunks or making any data changes.

  • Naming convention breaks for HDFS as we may no longer use epoch in name.

Behavior changes

Fencing is no longer required; as such, fencing is no longer file-name based.

How data is imported/exported.

The conversion to and from metadata can be triggered in various ways (either on the fly internally, by the tool or as part of the background job). The storage system provides API that can be called to do conversion.

Import

  1. Find header file for given segment.

  2. Parse the content.

  3. Insert chunk information into the metadata table.

Export

  1. Gather all the metadata for the given segment from metadata store.

  2. Serialize this data to the old header format.

  3. Write header file for given segment.

Recommended Approach

For existing systems, the recommended approach is that Pravega starts writing in the newer layout format only after migration, and even then only when the user explicitly/manually changes the config value to enable it. Pravega does not automatically upgrade to the new format; until then it continues to write in the old layout format.

For a brand new install, the recommended approach is that user choose to use newer layout directly (without need to first use old format)

Supported Options

| Option for upgrade | Code Version | Layout Version | Notes |
|---|---|---|---|
| NO_UPGRADE | Old | Old | ChunkedSegmentStorage is not used. New code is deployed but not activated. |
| CODE_ONLY_UPGRADE | New | Old | ChunkedSegmentStorage is used but it writes in old format. Default option. |
| LAYOUT_UPGRADE | New | New | ChunkedSegmentStorage is used; it does not write in old format. This option assumes new code is already deployed. This upgrade is non-reversible. |
| INSTALL_NEW | New | New | ChunkedSegmentStorage is used. As it is a new install, Pravega writes all the metadata in new format. |

  • Outline of approach

    • There is configuration option to select layout format from following

      • Write exclusively in old layout format, (selected by default)

      • Write exclusively in new format.

    • By default, this option is set to write exclusively in old format. During and after upgrade the new code always writes in old format. Thus, during upgrade itself there is no possibility of layout version mismatch.

      • Post upgrade the user has to manually change option for layout.

      • Once new layout is enabled, old data is converted on the fly.

        • For on the fly conversion, the trigger is when segment metadata is not found in the metadata store. In this case, the header file is read and converted into metadata records.
      • Once Pravega starts writing exclusively in new format, it cannot go back to the older format. (Even if the user changes the config option.)
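The on-the-fly conversion trigger described above (metadata not found, so the legacy header is read and converted) can be sketched as a lazy lookup. `LazyImportSketch` and the `readLegacyHeader` callback are hypothetical; parsing of the actual header file format is deliberately left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical lazy import: convert the old header only when metadata is missing.
final class LazyImportSketch {
    final Map<String, List<String>> metadataStore = new HashMap<>();   // segment -> chunk names
    private final Function<String, List<String>> readLegacyHeader;     // stand-in for the header parser

    LazyImportSketch(Function<String, List<String>> readLegacyHeader) {
        this.readLegacyHeader = readLegacyHeader;
    }

    List<String> getLayout(String segment) {
        List<String> layout = metadataStore.get(segment);
        if (layout == null) {
            // Trigger: segment metadata is not in the metadata store, so import the header file.
            layout = readLegacyHeader.apply(segment);
            if (layout == null) {
                throw new IllegalArgumentException("segment does not exist: " + segment);
            }
            metadataStore.put(segment, layout);
        }
        return layout;
    }
}
```

After the first access the segment is served purely from the metadata store, so the legacy header is read at most once per segment.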

If necessary, an admin tool can be provided to export metadata in old format for DR and support scenarios.

  • Pros

    • Simplifies actual Pravega upgrade.

    • Explicit user triggered action to change format.

    • New format is off by default.

  • Cons

    • Once the user selects writing exclusively in new format, the system will be incompatible with the way old code works. So there is no downgrade path available.

Key Questions Answered

  1. How do we guarantee zombie segment store cannot write to tier-2? A write operation is not considered complete until metadata about that write is committed. When the tier-1 ledger is closed, the zombie segment store is fenced out and cannot write to tier-1. This means it cannot update metadata in the table segment, nor can it truncate the ledger. Whatever data is written to tier-2 by the zombie after its last successful metadata update will simply be ignored. This guarantees consistency for tier-2.

  2. Will this break if tier-1 is changed to something other than Bookkeeper? Yes. Unless its replacement also provides automatic fencing on closed ledgers.

  3. How do we guarantee multiple segment stores never write to the same file? Each segment store instance gets a unique id when instantiated (epoch in current case). This id is used/included while generating a file/object name for a given segment.

  4. You mentioned both Append and Merge. Aren’t we supposed to have CRWD only? The API contract does not require append, but an implementation is free to use append to improve performance by writing multiple chunks to the same underlying file/object. Merge is also optional, but if an efficient implementation is available we should be able to leverage it.

  5. There will be large number of small objects. Will this not be a problem? No.

  • (a) For storage providers that do provide append operations, the same object/file will contain multiple consecutive chunks. Because of the guaranteed single-writer pattern, no fencing gymnastics are necessary, which provides fast write performance. Because the same file is appended to, the resulting file contains several chunks, which improves read performance as well. The chunk metadata can be merged “inline”.
  • (b) For storage providers that do not provide append, we defragment the segment by using native concat capabilities, which are assumed to be efficiently implemented by the underlying storage.
  • (c) For reads, intermediate results of the offset search will be aggressively cached.
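
Answers 1 and 3 can be illustrated with a small sketch. It is an assumption-laden model, not Pravega code: `FencedMetadataLog` stands in for the tier-1 backed metadata store, `ChunkNames` shows one possible naming scheme that embeds the writer's epoch, and the exact name format is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for tier-1 fencing: only the current epoch may commit metadata. Hypothetical class. */
final class FencedMetadataLog {
    private long currentEpoch;
    private final Map<String, Long> committedLengths = new HashMap<>(); // chunk name -> committed length

    FencedMetadataLog(long epoch) {
        this.currentEpoch = epoch;
    }

    /** A newer segment store instance takes over; older epochs are fenced out from here on. */
    synchronized void takeOver(long newEpoch) {
        if (newEpoch <= currentEpoch) {
            throw new IllegalArgumentException("epoch must increase");
        }
        currentEpoch = newEpoch;
    }

    /** A write only counts once this commit succeeds; a fenced (zombie) writer always gets false. */
    synchronized boolean commit(long writerEpoch, String chunkName, long length) {
        if (writerEpoch != currentEpoch) {
            return false;
        }
        committedLengths.put(chunkName, length);
        return true;
    }

    synchronized boolean isCommitted(String chunkName) {
        return committedLengths.containsKey(chunkName);
    }
}

final class ChunkNames {
    private ChunkNames() { }

    /** Illustrative naming scheme: segment name, writer epoch and start offset. */
    static String newChunkName(String segmentName, long epoch, long startOffset) {
        return segmentName + ".E-" + epoch + "-O-" + startOffset;
    }
}
```

Because the epoch is part of the name, two instances never target the same tier-2 object, and because the commit is fenced, whatever a zombie writes after losing ownership never becomes part of the segment.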

Design Details

Components

Component Responsibilities Collaborations
ChunkMetadataStore Stores all the metadata for the segments associated with the given container. Data is stored as a simple KV store. The interface follows the repository pattern. This includes a write-through cache for SMTS entries.
ChunkedSegmentStorage Translates segment operations to operations on individual chunks.
ChunkStorage (s) Component that provides bindings for various storage providers.
StorageWriter Periodically writes data to tier-2
StorageReader Prefetches the data from tier-2
StorageOptimiser Periodically merges sets of small tier-2 objects into a larger object. Removes deleted objects from tier-2.

API contracts

ChunkStorage

/**
 * Handle to a chunk
 */
public class ChunkHandle {
    /**
     * Gets the name of the Chunk.
     */
    String getChunkName();

    /**
     * Gets a value indicating whether this Handle was opened in ReadOnly mode (true) or ReadWrite mode (false).
     */
    boolean isReadOnly();
}

/**
 * Chunk Information.
 */
public class ChunkInfo {
    /**
     * Gets length of the chunk.
     * @return long length of the chunk.
     */
    long getLength();

    /**
     * Gets name of the chunk
     * @return String name of the chunk.
     */
    String getName();
   
}

@Beta
public interface ChunkStorage extends AutoCloseable {
    /**
     * Gets a value indicating whether this Storage implementation supports {@link ChunkStorage#truncate(ChunkHandle, long)} operation on underlying storage object.
     *
     * @return True or false.
     */
    boolean supportsTruncation();

    /**
     * Gets a value indicating whether this Storage implementation supports append operation on underlying storage object.
     *
     * @return True or false.
     */
    boolean supportsAppend();

    /**
     * Gets a value indicating whether this Storage implementation supports native merge operation on underlying storage object.
     *
     * @return True or false.
     */
    boolean supportsConcat();

    /**
     * Determines whether named file/object exists in underlying storage.
     *
     * @param chunkName Name of the storage object to check.
     * @return True if the object exists, false otherwise.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Boolean> exists(String chunkName);

    /**
     * Creates a new file.
     *
     * @param chunkName String name of the storage object to create.
     * @return ChunkHandle A writable handle for the recently created chunk.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<ChunkHandle> create(String chunkName);

    /**
     * Deletes a file.
     *
     * @param handle ChunkHandle of the storage object to delete.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Void> delete(ChunkHandle handle);

    /**
     * Opens storage object for Read.
     *
     * @param chunkName String name of the storage object to read from.
     * @return ChunkHandle A readable handle for the given chunk.
     * @throws IllegalArgumentException If argument is invalid.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<ChunkHandle> openRead(String chunkName);

    /**
     * Opens storage object for Write (or modifications).
     *
     * @param chunkName String name of the storage object to write to or modify.
     * @return ChunkHandle A writable handle for the given chunk.
     * @throws IllegalArgumentException If argument is invalid.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<ChunkHandle> openWrite(String chunkName);

    /**
     * Retrieves the ChunkInfo for given name.
     *
     * @param chunkName String name of the storage object to read from.
     * @return ChunkInfo Information about the given chunk.
     * @throws IllegalArgumentException If argument is invalid.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<ChunkInfo> getInfo(String chunkName);

    /**
     * Reads a range of bytes from the underlying storage object.
     *
     * @param handle       ChunkHandle of the storage object to read from.
     * @param fromOffset   Offset in the file from which to start reading.
     * @param length       Number of bytes to read.
     * @param buffer       Byte buffer to which data is copied.
     * @param bufferOffset Offset in the buffer at which to start copying read data.
     * @return int Number of bytes read.
     * @throws IllegalArgumentException  If argument is invalid.
     * @throws NullPointerException      If the parameter is null.
     * @throws IndexOutOfBoundsException If the index is out of bounds or offset is not a valid offset in the underlying file/object.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Integer> read(ChunkHandle handle, long fromOffset, int length, byte[] buffer, int bufferOffset);

    /**
     * Writes the given data to the underlying storage object.
     *
     * <ul>
     * <li>It is expected that in cases where it can not overwrite the existing data at given offset, the implementation should throw IndexOutOfBoundsException.</li>
     * <li>For storage where underlying files/objects are immutable once written, the implementation should return false on {@link ChunkStorage#supportsAppend()}.
     * In such cases the only valid offset is 0.</li>
     * <li>For storages where underlying files/objects can only be appended but not overwritten, the offset must match the actual current length of the underlying file/object.</li>
     * <li>In all cases the offset can not be greater than the actual current length of the underlying file/object.</li>
     * </ul>
     *
     * @param handle ChunkHandle of the storage object to write to.
     * @param offset Offset in the file to start writing.
     * @param length Number of bytes to write.
     * @param data   An InputStream representing the data to write.
     * @return int Number of bytes written.
     * @throws IndexOutOfBoundsException When data can not be written at given offset.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Integer> write(ChunkHandle handle, long offset, int length, InputStream data);

    /**
     * Concatenates two or more chunks using native facility. The first chunk is concatenated to.
     *
     * @param chunks Array of ConcatArgument objects containing info about existing chunks to be appended together.
     *               The chunks must be concatenated in the same sequence the arguments are provided.
     * @return int Number of bytes concatenated.
     * @throws UnsupportedOperationException If this operation is not supported by this provider.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Integer> concat(ConcatArgument[] chunks);

    /**
     * Truncates a given chunk.
     *
     * @param handle ChunkHandle of the storage object to truncate.
     * @param offset Offset to truncate to.
     * @return True if the object was truncated, false otherwise.
     * @throws UnsupportedOperationException If this operation is not supported by this provider.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Boolean> truncate(ChunkHandle handle, long offset);

    /**
     * Marks chunk as either readonly or writable.
     *
     * @param handle     ChunkHandle of the storage object.
     * @param isReadonly True if chunk is set to be readonly.
     * @throws UnsupportedOperationException If this operation is not supported by this provider.
     * @throws CompletionException If the operation failed, it will be completed with the appropriate exception. Notable Exceptions:
     * {@link ChunkStorageException} In case of I/O related exceptions.
     */
    CompletableFuture<Void> setReadOnly(ChunkHandle handle, boolean isReadonly);
}
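
The offset rules stated in the `write` contract above can be expressed as a small validation helper. This is a sketch under assumptions: `WriteOffsetRules` is a hypothetical class, and the `supportsOverwrite` flag is not part of the `ChunkStorage` interface; it is introduced here only to distinguish append-only storage from storage that allows writes at arbitrary existing offsets.

```java
/** Hypothetical helper that encodes the offset rules from the write() contract. */
final class WriteOffsetRules {
    private WriteOffsetRules() { }

    /**
     * @param supportsAppend    value the provider would return from supportsAppend()
     * @param supportsOverwrite assumption: whether existing bytes may be overwritten in place
     * @param currentLength     actual current length of the underlying file/object
     * @param offset            requested write offset
     */
    static boolean isValid(boolean supportsAppend, boolean supportsOverwrite, long currentLength, long offset) {
        // In all cases the offset can not exceed the current length (and can not be negative).
        if (offset < 0 || offset > currentLength) {
            return false;
        }
        // Immutable objects: the only valid offset is 0.
        if (!supportsAppend) {
            return offset == 0;
        }
        // Append-only objects: the offset must match the current length.
        if (!supportsOverwrite) {
            return offset == currentLength;
        }
        return true;
    }

    /** Throws the exception the contract asks for when data can not be written at the offset. */
    static void check(boolean supportsAppend, boolean supportsOverwrite, long currentLength, long offset) {
        if (!isValid(supportsAppend, supportsOverwrite, currentLength, offset)) {
            throw new IndexOutOfBoundsException(
                    "Can not write at offset " + offset + " (current length " + currentLength + ")");
        }
    }
}
```

A provider's `write` implementation could call `WriteOffsetRules.check(...)` before touching the underlying object, so that invalid offsets fail fast with the documented exception.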

BaseChunkStorage

The implementation of a chunk provider is further simplified by inheriting from BaseChunkStorage, which provides logging and metrics. In this case, the derived class is expected to override a number of abstract methods.

    /**
     * Retrieves the ChunkInfo for given name.
     *
     * @param chunkName String name of the chunk to read from.
     * @return ChunkInfo Information about the given chunk.
     * @throws ChunkStorageException    Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException If argument is invalid.
     */
    abstract protected ChunkInfo doGetInfo(String chunkName) throws ChunkStorageException, IllegalArgumentException;

    /**
     * Creates a new chunk.
     *
     * @param chunkName String name of the chunk to create.
     * @return ChunkHandle A writable handle for the recently created chunk.
     * @throws ChunkStorageException    Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException If argument is invalid.
     */
    abstract protected ChunkHandle doCreate(String chunkName) throws ChunkStorageException, IllegalArgumentException;

    /**
     * Determines whether named chunk exists in underlying storage.
     *
     * @param chunkName Name of the chunk to check.
     * @return True if the object exists, false otherwise.
     * @throws ChunkStorageException    Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException If argument is invalid.
     */
    abstract protected boolean checkExist(String chunkName) throws ChunkStorageException, IllegalArgumentException;

    /**
     * Deletes a chunk.
     *
     * @param handle ChunkHandle of the chunk to delete.
     * @throws ChunkStorageException    Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException If argument is invalid.
     */
    abstract protected void doDelete(ChunkHandle handle) throws ChunkStorageException, IllegalArgumentException;

    /**
     * Opens chunk for Read.
     *
     * @param chunkName String name of the chunk to read from.
     * @return ChunkHandle A readable handle for the given chunk.
     * @throws ChunkStorageException    Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException If argument is invalid.
     */
    abstract protected ChunkHandle doOpenRead(String chunkName) throws ChunkStorageException, IllegalArgumentException;

    /**
     * Opens chunk for Write (or modifications).
     *
     * @param chunkName String name of the chunk to write to or modify.
     * @return ChunkHandle A writable handle for the given chunk.
     * @throws ChunkStorageException    Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException If argument is invalid.
     */
    abstract protected ChunkHandle doOpenWrite(String chunkName) throws ChunkStorageException, IllegalArgumentException;

    /**
     * Reads a range of bytes from the underlying chunk.
     *
     * @param handle       ChunkHandle of the chunk to read from.
     * @param fromOffset   Offset in the chunk from which to start reading.
     * @param length       Number of bytes to read.
     * @param buffer       Byte buffer to which data is copied.
     * @param bufferOffset Offset in the buffer at which to start copying read data.
     * @return int Number of bytes read.
     * @throws ChunkStorageException     Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IllegalArgumentException  If argument is invalid.
     * @throws NullPointerException      If the parameter is null.
     * @throws IndexOutOfBoundsException If the index is out of bounds.
     */
    abstract protected int doRead(ChunkHandle handle, long fromOffset, int length, byte[] buffer, int bufferOffset) throws ChunkStorageException, NullPointerException, IndexOutOfBoundsException;

    /**
     * Writes the given data to the chunk.
     *
     * @param handle ChunkHandle of the chunk to write to.
     * @param offset Offset in the chunk to start writing.
     * @param length Number of bytes to write.
     * @param data   An InputStream representing the data to write.
     * @return int Number of bytes written.
     * @throws ChunkStorageException     Throws ChunkStorageException in case of I/O related exceptions.
     * @throws IndexOutOfBoundsException Throws IndexOutOfBoundsException in case of invalid index.
     */
    abstract protected int doWrite(ChunkHandle handle, long offset, int length, InputStream data) throws ChunkStorageException, IndexOutOfBoundsException;

    /**
     * Concatenates two or more chunks using storage native functionality. (E.g. multipart upload.)
     *
     * @param chunks Array of ConcatArgument objects containing info about existing chunks to be concatenated together.
     *               The chunks must be concatenated in the same sequence the arguments are provided.
     * @return int Number of bytes concatenated.
     * @throws ChunkStorageException         Throws ChunkStorageException in case of I/O related exceptions.
     * @throws UnsupportedOperationException If this operation is not supported by this provider.
     */
    abstract protected int doConcat(ConcatArgument[] chunks) throws ChunkStorageException, UnsupportedOperationException;

    /**
     * Truncates a given chunk.
     *
     * @param handle ChunkHandle of the chunk to truncate.
     * @param offset Offset to truncate to.
     * @return True if the object was truncated, false otherwise.
     * @throws ChunkStorageException         Throws ChunkStorageException in case of I/O related exceptions.
     * @throws UnsupportedOperationException If this operation is not supported by this provider.
     */
    abstract protected boolean doTruncate(ChunkHandle handle, long offset) throws ChunkStorageException, UnsupportedOperationException;

    /**
     * Sets readonly attribute for the chunk.
     *
     * @param handle     ChunkHandle of the chunk.
     * @param isReadOnly True if chunk is set to be readonly.
     * @return True if the operation was successful, false otherwise.
     * @throws ChunkStorageException         Throws ChunkStorageException in case of I/O related exceptions.
     * @throws UnsupportedOperationException If this operation is not supported by this provider.
     */
    abstract protected boolean doSetReadOnly(ChunkHandle handle, boolean isReadOnly) throws ChunkStorageException, UnsupportedOperationException;
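
How a base class can turn the synchronous `doXxx` methods into the asynchronous `ChunkStorage` operations is sketched below. This is not the actual BaseChunkStorage: `SketchBaseChunkStorage` and `InMemorySketchStorage` are hypothetical, only `create` is shown, a `String` stands in for `ChunkHandle`, a plain `IOException` stands in for `ChunkStorageException`, and a simple counter stands in for metrics.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical base class: validates arguments, records metrics, and runs doCreate asynchronously. */
abstract class SketchBaseChunkStorage {
    private final Executor executor;
    private final AtomicLong createCount = new AtomicLong(); // stand-in for a metrics counter

    SketchBaseChunkStorage(Executor executor) {
        this.executor = executor;
    }

    /** Asynchronous operation exposed to callers. */
    final CompletableFuture<String> create(String chunkName) {
        if (chunkName == null || chunkName.isEmpty()) {
            throw new IllegalArgumentException("chunkName must not be null or empty");
        }
        return execute(() -> {
            String handle = doCreate(chunkName);
            createCount.incrementAndGet(); // counted only on success
            return handle;
        });
    }

    long getCreateCount() {
        return createCount.get();
    }

    /** Provider-specific synchronous implementation, supplied by the derived class. */
    protected abstract String doCreate(String chunkName) throws Exception;

    /** Runs the operation on the executor and reports failures via CompletionException. */
    private <T> CompletableFuture<T> execute(Callable<T> operation) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return operation.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor);
    }
}

/** Toy provider that keeps chunk names in memory. */
final class InMemorySketchStorage extends SketchBaseChunkStorage {
    private final Set<String> chunks = ConcurrentHashMap.newKeySet();

    InMemorySketchStorage(Executor executor) {
        super(executor);
    }

    @Override
    protected String doCreate(String chunkName) throws Exception {
        if (!chunks.add(chunkName)) {
            throw new IOException("chunk already exists: " + chunkName);
        }
        return chunkName;
    }
}
```

With this split, a provider only writes straightforward blocking code in the `doXxx` methods, while the cross-cutting concerns live in one place.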

Data structures

Segment Metadata (1 per segment)

  • Key – segment name
  • Persisted to disk – yes (via table store)
  • Mutable – yes
  • Value - Serialized record containing following info
    • Status (bit flags indicating the status of the segment: Sealed, Deleted, etc.)
    • Start offset
    • Length
    • Last chunk pointer
    • First chunk pointer
    • Max Rolling Length - Per segment setting
    • Owner Epoch - epoch of the owner segment store.
    • Offset at which the last chunk starts (optimization useful when calculating the correct offset for writing new data)
    • Offset at which data in the first chunk starts (optimization useful during reads when truncation is involved)

Chunk Metadata (1 per chunk, multiple per segment)

  • Key – “chunk-<N>”, where N is a contiguous, monotonically increasing chunk number starting with 0
  • Persisted to disk – yes (via table store)
  • Mutable – Mostly immutable; once written, this data seldom changes.
  • Value - Serialized record containing following info
    • Chunk path/name on the tier-2 storage
    • length
    • Pointer to next chunk metadata record. (String name)
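
The two record types form a singly linked list per segment, and locating the chunk for a given segment offset is a walk over that list. The sketch below assumes simplified, hypothetical record classes (`SegmentRecord`, `ChunkRecord`) that carry only the fields needed for the lookup; the real records hold the additional fields listed above.

```java
import java.util.Map;

/** Simplified chunk metadata: name, length and pointer to the next record. Hypothetical class. */
final class ChunkRecord {
    final String name;
    final long length;
    final String nextChunk; // null for the last chunk

    ChunkRecord(String name, long length, String nextChunk) {
        this.name = name;
        this.length = length;
        this.nextChunk = nextChunk;
    }
}

/** Simplified segment metadata. Hypothetical class. */
final class SegmentRecord {
    final String firstChunk;
    final long firstChunkStartOffset; // segment offset at which the first chunk starts
    final long length;                // total segment length

    SegmentRecord(String firstChunk, long firstChunkStartOffset, long length) {
        this.firstChunk = firstChunk;
        this.firstChunkStartOffset = firstChunkStartOffset;
        this.length = length;
    }
}

final class ChunkLocator {
    /** Result of a lookup: which chunk, and where inside it. */
    static final class Location {
        final String chunkName;
        final long offsetInChunk;

        Location(String chunkName, long offsetInChunk) {
            this.chunkName = chunkName;
            this.offsetInChunk = offsetInChunk;
        }
    }

    private ChunkLocator() { }

    /** Walks the chunk list until the chunk containing the segment offset is found. */
    static Location locate(SegmentRecord segment, Map<String, ChunkRecord> chunks, long offset) {
        if (offset < segment.firstChunkStartOffset || offset >= segment.length) {
            throw new IndexOutOfBoundsException("offset " + offset + " is outside the segment");
        }
        long chunkStart = segment.firstChunkStartOffset;
        String current = segment.firstChunk;
        while (current != null) {
            ChunkRecord chunk = chunks.get(current);
            if (chunk == null) {
                throw new IllegalStateException("missing chunk metadata: " + current);
            }
            if (offset < chunkStart + chunk.length) {
                return new Location(current, offset - chunkStart);
            }
            chunkStart += chunk.length;
            current = chunk.nextChunk;
        }
        throw new IllegalStateException("chunk list does not cover offset " + offset);
    }
}
```

The walk is linear in the number of chunks, which is why caching intermediate results of the offset search and keeping the "offset at which the last chunk starts" in the segment record are useful optimizations.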

Assumptions

It is helpful for us to explicitly state these assumptions to better understand the available design space and its constraints.

Throughput

It is a fundamental assumption of the Pravega architecture that Tier-1 provides low latency using high-end storage devices, whereas Tier-2 provides cheap long-term storage at higher throughput. The storage system design will take advantage of the following facts:

  • Tier-2 is not on a critical path for write operation. Write to tier-2 is designed to be async.
  • Tail reads should be mostly served from cache. Tier-2 should not be on a critical path for tail read.
  • In case of historical/Batch reads, throughput matters substantially more than fast “first byte read time” and any latency in reading the first bytes is eventually amortized by higher throughput.
  • In addition, for sequential reads the data can be prefetched in large read operations to increase performance further.

Segment is an opaque sequence of bytes

There is a hard requirement that the actual sequence of data bytes stored inside a storage object/file must match, byte for byte, the sequence of bytes that the user provided.

  • Tier-2 does not interpret the user-provided bytes in any way. For Pravega the data is opaque.
  • Tier-2 does not change the original sequence of bytes, nor does it mutate them.
  • There are no additional headers or footers added directly to or embedded in the user data.
  • The actual sequence of bytes is not changed at all. E.g. data is not encrypted or compressed, and no erasure coding or error correction bits are added.
  • However, we may store a single segment in multiple storage objects.
  • There may be additional headers/footers as separate metadata objects in addition to segment data.

Strong assumptions about tier-1 fencing.

This design critically depends on the ability of Bookkeeper tier-1 to fence old/outdated bookie writers.

Priorities and Tradeoffs

Any system design inevitably involves making tough choices when conflicting requirements cannot be achieved simultaneously. Below are our guidelines when making such choices.

  1. Prefer consistency and correctness guarantees over higher performance.
  2. Prefer write throughput over write latency.
  3. Prefer making the normal case fast while keeping performance just good enough (tolerable/acceptable) in exceptional situations.
  4. Prefer writing big chunks of data over eagerly evicting cache.
  5. Prefer not evicting system segments whenever possible.
  6. Prefer admin initiated maintenance operations.
  7. Prefer conservative approach to changing on-disk metadata formats by being backward and forward compatible.

Pros and Cons

The upside:

  • No longer using Tier 2 for Fencing. All such operations are done by relying on Tier 1 writes (which have fencing built-in).
  • No longer worrying about zombie cases (runaway segment Container instances) as Container instances will not be competing for the same Chunks anymore.
  • Support for wider variety of Tier 2 implementations
  • Less metadata in Tier 2 (no more Header files)

The downside:

  • Expecting a larger number of Tier 2 Chunks (Files, Objects, etc.)
  • Require access to Tier 1 (Metadata Table) so we can determine the Tier 2 layout of a Segment.
    • This can be mitigated by periodically snapshotting the SMTS contents as a Header file (see Backwards Compatibility)