Skip to content

Commit

Permalink
chore(snapshot): Small cleanup in Snapshot code (#4377)
Browse files Browse the repository at this point in the history
* chore(snapshot): Small cleanup in Snapshot code

Signed-off-by: Stepan Bagritsevich <[email protected]>

* refactor: address comments

Signed-off-by: Stepan Bagritsevich <[email protected]>

---------

Signed-off-by: Stepan Bagritsevich <[email protected]>
  • Loading branch information
BagritsevichStepan authored Dec 26, 2024
1 parent 0065c27 commit 9fbb301
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class RdbSerializer : public SerializerBase {
// Must be called in the thread to which `it` belongs.
// Returns the serialized rdb_type or the error.
// expire_ms = 0 means no expiry.
// This function might preempt if flush_fun_ is used.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
uint32_t mc_flags, DbIndex dbid);

Expand Down
31 changes: 16 additions & 15 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB;

SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
SnapshotDataConsumerInterface* consumer, Context* cntx)
: db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
db_array_ = slice->databases();
: db_slice_(slice),
db_array_(slice->databases()),
compression_mode_(compression_mode),
consumer_(consumer),
cntx_(cntx) {
tl_slice_snapshots.insert(this);
}

Expand Down Expand Up @@ -163,16 +166,15 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {

uint64_t last_yield = 0;
PrimeTable* pt = &db_array_[db_indx]->prime;
current_db_ = db_indx;

VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
do {
if (cntx_->IsCancelled()) {
return;
}

PrimeTable::Cursor next =
pt->TraverseBuckets(cursor, [this](auto it) { return BucketSaveCb(it); });
PrimeTable::Cursor next = pt->TraverseBuckets(
cursor, [this, &db_indx](auto it) { return BucketSaveCb(db_indx, it); });
cursor = next;
PushSerialized(false);

Expand Down Expand Up @@ -248,7 +250,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
}
}

bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) {
std::lock_guard guard(big_value_mu_);

++stats_.savecb_calls;
Expand All @@ -267,7 +269,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
return false;
}

db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
snapshot_version_);

auto* blocking_counter = db_slice_->BlockingCounter();
Expand All @@ -276,7 +278,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
// zero.
std::lock_guard blocking_counter_guard(*blocking_counter);

stats_.loop_serialized += SerializeBucket(current_db_, it);
stats_.loop_serialized += SerializeBucket(db_index, it);

return false;
}
Expand All @@ -292,20 +294,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
while (!it.is_done()) {
++result;
// might preempt due to big value serialization.
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
SerializeEntry(db_index, it->first, it->second);
++it;
}
serialize_bucket_running_ = false;
return result;
}

void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
optional<uint64_t> expire, RdbSerializer* serializer) {
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
if (pv.IsExternal() && pv.IsCool())
return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer);
return SerializeEntry(db_indx, pk, pv.GetCool().record->value);

time_t expire_time = expire.value_or(0);
if (!expire && pv.HasExpire()) {
time_t expire_time = 0;
if (pv.HasExpire()) {
auto eit = db_array_[db_indx]->expire.Find(pk);
expire_time = db_slice_->ExpireTime(eit);
}
Expand All @@ -322,7 +323,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
{db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags});
++type_freq_map_[RDB_TYPE_STRING];
} else {
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
io::Result<uint8_t> res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
CHECK(res);
++type_freq_map_[*res];
}
Expand Down
11 changes: 4 additions & 7 deletions src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ class SliceSnapshot {
void SwitchIncrementalFb(LSN lsn);

// Called on traversing cursor by IterateBucketsFb.
bool BucketSaveCb(PrimeTable::bucket_iterator it);
bool BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it);

// Serialize single bucket.
// Returns number of serialized entries, updates bucket version to snapshot version.
unsigned SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator bucket_it);

// Serialize entry into passed serializer.
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
std::optional<uint64_t> expire, RdbSerializer* serializer);
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv);

// DbChange listener
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
Expand Down Expand Up @@ -150,9 +149,7 @@ class SliceSnapshot {
};

DbSlice* db_slice_;
DbTableArray db_array_;

DbIndex current_db_;
const DbTableArray db_array_;

std::unique_ptr<RdbSerializer> serializer_;
std::vector<DelayedEntry> delayed_entries_; // collected during atomic bucket traversal
Expand All @@ -161,7 +158,7 @@ class SliceSnapshot {
bool serialize_bucket_running_ = false;
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
util::fb2::CondVarAny seq_cond_;
CompressionMode compression_mode_;
const CompressionMode compression_mode_;
RdbTypeFreqMap type_freq_map_;

// version upper bound for entries that should be saved (not included).
Expand Down

0 comments on commit 9fbb301

Please sign in to comment.