Skip to content

Commit

Permalink
fix: stream memory counting during snapshot loading (#4346)
Browse files Browse the repository at this point in the history
* fix: stream memory counting during snapshot loading
  • Loading branch information
BorysTheDev authored Dec 27, 2024
1 parent 9fbb301 commit 5b9c7e4
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 33 deletions.
19 changes: 19 additions & 0 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,25 @@ TEST_F(DflyEngineTest, DebugObject) {
EXPECT_THAT(resp.GetString(), HasSubstr("encoding:listpack"));
}

// Verifies that per-type memory accounting for streams (OBJ_STREAM) stays
// populated across a DUMP / DEL / RESTORE round-trip, i.e. the restore path
// re-registers the stream's memory usage.
TEST_F(DflyEngineTest, StreamMemInfo) {
  // NOTE(review): this loop body executes exactly once (i == 1); if multiple
  // stream entries were intended, the bound looks like a typo — confirm intent.
  for (int i = 1; i < 2; ++i) {
    Run({"XADD", "test", std::to_string(i), "var", "val" + std::to_string(i)});
  }

  // After XADD, memory attributed to streams must be non-zero.
  int64_t stream_mem_first = GetMetrics().db_stats[0].memory_usage_by_type[OBJ_STREAM];
  EXPECT_GT(stream_mem_first, 0);

  // Round-trip the stream value through the serialization path.
  auto dump = Run({"dump", "test"});
  Run({"del", "test"});
  Run({"restore", "test", "0", facade::ToSV(dump.GetBuf())});

  int64_t stream_mem_second = GetMetrics().db_stats[0].memory_usage_by_type[OBJ_STREAM];

  // stream_mem_first != stream_mem_second due to a preallocation in XADD command (see
  // STREAM_LISTPACK_MAX_PRE_ALLOCATE), so only check that accounting is non-zero again.
  EXPECT_GT(stream_mem_second, 0);
}

// TODO: to test transactions with a single shard since then all transactions become local.
// To consider having a parameter in dragonfly engine controlling number of shards
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
Expand Down
3 changes: 3 additions & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ extern "C" {
#include "server/serializer_commons.h"
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/stream_family.h"
#include "server/transaction.h"
#include "strings/human_readable.h"

Expand Down Expand Up @@ -703,6 +704,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {

void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
stream* s;
StreamMemTracker mem_tracker;
if (config_.append) {
if (!EnsureObjEncoding(OBJ_STREAM, OBJ_ENCODING_STREAM)) {
return;
Expand Down Expand Up @@ -848,6 +850,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
if (!config_.append) {
pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
}
mem_tracker.UpdateStreamSize(*pv_);
}

void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
Expand Down
30 changes: 12 additions & 18 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ namespace dfly {
using namespace facade;
using namespace std;

// Snapshots the thread-local allocated-bytes counter so that a later call to
// UpdateStreamSize() can attribute the allocation delta to a stream value.
// Uses the member-initializer list instead of assignment in the ctor body
// (cppcoreguidelines-prefer-member-initializer).
StreamMemTracker::StreamMemTracker() : start_size_(zmalloc_used_memory_tl) {
}

// Charges the net allocation delta since this tracker was constructed to the
// given stream value, keeping per-object memory accounting up to date.
void StreamMemTracker::UpdateStreamSize(PrimeValue& pv) const {
  const int64_t used_now = static_cast<int64_t>(zmalloc_used_memory_tl);
  pv.AddStreamSize(used_now - static_cast<int64_t>(start_size_));
  // Under any flow we must not end up with this special value.
  DCHECK(pv.MallocUsed() != 0);
}

namespace {

struct Record {
Expand Down Expand Up @@ -612,24 +624,6 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) {
return 0;
}

// Tracks the change in the thread-local allocated-memory counter between
// construction and UpdateStreamSize(), and charges that delta to a stream
// value's memory accounting.
class StreamMemTracker {
 public:
  StreamMemTracker() {
    // Snapshot the thread-local used-memory counter before stream mutation.
    start_size_ = zmalloc_used_memory_tl;
  }

  // Adds (current used memory - start_size_) to pv's accounted stream size.
  void UpdateStreamSize(PrimeValue& pv) const {
    const size_t current = zmalloc_used_memory_tl;
    int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
    pv.AddStreamSize(diff);
    // Under any flow we must not end up with this special value.
    DCHECK(pv.MallocUsed() != 0);
  }

 private:
  // Thread-local used-memory reading captured at construction time.
  size_t start_size_{0};
};

OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
auto& db_slice = op_args.GetDbSlice();
Expand Down
13 changes: 13 additions & 0 deletions src/server/stream_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ namespace dfly {
class CommandRegistry;
struct CommandContext;

class CompactObj;
using PrimeValue = CompactObj;

// Measures the change in the thread-local allocated-memory counter between
// construction and UpdateStreamSize(), so stream-mutating flows (commands and
// RDB/snapshot loading alike) can attribute their allocation delta to the
// owning PrimeValue.
class StreamMemTracker {
 public:
  StreamMemTracker();

  // Charges the allocation delta since construction to `pv` via AddStreamSize().
  void UpdateStreamSize(PrimeValue& pv) const;

 private:
  size_t start_size_{0};  // used-memory reading captured at construction
};

class StreamFamily {
public:
static void Register(CommandRegistry* registry);
Expand Down
42 changes: 27 additions & 15 deletions tests/dragonfly/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .instance import DflyInstance, DflyInstanceFactory


@pytest.mark.slow
@pytest.mark.opt_only
@pytest.mark.parametrize(
"type, keys, val_size, elements",
Expand All @@ -23,7 +24,10 @@
# memory it might force the gh runner to run out of memory (since OOM killer might not even
# get a chance to run).
@dfly_args({"proactor_threads": 4, "maxmemory": "5gb"})
async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements):
async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements):
dbfilename = f"dump_{tmp_file_name()}"
instance = df_factory.create(dbfilename=dbfilename)
instance.start()
# Create a Dragonfly and fill it up with `type` until it reaches `min_rss`, then make sure that
# the gap between used_memory and rss is no more than `max_unaccounted_ratio`.
min_rss = 3 * 1024 * 1024 * 1024 # 3gb
Expand All @@ -35,7 +39,7 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, e
if type == "STREAM":
max_unaccounted = max_unaccounted * 3

client = df_server.client()
client = instance.client()
await asyncio.sleep(1) # Wait for another RSS heartbeat update in Dragonfly

cmd = f"DEBUG POPULATE {keys} k {val_size} RAND TYPE {type} ELEMENTS {elements}"
Expand All @@ -44,19 +48,27 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, e

await asyncio.sleep(2) # Wait for another RSS heartbeat update in Dragonfly

info = await client.info("memory")
logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
assert info["used_memory"] > min_rss, "Weak testcase: too little used memory"
delta = info["used_memory_rss"] - info["used_memory"]
# It could be the case that the machine is configured to use swap if this assertion fails
assert delta > 0
assert delta < max_unaccounted

if type != "STRING" and type != "JSON":
# STRINGs keep some of the data inline, so not all of it is accounted in object_used_memory
# We have a very small over-accounting bug in JSON
assert info["object_used_memory"] > keys * elements * val_size
assert info["used_memory"] > info["object_used_memory"]
async def check_memory():
info = await client.info("memory")
logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
assert info["used_memory"] > min_rss, "Weak testcase: too little used memory"
delta = info["used_memory_rss"] - info["used_memory"]
# It could be the case that the machine is configured to use swap if this assertion fails
assert delta > 0
assert delta < max_unaccounted

if type != "STRING" and type != "JSON":
# STRINGs keep some of the data inline, so not all of it is accounted in object_used_memory
# We have a very small over-accounting bug in JSON
assert info["object_used_memory"] > keys * elements * val_size
assert info["used_memory"] > info["object_used_memory"]

await check_memory()

await client.execute_command("SAVE", "DF")
await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")

await check_memory()


@pytest.mark.asyncio
Expand Down

0 comments on commit 5b9c7e4

Please sign in to comment.