
Extra timestamp tests #217

Merged
merged 2 commits into from Jun 30, 2022
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -142,6 +142,9 @@ if(TESTS_RAPIDCHECK)
build_test_rc(NAME singly_linked_list_state SRC_FILES unittest/singly_linked_list_state.cpp)
add_test_generic(NAME singly_linked_list_state TRACERS none memcheck)

build_test_rc(NAME timestamp SRC_FILES unittest/timestamp.cpp LIBS miniasync)
add_test_generic(NAME timestamp TRACERS none memcheck pmemcheck)

build_test_rc(NAME util_common SRC_FILES unittest/util_common.cpp)
add_test_generic(NAME util_common TRACERS none)

148 changes: 145 additions & 3 deletions tests/common/stream_helpers.hpp
@@ -8,10 +8,14 @@
#include <cstring>
#include <functional>
#include <map>
#include <numeric>
#include <string>
#include <tuple>
#include <vector>

#define PMEMSTREAM_INVALID_TIMESTAMP (0ULL)
#define PMEMSTREAM_FIRST_TIMESTAMP (PMEMSTREAM_INVALID_TIMESTAMP + 1ULL)

#include "stream_span_helpers.hpp"
#include "unittest.hpp"

@@ -227,6 +231,84 @@ struct stream {
std::unique_ptr<struct pmemstream, std::function<void(struct pmemstream *)>> c_stream;
}; /* struct stream */

// XXX: replace with actual global entry iterator once implemented
/* pmemstream entry iterator wrapper, which helps to manage entries from
 * different regions in global order (a usage sketch follows the namespace below). */
class timestamp_iterator {
public:
timestamp_iterator(pmemstream *stream, pmemstream_region &region) : stream(stream)
{
struct pmemstream_entry_iterator *new_entry_iterator;
if (pmemstream_entry_iterator_new(&new_entry_iterator, stream, region) != 0) {
throw std::runtime_error("Cannot create entry iterators");
}
it = std::shared_ptr<pmemstream_entry_iterator>(new_entry_iterator, [](pmemstream_entry_iterator *eit) {
pmemstream_entry_iterator_delete(&eit);
});
pmemstream_entry_iterator_seek_first(it.get());
}

void operator++()
{
pmemstream_entry_iterator_next(it.get());
}

/* Invalid iterators compare as greater than any valid one, so
 * std::min_element always picks a valid (i.e. oldest) iterator first. */
bool operator<(timestamp_iterator &other)
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0)
return false;

if (pmemstream_entry_iterator_is_valid(other.it.get()) != 0)
return true;

return get_timestamp() < other.get_timestamp();
}

bool operator==(timestamp_iterator &other)
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0)
return false;

if (pmemstream_entry_iterator_is_valid(other.it.get()) != 0)
return false;

return get_timestamp() == other.get_timestamp();
}

uint64_t get_timestamp()
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("Invalid iterator");
}

auto this_entry = pmemstream_entry_iterator_get(it.get());
return pmemstream_entry_timestamp(stream, this_entry);
}

bool is_valid()
{
return pmemstream_entry_iterator_is_valid(it.get()) == 0;
}

pmemstream_entry_iterator *raw_iterator()
{
return it.get();
}

static std::vector<timestamp_iterator> get_entry_iterators(pmemstream *stream,
const std::vector<pmemstream_region> &regions)
{
std::vector<timestamp_iterator> entry_iterators;
for (auto region : regions) {
entry_iterators.emplace_back(timestamp_iterator(stream, region));
}
return entry_iterators;
}

private:
pmemstream *stream;
std::shared_ptr<pmemstream_entry_iterator> it;
}; /* class timestamp_iterator */
} // namespace pmem
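A minimal usage sketch of the wrapper above (illustration only, not part of this change), assuming stream and region were obtained through the usual pmemstream setup calls:

/* Illustration only, not part of this change: collect one region's
 * timestamps in iteration (i.e. append) order using pmem::timestamp_iterator. */
std::vector<uint64_t> region_timestamps(pmemstream *stream, pmemstream_region region)
{
        std::vector<uint64_t> timestamps;
        pmem::timestamp_iterator it(stream, region);
        while (it.is_valid()) {
                timestamps.push_back(it.get_timestamp());
                ++it;
        }
        return timestamps;
}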

template <typename FutureT>
@@ -284,7 +366,13 @@ struct pmemstream_helpers_type
void append(struct pmemstream_region region, const std::vector<std::string> &data)
{
for (const auto &e : data) {
auto [ret, entry] = stream.append(region, e, region_runtime[region.offset]);
pmemstream_region_runtime *rrt = nullptr;
auto it = region_runtime.find(region.offset);
if (it != region_runtime.end()) {
rrt = it->second;
}

auto [ret, entry] = stream.append(region, e, rrt);
UT_ASSERTeq(ret, 0);
}
}
@@ -296,8 +384,12 @@ struct pmemstream_helpers_type {

struct pmemstream_entry entry;
for (size_t i = 0; i < data.size(); ++i) {
auto [ret, new_entry] =
stream.async_append(thread_mover, region, data[i], region_runtime[region.offset]);
pmemstream_region_runtime *rrt = nullptr;
auto it = region_runtime.find(region.offset);
if (it != region_runtime.end()) {
rrt = it->second;
}
auto [ret, new_entry] = stream.async_append(thread_mover, region, data[i], rrt);
UT_ASSERTeq(ret, 0);
entry = new_entry;
}
@@ -523,10 +615,60 @@ struct pmemstream_helpers_type {
UT_ASSERT(std::equal(extra_data_start, all_elements.end(), extra_data.begin()));
}

std::vector<pmemstream_entry> get_entries_from_regions(const std::vector<pmemstream_region> &regions)
{
std::vector<pmemstream_entry> entries;
auto entry_iterators = pmem::timestamp_iterator::get_entry_iterators(stream.c_ptr(), regions);
while (entry_iterators.size() > 0) {
auto oldest_iterator = std::min_element(entry_iterators.begin(), entry_iterators.end());
if (oldest_iterator->is_valid()) {
entries.push_back(pmemstream_entry_iterator_get(oldest_iterator->raw_iterator()));
++(*oldest_iterator);
} else {
entry_iterators.erase(oldest_iterator);
}
}
return entries;
}

/* checks timestamps' order across regions */
bool validate_timestamps_possible_gaps(const std::vector<struct pmemstream_region> &regions)
{
return validate_timestamps(regions, true);
}

/* checks timestamps' order across regions when all generated timestamps are
 * expected to be present (see the standalone sketch after this struct) */
bool validate_timestamps_no_gaps(const std::vector<struct pmemstream_region> &regions)
{
return validate_timestamps(regions, false);
}

pmem::stream &stream;
std::map<uint64_t, pmemstream_region_runtime *> region_runtime;
bool call_region_runtime_initialize = false;
thread_data_mover_type thread_mover_handle;

private:
bool validate_timestamps(const std::vector<struct pmemstream_region> &regions, bool possible_gaps = true)
{
auto entries = get_entries_from_regions(regions);
std::vector<uint64_t> timestamps;
std::transform(entries.begin(), entries.end(), std::back_inserter(timestamps),
[this](const auto &entry) { return pmemstream_entry_timestamp(stream.c_ptr(), entry); });

if (!std::is_sorted(timestamps.begin(), timestamps.end())) {
return false;
}

if (!possible_gaps) {
return std::adjacent_find(timestamps.begin(), timestamps.end()) == timestamps.end() &&
timestamps.front() == PMEMSTREAM_FIRST_TIMESTAMP &&
timestamps.back() == PMEMSTREAM_FIRST_TIMESTAMP + timestamps.size() - 1;
}
/* Check timestamp duplications */
return std::adjacent_find(timestamps.begin(), timestamps.end()) == timestamps.end();
}
}; /* struct pmemstream_helpers_type */
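A standalone sketch of the two timestamp checks above (illustration only, not part of this change), using plain vectors of timestamps instead of pmemstream entries; the literal 1 stands for PMEMSTREAM_FIRST_TIMESTAMP:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

int main()
{
        /* Sorted and duplicate-free, but 3, 5 and 6 are missing: passes the
         * "possible gaps" check only. */
        std::vector<uint64_t> with_gaps = {1, 2, 4, 7};
        /* Contiguous 1..N range: passes both checks. */
        std::vector<uint64_t> no_gaps = {1, 2, 3, 4};

        assert(std::is_sorted(with_gaps.begin(), with_gaps.end()));
        assert(std::adjacent_find(with_gaps.begin(), with_gaps.end()) == with_gaps.end());

        assert(std::is_sorted(no_gaps.begin(), no_gaps.end()));
        assert(std::adjacent_find(no_gaps.begin(), no_gaps.end()) == no_gaps.end());
        assert(no_gaps.front() == 1 && no_gaps.back() == no_gaps.size());
        return 0;
}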

struct pmemstream_test_base {
89 changes: 89 additions & 0 deletions tests/unittest/timestamp.cpp
@@ -0,0 +1,89 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2022, Intel Corporation */

#include "libpmemstream.h"
#include "rapidcheck_helpers.hpp"
#include "stream_helpers.hpp"
#include "thread_helpers.hpp"
#include "unittest.hpp"

/**
* timestamp - unit test for testing method pmemstream_entry_timestamp()
*/

void multithreaded_synchronous_append(pmemstream_test_base &stream, const std::vector<pmemstream_region> &regions,
const std::vector<std::string> &data)
{
parallel_exec(regions.size(), [&](size_t thread_id) { stream.helpers.append(regions[thread_id], data); });
}

int main(int argc, char *argv[])
{
if (argc != 2) {
std::cout << "Usage: " << argv[0] << " file-path" << std::endl;
return -1;
}

struct test_config_type test_config;
test_config.filename = std::string(argv[1]);

return run_test(test_config, [&] {
return_check ret;
ret += rc::check(
"timestamp values should increase in each region after synchronous append",
[&](pmemstream_with_multi_empty_regions &&stream, const std::vector<std::string> &data) {
RC_PRE(data.size() > 0);
auto regions = stream.helpers.get_regions();

/* Multithreaded append to many regions with global ordering. */
multithreaded_synchronous_append(stream, regions, data);

/* Single region ordering validation. */
for (auto &region : regions) {
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps({region}));
}
});

ret += rc::check(
"timestamp values should globally increase in multi-region environment after synchronous append",
[&](pmemstream_with_multi_empty_regions &&stream, const std::vector<std::string> &data) {
RC_PRE(data.size() > 0);
auto regions = stream.helpers.get_regions();

/* Multithreaded append to many regions with global ordering. */
multithreaded_synchronous_append(stream, regions, data);

/* Global ordering validation */
UT_ASSERT(stream.helpers.validate_timestamps_no_gaps(regions));
});

ret += rc::check(
"timestamp values should globally increase in multi-region environment after synchronous append to respawned region",
[&](pmemstream_with_multi_empty_regions &&stream, const std::vector<std::string> &data,
const std::vector<std::string> &extra_data) {
RC_PRE(data.size() > 0);
RC_PRE(extra_data.size() > 0);
auto regions = stream.helpers.get_regions();

/* Multithreaded append to many regions with global ordering. */
multithreaded_synchronous_append(stream, regions, data);

size_t pos = *rc::gen::inRange<size_t>(0, regions.size());
auto region_to_remove = regions[pos];
auto region_size = stream.sut.region_size(region_to_remove);
UT_ASSERTeq(stream.helpers.remove_region(region_to_remove.offset), 0);
regions.erase(regions.begin() + static_cast<int>(pos));

/* Global ordering validation. */
if (regions.size() > 1)
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));

regions.push_back(stream.helpers.initialize_single_region(region_size, extra_data));
UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
(regions.size() - 1) * data.size() + extra_data.size());
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));
});

// XXX: implement asynchronous cases
});
}
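One possible shape for the asynchronous cases left as XXX above (a sketch only, not part of this change; async_append is a hypothetical helper name standing in for the async append helper whose signature is truncated in the stream_helpers.hpp hunk):

/* Sketch only: the async helper's real name is not visible in this diff. */
ret += rc::check(
        "timestamp values should globally increase in multi-region environment after asynchronous append",
        [&](pmemstream_with_multi_empty_regions &&stream, const std::vector<std::string> &data) {
                RC_PRE(data.size() > 0);
                auto regions = stream.helpers.get_regions();

                /* Hypothetical: append asynchronously to each region and
                 * wait for completion before validating. */
                for (auto &region : regions)
                        stream.helpers.async_append(region, data);

                UT_ASSERT(stream.helpers.validate_timestamps_no_gaps(regions));
        });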