PDP 32 (Improve Scalability of Controller Metadata)
Currently, a stream's time series are stored as monolithic blobs. Chiefly, three time series are each stored in a single blob: the segments record, the history, and the sealed segments record. If a stream has a large average number of active segments and scales very aggressively, then its time series metadata values (segment table, history table and sealedSegmentsRecord) can grow to become unwieldy. For example, if a scale is performed every second, then in only one year we will have over 31M segments and 31M epochs. If the average number of segments is large, then the history time series for 31M records, each with a list of active segments, will be proportionally large.
So we need to reevaluate our schema so that it scales well for any stream, including streams with arbitrarily large volumes of metadata.
As stated at the start, our metadata does not scale today because we store time series as monolithic blobs and perform computations by fetching the entire time series. Storing individual records instead of whole time series would bring obvious scalability gains. However, if we simply store all records individually, we lose the benefits of locality, and the downside could be an increased number of store calls.
I will describe a scheme where, by restructuring the metadata columns across time series tables, storing individual rows of those tables, and changing the way we query them, we can improve the scalability of metadata without compromising on query processing times or the number of store calls.
- Before: segment table with each row representing a segment.
- After: Remove the segment table and the individual segment records.
To get all metadata corresponding to a segment:
i. segmentId captures the segment's creation epoch.
ii. Fetch the epoch record and get all information about the segment from it:
- segment creation time = epoch creation time
- segment range = stored in epoch record
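The two-step lookup above can be sketched in Java as follows. It assumes a segment id packs the creation epoch into the high 32 bits and the segment number into the low 32 bits; `SegmentInfo`, `EpochInfo` and `SegmentIds` are hypothetical, trimmed stand-ins, and a `Map` stands in for the metadata store.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, trimmed stand-in for SegmentRecord (only the fields used here).
final class SegmentInfo {
    final int number;
    final double keyStart;
    final double keyEnd;

    SegmentInfo(int number, double keyStart, double keyEnd) {
        this.number = number;
        this.keyStart = keyStart;
        this.keyEnd = keyEnd;
    }
}

// Hypothetical, trimmed stand-in for EpochRecord.
final class EpochInfo {
    final long creationTime;
    final List<SegmentInfo> segments;

    EpochInfo(long creationTime, List<SegmentInfo> segments) {
        this.creationTime = creationTime;
        this.segments = segments;
    }
}

final class SegmentIds {
    // ASSUMPTION: creation epoch in the high 32 bits, segment number in the low 32 bits.
    static long computeSegmentId(int segmentNumber, int creationEpoch) {
        return ((long) creationEpoch << 32) | (segmentNumber & 0xFFFFFFFFL);
    }

    static int getEpoch(long segmentId) {
        return (int) (segmentId >>> 32);
    }

    static int getSegmentNumber(long segmentId) {
        return (int) segmentId; // keeps the low 32 bits
    }

    // Step i: decode the creation epoch from the id.
    // Step ii: one read of that epoch record (the Map stands in for the store), then locate
    // the segment in it. The segment's creation time is the epoch's creation time.
    static Optional<SegmentInfo> lookup(long segmentId, Map<Integer, EpochInfo> epochStore) {
        EpochInfo epoch = epochStore.get(getEpoch(segmentId));
        if (epoch == null) {
            return Optional.empty();
        }
        int number = getSegmentNumber(segmentId);
        return epoch.segments.stream().filter(s -> s.number == number).findFirst();
    }
}
```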
- Before: history table
- After: Store individual epoch records against the epoch key. Include segment range information in the epoch.
- Maintain additional time based index for entire history.
- Maintain additional chunked time series of deltas between epochs.
Note: maintaining an index allows us to perform any time-based query. However, this index can itself suffer from scalability problems and grow unmanageably large.
For example, if a scale happens every second, then in one year we will have 31 million epoch records, and hence 31M records to index. At 16 bytes per index record, the index will grow to about 500 MB in one year. Storing it as an N-ary tree of depth `2` with a branching factor of 100k, we can index up to 10B records (100k leaves of 100k records each), with each node taking about 800 KB.
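The sizing in this note can be checked as follows, assuming one scale per second over a 365-day year, 16-byte records for a flat index, and 8-byte (`Long`) entries in each tree node, as in `HistoryTimeIndexLeaf.records`:

```latex
\begin{aligned}
\text{epochs per year} &= 365 \times 86{,}400 = 31{,}536{,}000 \\
\text{flat index size} &= 31{,}536{,}000 \times 16\ \text{B} \approx 504.6\ \text{MB} \\
\text{tree capacity (depth 2, branching } 10^5\text{)} &= 10^5 \times 10^5 = 10^{10}\ \text{records} \\
\text{size of one node} &= 10^5 \times 8\ \text{B} = 800\ \text{KB}
\end{aligned}
```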
index => key: "historyIndex", value: root node of an N-ary tree. The root node maps leaf start time -> leaf node; leaf nodes map time -> epoch key.
public class HistoryTimeIndexRootNode {
private final List<Long> leaves;
int findLeafAtTime(long timestamp);
}
public class HistoryTimeIndexLeaf {
private final List<Long> records;
// perform binary search on records to find timestamp
int findIndexAtTime(long timestamp);
}
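A minimal sketch of a time-based lookup over the two levels, with one binary search per level. It assumes each root entry is the first timestamp of its leaf, leaf entries are strictly increasing epoch creation times, and every leaf except the last is full, so the epoch number follows from the leaf position. `HistoryTimeIndex`, `floorIndex` and `epochAtTime` are illustrative names, and in-memory lists stand in for the stored nodes.

```java
import java.util.Collections;
import java.util.List;

// Sketch of the two-level history time index lookup (two store reads: root, then one leaf).
// ASSUMPTIONS: root.get(i) is the first timestamp of leaf i; leaf entries are strictly
// increasing epoch creation times; every leaf except the last holds exactly leafSize
// entries, so epoch = leafIndex * leafSize + positionInLeaf.
final class HistoryTimeIndex {
    // Position of the last element <= timestamp, or -1 if timestamp precedes all elements.
    static int floorIndex(List<Long> sorted, long timestamp) {
        int i = Collections.binarySearch(sorted, timestamp);
        return i >= 0 ? i : -i - 2; // for a miss, -i - 1 is the insertion point
    }

    // Returns the epoch active at `timestamp`, or -1 if it precedes the first epoch.
    static int epochAtTime(List<Long> root, List<List<Long>> leaves, int leafSize, long timestamp) {
        int leaf = floorIndex(root, timestamp); // binary search the root node
        if (leaf < 0) {
            return -1;
        }
        int pos = floorIndex(leaves.get(leaf), timestamp); // binary search the chosen leaf
        return leaf * leafSize + pos;
    }
}
```

For example, with leaves `[100, 200, 300]` and `[400, 500]` and a leaf size of 3, a query for time 250 resolves to epoch 1 and a query for time 400 resolves to epoch 3.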
Individual records =>
Key: epoch-number, value: EpochRecord
public class EpochRecord {
private final int epoch;
private final int referenceEpoch;
private final List<SegmentRecord> segments;
private final long creationTime;
}
SegmentRecord:
public class SegmentRecord {
public static final SegmentRecordSerializer SERIALIZER = new SegmentRecordSerializer();
private final int segmentNumber;
private final int creationEpoch;
private final long creationTime;
private final double keyStart;
private final double keyEnd;
}
Since we are losing the epoch time series (history table), to compute successors we need to store additional information about each segment's sealing epoch. This can be stored as an individual record per segment. It is used only by the getSuccessors call, and keeping it separately makes that query optimal.
Segment Sealed Record:
Key: long segmentId
Value: int sealing epoch
Tail of stream:
Additionally, maintain another record called CurrentEpochRecord.
key: "current-epoch-record"
value: EpochRecord currentEpochRecord
Head of stream (truncation point):
key: "truncationRecord"
value: StreamTruncationRecord (defaults to Truncation.Empty)
// existing class. no change
In addition to individual epoch records and the index, queries on stream cuts require a time series. Fetching individual epoch records would result in a large number of calls to the store, which would make such processing sub-optimal. So we will also store history as a time series. Since a monolithic time series has scalability problems, we will store a time series of deltas across epochs, and we will chunk it so that each chunk holds a fixed number of records. The chunk size will be chosen so that each chunk is relatively small.
So each chunk of the time series will look as follows:
public class HistoryTimeSeries {
private final List<HistoryTimeSeriesRecord> historyRecords;
}
public class HistoryTimeSeriesRecord {
private final int epoch;
private final int referenceEpoch;
private final List<SegmentRecord> segmentsSealed;
private final List<SegmentRecord> segmentsCreated;
private final long scaleTime;
}
Typically, with auto scale, the number of segments to seal will be bounded by 3. The number of new segments will be bounded by maxScaleFactor but would typically be small. Also, for rolling transactions, we will simply store the reference epoch rather than the full delta of sealed and new segments, as that captures the required information.
At a chunk size of 1000 and an average delta size of 100 segments, a chunk would need about 800 KB of data. Typically, each chunk would be much smaller.
Note: this time series would otherwise be sub-optimal for processing queries about segments and epochs, and we will use individual epoch records for those. It will only be used for stream cut related queries, where we need the time series to find epochs and segments between two stream cut points.
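To illustrate how a stream cut query could reconstruct consecutive epochs from one chunk without reading every epoch record, here is a sketch that replays deltas over a base epoch's segment set. `EpochDelta` is a hypothetical stand-in for `HistoryTimeSeriesRecord` reduced to segment ids, and rows for rolling transactions, which carry only a reference epoch, are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for HistoryTimeSeriesRecord, trimmed to segment ids.
final class EpochDelta {
    final int epoch;
    final List<Long> segmentsSealed;
    final List<Long> segmentsCreated;

    EpochDelta(int epoch, List<Long> segmentsSealed, List<Long> segmentsCreated) {
        this.epoch = epoch;
        this.segmentsSealed = segmentsSealed;
        this.segmentsCreated = segmentsCreated;
    }
}

final class DeltaReplay {
    // Rebuilds the segment set of each epoch covered by `deltas`, starting from the full
    // segment set of the epoch just before the first delta (a single EpochRecord read).
    // Rolling-transaction rows that only carry a reference epoch are not handled here.
    static List<Set<Long>> replay(Set<Long> baseSegments, List<EpochDelta> deltas) {
        List<Set<Long>> epochs = new ArrayList<>();
        Set<Long> current = new TreeSet<>(baseSegments);
        for (EpochDelta delta : deltas) {
            current = new TreeSet<>(current); // copy so earlier results stay unchanged
            current.removeAll(delta.segmentsSealed);
            current.addAll(delta.segmentsCreated);
            epochs.add(current);
        }
        return epochs;
    }
}
```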
We also need to store each segment's size at the time of sealing. This is used by retention to compute the size up to any given stream cut. We used to store this information in a single monolithic map of segmentId -> size:
- Before: A single record which is a serialized Java map of sealed segment id -> last offset. This is not scalable: the map can grow very large (31M records if one segment is sealed every second for a year), so we can't keep storing it as a single Java map as before. This clearly will not scale for a very large number of segments, and we need to shard this map.
- After: Shard the map. Segments can seal at arbitrary times, so sharding has to be based on a criterion that divides the data fairly across shards. Every segment has a segment number, which is a monotonically increasing value. However, a segment can have duplicates (for example, created by rolling transactions) that share the same segment number.
We will shard the map such that a segmentId belongs to a shard if its segment number falls within that shard's range. Each shard will contain a fixed range of segment numbers, say groups of 1000.
Note: finding the shard for a particular segment is a simple computation (segment number divided by shard-size-for-sealed-segment-map, using integer division). Another interesting property of such a sharded map is that segments created around the same time are found in the same shard.
public class SealedSegmentsMapShard {
private final int shardNumber;
/**
* Sealed segments with size at the time of sealing.
* segmentId -> sealed segment record.
* Each shard contains segments from a range of `segment number`.
* So each shard size would be say 10k segment numbers. Then the number of records in the map would be 10k * average number of duplicate epochs.
*
* So to get sealed segment record -> extract segment number from segment id. compute the shard by dividing segment number by 10k.
* Fetch the record from the shard.
*/
private final Map<Long, Long> sealedSegmentsSizeMap;
}
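A sketch of shard lookup and update against an in-memory stand-in for the store. It assumes the segment number is the low 32 bits of the segment id; `SealedSegmentShards`, `shardOf`, `recordSealedSize` and `sealedSize` are illustrative names, not existing controller APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of SealedSegmentsMapShard reads and writes over an in-memory stand-in for the store.
// ASSUMPTION: the segment number is the low 32 bits of the segment id.
final class SealedSegmentShards {
    final int shardSize;
    // shardNumber -> (segmentId -> size at the time of sealing); one entry per stored shard.
    private final Map<Integer, Map<Long, Long>> shards = new HashMap<>();

    SealedSegmentShards(int shardSize) {
        this.shardSize = shardSize;
    }

    int shardOf(long segmentId) {
        int segmentNumber = (int) segmentId;
        return segmentNumber / shardSize;
    }

    // Duplicates share a segment number, so they land in the same shard under different ids.
    void recordSealedSize(long segmentId, long size) {
        shards.computeIfAbsent(shardOf(segmentId), k -> new HashMap<>()).put(segmentId, size);
    }

    // One shard read followed by an in-memory map lookup.
    Optional<Long> sealedSize(long segmentId) {
        Map<Long, Long> shard = shards.getOrDefault(shardOf(segmentId), Map.of());
        return Optional.ofNullable(shard.get(segmentId));
    }
}
```

With a shard size of 1000, segment number 5 and its duplicate from a later epoch both fall in shard 0, while segment number 1500 falls in shard 1.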
Updates and reads on these shards will be efficient because we read values for a large number of segments in a single read from the store.
For updates to the map, we can group our sealed segments by shard and then update each shard individually:
Map<Integer, List<Long>> shards = sealedSegmentSizes.keySet().stream()
        .collect(Collectors.groupingBy(segmentId -> StreamSegmentNameUtils.getSegmentNumber(segmentId) / SHARD_SIZE));
shards.forEach((shardNumber, segmentIds) -> { /* read the shard once, add sizes for segmentIds, write it back */ });
For reading sealed segment sizes for multiple segments:
Map<Integer, List<Segment>> shards = segments.stream().collect(Collectors.groupingBy(x -> x.getNumber() / SHARD_SIZE));
shards.forEach((shardNumber, shardSegments) -> { /* read the shard once, look up sizes of shardSegments */ });
// No changes.
- Before: map of time -> StreamCutRecord. Stream cuts can comprise a large number of segments, so storing all stream cuts together in a monolithic blob can be very costly.
- After: break it into a retention set plus individual stream cut records.
We don't need to chunk the RetentionSet because the retention frequency will be quite reasonable, say around every 30 minutes. That means a year adds 48 * 365 (= 17.5k) records to the retention set. Each record is 16 bytes, so a year's worth is about 280 KB, and this will be a fairly small blob even if it stores many years' worth of data.
index => key: "retentionSet", value: RetentionSet
public class RetentionSet {
private final List<RetentionSetRecord> retentionRecords;
}
public class RetentionSetRecord {
final long recordingTime;
final long recordingSize;
}
individual records => key: time, value: StreamCutRecord
public class StreamCutRecord {
final long recordingTime;
final long recordingSize;
final Map<SegmentRecord, Long> streamCut;
}
Query: get the stream cut before size s (or time t).
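One way to answer this query from the `RetentionSet` alone is a binary search, followed by a single read of the matching `StreamCutRecord` keyed by the entry's `recordingTime`. The sketch assumes entries are ordered by time and that `recordingSize` never decreases; `RetentionEntry` and `latestAtOrBefore` are hypothetical names.

```java
import java.util.List;
import java.util.function.ToLongFunction;

// Trimmed stand-in for RetentionSetRecord.
final class RetentionEntry {
    final long recordingTime;
    final long recordingSize;

    RetentionEntry(long recordingTime, long recordingSize) {
        this.recordingTime = recordingTime;
        this.recordingSize = recordingSize;
    }
}

final class RetentionQuery {
    // Binary search for the last entry whose key (time or size) is <= bound; -1 if none.
    // ASSUMPTION: entries are ordered by recordingTime and recordingSize never decreases,
    // so both keys are sorted and either one can be searched.
    static int latestAtOrBefore(List<RetentionEntry> set, ToLongFunction<RetentionEntry> key, long bound) {
        int lo = 0;
        int hi = set.size() - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (key.applyAsLong(set.get(mid)) <= bound) {
                found = mid; // candidate; keep looking to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }
}
```

The `recordingTime` of the returned entry is then the key used to fetch the full `StreamCutRecord`.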
Suboptimal: purging stale stream cut records (all stream cut records before the newly selected truncation record) requires O(number of stream cut records to be purged) calls to the store. However, the purge can be done in the background, because the presence of stale records does not cause any correctness issues.
All other metadata records are either fixed-size or individual values and will not become scalability bottlenecks.
These include: ConfigurationRecord, TruncationRecord, EpochTransitionRecord, State, ActiveTxnRecord and CompletedTxnRecords.
By changing the underlying metadata schema, the processing of queries on the metadata will also change. However, changing the underlying metadata does not require changes to the StreamMetadataStore interface. Some API signatures that returned older schema objects will now return newer schema objects, but in most cases this will not affect the business logic built around the StreamMetadataStore interface, and we can optimize certain processing to take advantage of the new schema by eliminating some steps.
Next, we will examine all operations and evaluate the impact of the above metadata schema changes on their processing costs.
Get active segments: O(1)
read the `currentEpochRecord`
Get active segments at time t: O(constant) // 4 calls to store
i. read history index root to find the correct leaf node. // binary search root
ii. read history index leaf node to find the epoch at the time. // binary search leaf
iii. read the epoch record found at the given time in step ii
iv. read the `TruncationRecord`
v. compute active segments by overlaying `truncationRecord` on historyEpoch
Get successors of a segment: O(constant) // 3 calls to store
i. segment-sealed-epoch = getSegmentSealedEpoch(segmentId).
ii. sealedEpoch = getEpochRecord(segment sealed epoch).
iii. previousEpoch = getEpochRecord(segment sealed epoch - 1)
iv. compute successorsWithPredecessors(sealedEpoch, previousEpoch, segmentId)
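Step iv can be sketched as below: successors are the segments created by the sealing scale that overlap the sealed segment's key range, and each successor's predecessors are the segments sealed by that same scale that overlap the successor. Key ranges are assumed to be half-open, `[keyStart, keyEnd)`; `Seg` and `Successors` are hypothetical stand-ins, and the two epoch records are passed in as lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Minimal segment view: id plus key range [keyStart, keyEnd). Hypothetical stand-in for SegmentRecord.
final class Seg {
    final long id;
    final double keyStart;
    final double keyEnd;

    Seg(long id, double keyStart, double keyEnd) {
        this.id = id;
        this.keyStart = keyStart;
        this.keyEnd = keyEnd;
    }

    boolean overlaps(Seg other) {
        return keyStart < other.keyEnd && other.keyStart < keyEnd;
    }
}

final class Successors {
    // previousEpoch contains segmentId; sealedEpoch is the next epoch, created by the scale
    // that sealed it. Returns successor id -> ids of that successor's predecessors.
    static Map<Long, List<Long>> successorsWithPredecessors(List<Seg> sealedEpoch, List<Seg> previousEpoch, long segmentId) {
        Seg sealed = previousEpoch.stream()
                .filter(s -> s.id == segmentId)
                .findFirst()
                .orElseThrow(IllegalArgumentException::new);
        Set<Long> previousIds = previousEpoch.stream().map(s -> s.id).collect(Collectors.toSet());
        Set<Long> sealedEpochIds = sealedEpoch.stream().map(s -> s.id).collect(Collectors.toSet());
        Map<Long, List<Long>> result = new HashMap<>();
        for (Seg successor : sealedEpoch) {
            // A successor is new in this epoch and overlaps the sealed segment's range.
            if (previousIds.contains(successor.id) || !successor.overlaps(sealed)) {
                continue;
            }
            List<Long> predecessors = new ArrayList<>();
            for (Seg candidate : previousEpoch) {
                // A predecessor was sealed by this scale and overlaps the successor's range.
                if (!sealedEpochIds.contains(candidate.id) && candidate.overlaps(successor)) {
                    predecessors.add(candidate.id);
                }
            }
            result.put(successor.id, predecessors);
        }
        return result;
    }
}
```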
Scale: O(number of segments to be sealed) // typically small for auto scale; can be as many as the number of segments in an epoch for manual scale.
i. read currentEpoch
ii. Compute epoch transition record.
iii. create new epoch record.
iv. seal old segments // O(number of segments to be sealed) calls to store to update sealed segment sizes and sealing epoch
v. update history time series chunk.
vi. update the history time index leaf, and the history time index root if needed.
vii. update `currentEpochRecord`
viii. reset `EpochTransitionRecord`.
ix. reset state.
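Step vi only touches the root when a new leaf is started. A sketch, assuming the same layout as the lookup side (the root holds each leaf's first timestamp and leaves have a fixed capacity); `HistoryIndexAppender` is a hypothetical name, and the return value counts the index nodes that would be written to the store.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of appending a new epoch's creation time to the two-level history time index.
// ASSUMPTIONS: root holds the first timestamp of each leaf; every leaf holds at most leafSize entries.
final class HistoryIndexAppender {
    final int leafSize;
    final List<Long> root = new ArrayList<>();
    final List<List<Long>> leaves = new ArrayList<>();

    HistoryIndexAppender(int leafSize) {
        this.leafSize = leafSize;
    }

    // Returns the number of index nodes to write: 1 (last leaf) or 2 (new leaf plus root).
    int append(long epochCreationTime) {
        if (leaves.isEmpty() || leaves.get(leaves.size() - 1).size() == leafSize) {
            List<Long> leaf = new ArrayList<>();
            leaf.add(epochCreationTime);
            leaves.add(leaf);
            root.add(epochCreationTime); // root changes only when a new leaf starts
            return 2;
        }
        leaves.get(leaves.size() - 1).add(epochCreationTime);
        return 1;
    }
}
```

With a leaf size of 100k, the root is rewritten once every 100k scales; every other scale rewrites only the last leaf.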
Rolling transaction: O(`number of segments in txn epoch + number of segments in current epoch`) // this can be large if an epoch has a large number of segments (say 50k).
i. read txn epoch
ii. read current epoch
iii. create txn epoch duplicate
iv. create current epoch duplicate
v. commit txns // O(number of segments in txn epoch) calls to store to update sealed segment size for duplicate txn epoch segments.
vi. seal current epoch // O(number of segments to be sealed) calls to store to update sealed segment size and sealing epoch
vii. update history time index leaf. update history time index root if needed.
viii. update the history time series chunk.
ix. update `currentEpochRecord`
x. update committingTxnRecord
xi. update state.
Truncate stream:
i. fetch the existing `truncationRecord`
ii. verify newTruncationStreamCut > `truncationRecord.streamCut` // see `streamCutComparison`.
iii. update truncationRecord.
iv. identify segments to delete and segments to truncate
v. update truncationRecord
Typically, a stream cut should not span a large number of epochs, which means we will typically need to fetch only one or two history time series chunks. We should be able to fetch the history time series chunk corresponding to an epoch number, and we should find all relevant epochs within one or two chunks. If the chunk size is 1000 epochs, a stream cut has to span more than 1000 epochs for us to need to fetch 3 chunks.
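The chunk arithmetic can be sketched as follows, assuming chunk k holds epochs `[k * chunkSize, (k + 1) * chunkSize)`; this layout is illustrative and not confirmed by this design. Under it, a span of at most `chunkSize` epochs never touches more than two chunks.

```java
// Sketch of locating history time series chunks by epoch number.
// ASSUMPTION: chunk k holds epochs [k * chunkSize, (k + 1) * chunkSize).
final class HistoryChunks {
    static int chunkOf(int epoch, int chunkSize) {
        return epoch / chunkSize;
    }

    // Number of chunk reads needed to cover epochs lowEpoch..highEpoch (inclusive).
    static int chunksToFetch(int lowEpoch, int highEpoch, int chunkSize) {
        return chunkOf(highEpoch, chunkSize) - chunkOf(lowEpoch, chunkSize) + 1;
    }
}
```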
streamCutComparison:
O(number of epochs that stream cut spans) // for computing epoch cut maps.
i. computeEpochCut
ii. compare(SC1, SC2) => SC1.foreach(s1 -> SC2.noneMatch(s2 -> s1.overlaps(s2) && s1.id > s2.id))
computeSegmentsToDelete:
O(`current.epochCutMap.highEpoch - previous.epochCutMap.lowEpoch` / shard-size-for-sealed-segment-map) calls to store. Should be fairly small.
i. delete getSegmentsBetweenStreamCuts (`stream head`, currentStreamCut).
stream cut span => a data structure computed from a stream cut that maps each segment in the stream cut to an epoch containing it, so as to get the smallest possible span of epochs for the stream cut.
epoch = highestSegmentNumber(streamCut).epoch
toFind = streamCut.segments
while (!toFind.isEmpty()) {
    epochRecord(epoch).segments.filter(toFind::contains).forEach(x -> { toFind.remove(x); span.put(segmentRecord(x), epoch); });
    epoch--;
}
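A runnable sketch of the span computation. It maps each segment to the highest epoch, at or below the starting epoch, that contains it. It differs from the pseudocode in one assumption: the walk starts at the highest creation epoch among the stream cut's segments, read from the high 32 bits of each segment id. `epochSegments` is a hypothetical store read that returns the segment ids of one epoch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.IntFunction;

final class StreamCutSpan {
    // Walks backwards from the newest creation epoch among the stream cut's segments and maps
    // each segment to the first (that is, highest) epoch found to contain it.
    // ASSUMPTIONS: creation epoch is the high 32 bits of a segment id; epochSegments is a
    // hypothetical store read returning the segment ids of one epoch.
    static Map<Long, Integer> computeSpan(Set<Long> streamCutSegments, IntFunction<Set<Long>> epochSegments) {
        int epoch = streamCutSegments.stream()
                .mapToInt(id -> (int) (id >>> 32))
                .max()
                .orElse(-1);
        Set<Long> toFind = new HashSet<>(streamCutSegments);
        Map<Long, Integer> span = new HashMap<>();
        while (!toFind.isEmpty() && epoch >= 0) {
            for (long id : epochSegments.apply(epoch)) {
                if (toFind.remove(id)) {
                    span.put(id, epoch);
                }
            }
            epoch--;
        }
        return span;
    }
}
```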
i. compute stream cut span map for both stream cuts.
ii. Fetch history time series for SC1.span.lowEpoch till SC2.span.highEpoch and keep including segments in the result set.
Retention: O(number of epochs between previous and new stream cut) + O(number of segments to delete) calls to store
i. getRetentionIndex
ii. compute new truncation record
- getCurrentSegments and generate streamCut // O(1) call to store
// O(1) store call to generate epoch cut map for the given stream cut.
- computeSizeTillStreamCut
iii. find truncation streamcut record from retention index that satisfies retention condition. Compute new truncation record and submit.
computeSizeTillStreamCut -> O(number of epochs between two stream cuts / shard-size-for-sealed-segment-map)
i. see if there exists a previous streamcut till which we know the size (previous stream cut in retention set).
ii. Find segments between two stream cuts.
iii. find sizes of all such segments and compute size of segments between streamcuts.
iv. compute new stream cut size as increment over previous stream cut size.
Note: typically, (number of epochs between two stream cuts / shard-size-for-sealed-segment-map) should be a fairly small number if shard-size-for-sealed-segment-map is large enough. For example, if two stream cuts are far apart (say a year) with 31M epochs between them and the shard size is 1000, the number of calls to the store is reduced to about 31k.
Get scale history between `from` and `to` times: O(constant) calls to the store.
i. find the epochs corresponding to `from` and `to` times from history index
ii. fetch history time series chunks.
iii. fetch epoch corresponding to `from` and build the time series.