Skip to content

Commit

Permalink
Merge pull request ClickHouse#55164 from ClickHouse/vdimir/fix_file_c…
Browse files Browse the repository at this point in the history
…ache_tmp_write_buffer

Fix file cache temporary file segment range in FileSegment::reserve
  • Loading branch information
kssenii authored Oct 16, 2023
2 parents 6c543c7 + e816ec7 commit a4bd689
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/Interpreters/Cache/FileSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
/// This (resizable file segments) is allowed only for single threaded use of file segment.
/// Currently it is used only for temporary files through cache.
if (is_unbound && is_file_segment_size_exceeded)
segment_range.right = range().left + expected_downloaded_size + size_to_reserve;
/// Note: segment_range.right is inclusive.
segment_range.right = range().left + expected_downloaded_size + size_to_reserve - 1;

/// if reserve_stat is not passed then use dummy stat and discard the result.
FileCacheReserveStat dummy_stat;
Expand Down
54 changes: 46 additions & 8 deletions src/Interpreters/tests/gtest_lru_file_cache.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
#include <gtest/gtest.h>

#include <filesystem>
#include <iomanip>
#include <iostream>


#include <algorithm>
#include <numeric>
#include <random>
#include <memory>
#include <thread>

#include <Common/randomSeed.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Context.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <base/hex.h>
#include <base/sleep.h>
#include <gtest/gtest.h>
#include <Poco/DOM/DOMParser.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Common/CurrentThread.h>
Expand Down Expand Up @@ -187,6 +196,12 @@ class FileCacheTest : public ::testing::Test
else
setupLogs(TEST_LOG_LEVEL);

UInt64 seed = randomSeed();
if (const char * random_seed = std::getenv("TEST_RANDOM_SEED")) // NOLINT(concurrency-mt-unsafe)
seed = std::stoull(random_seed);
std::cout << "TEST_RANDOM_SEED=" << seed << std::endl;
rng = pcg64(seed);

if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
fs::create_directories(cache_base_path);
Expand All @@ -198,6 +213,7 @@ class FileCacheTest : public ::testing::Test
fs::remove_all(cache_base_path);
}

pcg64 rng;
};

TEST_F(FileCacheTest, get)
Expand Down Expand Up @@ -679,7 +695,7 @@ TEST_F(FileCacheTest, writeBuffer)
FileCache cache("6", settings);
cache.initialize();

auto write_to_cache = [&cache](const String & key, const Strings & data, bool flush)
auto write_to_cache = [&cache, this](const String & key, const Strings & data, bool flush, ReadBufferPtr * out_read_buffer = nullptr)
{
CreateFileSegmentSettings segment_settings;
segment_settings.kind = FileSegmentKind::Temporary;
Expand All @@ -694,24 +710,32 @@ TEST_F(FileCacheTest, writeBuffer)
WriteBufferToFileSegment out(&segment);
std::list<std::thread> threads;
std::mutex mu;
for (const auto & s : data)

/// get random permutation of indexes
std::vector<size_t> indexes(data.size());
std::iota(indexes.begin(), indexes.end(), 0);
std::shuffle(indexes.begin(), indexes.end(), rng);

for (auto i : indexes)
{
/// Write from diffetent threads to check
/// that no assertions inside cache related to downloaderId are triggered
const auto & s = data[i];
threads.emplace_back([&]
{
std::unique_lock lock(mu);
out.write(s.data(), s.size());
/// test different buffering scenarios
if (flush)
{
out.next();
}
});
}
for (auto & t : threads)
t.join();

out.finalize();
if (out_read_buffer)
*out_read_buffer = out.tryGetReadBuffer();
return holder;
};

Expand All @@ -721,17 +745,31 @@ TEST_F(FileCacheTest, writeBuffer)
file_segment_paths.emplace_back(holder->front().getPathInLocalCache());

ASSERT_EQ(fs::file_size(file_segment_paths.back()), 7);
ASSERT_TRUE(holder->front().range() == FileSegment::Range(0, 7));
EXPECT_EQ(holder->front().range().size(), 7);
EXPECT_EQ(holder->front().range().left, 0);
ASSERT_EQ(cache.getUsedCacheSize(), 7);

{
auto holder2 = write_to_cache("key2", {"1", "22", "333", "4444", "55555"}, true);
ReadBufferPtr reader = nullptr;

auto holder2 = write_to_cache("key2", {"22", "333", "4444", "55555", "1"}, true, &reader);
file_segment_paths.emplace_back(holder2->front().getPathInLocalCache());

std::cerr << "\nFile segments: " << holder2->toString() << "\n";

ASSERT_EQ(fs::file_size(file_segment_paths.back()), 15);
ASSERT_EQ(holder2->front().range(), FileSegment::Range(0, 15));
EXPECT_TRUE(reader);
if (reader)
{
String result;
readStringUntilEOF(result, *reader);
/// sort result to make it independent of the order of writes
std::sort(result.begin(), result.end());
EXPECT_EQ(result, "122333444455555");
}

EXPECT_EQ(holder2->front().range().size(), 15);
EXPECT_EQ(holder2->front().range().left, 0);
ASSERT_EQ(cache.getUsedCacheSize(), 22);
}
ASSERT_FALSE(fs::exists(file_segment_paths.back()));
Expand Down

0 comments on commit a4bd689

Please sign in to comment.