Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
Refactor entry_iterator class
Browse files Browse the repository at this point in the history
Move entry_iterator implementation from 05_timestamp_based_order
to stream_helpers.
  • Loading branch information
KFilipek committed Jun 21, 2022
1 parent 4f4948b commit 49400dc
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 92 deletions.
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ if(TESTS_RAPIDCHECK)
add_test_generic(NAME singly_linked_list_state TRACERS none memcheck)

build_test_rc(NAME timestamp SRC_FILES unittest/timestamp.cpp LIBS miniasync)
add_test_generic(NAME timestamp TRACERS none)
add_test_generic(NAME timestamp TRACERS none memcheck pmemcheck)

build_test_rc(NAME util_common SRC_FILES unittest/util_common.cpp)
add_test_generic(NAME util_common TRACERS none)
Expand Down
146 changes: 145 additions & 1 deletion tests/common/stream_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,113 @@
#include <cstring>
#include <functional>
#include <map>
#include <numeric>
#include <string>
#include <tuple>
#include <vector>

#include "libpmemstream_internal.h"
#include "stream_span_helpers.hpp"
#include "thread_helpers.hpp"
#include "unittest.hpp"

// XXX: replace with actual global entry iterator once implemented
/* pmememstream entry iterator wrapper, which helps to manage entries from
* different regions in global order. */
template <typename T>
class entry_iterator {
public:
entry_iterator(pmemstream *stream, pmemstream_region &region) : stream(stream)
{
struct pmemstream_entry_iterator *new_entry_iterator;
if (pmemstream_entry_iterator_new(&new_entry_iterator, stream, region) != 0) {
throw std::runtime_error("Cannot create entry iterators");
}
it = std::shared_ptr<pmemstream_entry_iterator>(new_entry_iterator, [](pmemstream_entry_iterator *eit) {
pmemstream_entry_iterator_delete(&eit);
});
pmemstream_entry_iterator_seek_first(it.get());
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("No entries to iterate");
}
}

void operator++()
{
pmemstream_entry_iterator_next(it.get());
}

bool operator<(entry_iterator &other)
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0)
return false;

if (pmemstream_entry_iterator_is_valid(other.it.get()) != 0)
return true;

return get_timestamp() < other.get_timestamp();
}

bool operator==(entry_iterator &other)
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0)
return false;

if (pmemstream_entry_iterator_is_valid(other.it.get()) != 0)
return false;

return get_timestamp() == other.get_timestamp();
}

T get_data()
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("Invalid iterator");
}
return *reinterpret_cast<const T *>(
pmemstream_entry_data(stream, pmemstream_entry_iterator_get(it.get())));
}

uint64_t get_timestamp()
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("Invalid iterator");
}

auto this_entry = pmemstream_entry_iterator_get(it.get());
return pmemstream_entry_timestamp(stream, this_entry);
}

bool is_valid()
{
return pmemstream_entry_iterator_is_valid(it.get()) == 0;
}

pmemstream_entry_iterator *raw_iterator()
{
return it.get();
}

static std::vector<entry_iterator<T>> get_entry_iterators(pmemstream *stream,
std::vector<pmemstream_region> regions);

// static std::vector<pmemstream_entry *> get_stream_entries()
private:
pmemstream *stream;
std::shared_ptr<pmemstream_entry_iterator> it;
};

template <typename T>
std::vector<entry_iterator<T>> entry_iterator<T>::get_entry_iterators(pmemstream *stream,
std::vector<pmemstream_region> regions)
{
std::vector<entry_iterator<T>> entry_iterators;
for (auto &region : regions) {
entry_iterators.emplace_back(entry_iterator<T>(stream, region));
}
return entry_iterators;
}

static inline bool operator==(const struct pmemstream_region lhs, const struct pmemstream_region rhs)
{
return lhs.offset == rhs.offset;
Expand Down Expand Up @@ -524,6 +623,50 @@ struct pmemstream_helpers_type {
UT_ASSERT(std::equal(extra_data_start, all_elements.end(), extra_data.begin()));
}

std::vector<pmemstream_entry> get_entries_for_regions(std::vector<pmemstream_region> regions)
{
std::vector<pmemstream_entry> entries;
auto entry_iterators = entry_iterator<void>::get_entry_iterators(stream.c_ptr(), regions);
while (entry_iterators.size() > 0) {
auto oldest_iterator = std::min_element(entry_iterators.begin(), entry_iterators.end());
if (oldest_iterator->is_valid()) {
entries.push_back(pmemstream_entry_iterator_get(oldest_iterator->raw_iterator()));
} else {
entry_iterators.erase(entry_iterators.begin() +
std::distance(entry_iterators.begin(), oldest_iterator));
}
}
return entries;
}

bool validate_timestamps(bool possible_gaps = true)
{
auto regions = get_regions();
auto entries = get_entries_for_regions(regions);
std::vector<uint64_t> timestamps;
std::transform(entries.begin(), entries.end(), timestamps.begin(),
[this](decltype(get_entries_for_regions(regions))::value_type entry) {
return pmemstream_entry_timestamp(stream.c_ptr(), entry);
});

if (!std::is_sorted(timestamps.begin(), timestamps.end())) {
return false;
}

if (possible_gaps) {
// Check timestamp duplications
return std::adjacent_find(timestamps.begin(), timestamps.end()) == timestamps.end();
} else {
std::vector<uint64_t> correct_timestamps(timestamps.size());
std::iota(correct_timestamps.begin(), correct_timestamps.end(), PMEMSTREAM_FIRST_TIMESTAMP);
if (!std::includes(timestamps.begin(), timestamps.end(), correct_timestamps.begin(),
correct_timestamps.end())) {
return false;
}
}
return true;
}

pmem::stream &stream;
std::map<uint64_t, pmemstream_region_runtime *> region_runtime;
bool call_region_runtime_initialize = false;
Expand Down Expand Up @@ -558,7 +701,8 @@ struct pmemstream_test_base {
{
}

/* This function closes and reopens the stream. All pointers to stream data, iterators, etc. are invalidated. */
/* This function closes and reopens the stream. All pointers to stream data, iterators, etc. are
* invalidated. */
void reopen()
{
sut.close();
Expand Down
94 changes: 4 additions & 90 deletions tests/unittest/timestamp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,84 +43,6 @@ std::ostream &operator<<(std::ostream &os, const payload &data)
return os;
}

/* pmememstream entry iterator wrapper, which helps to manage entries from
* different regions in global order. */
class entry_iterator {
public:
entry_iterator(pmemstream *stream, pmemstream_region &region) : stream(stream)
{
struct pmemstream_entry_iterator *new_entry_iterator;
if (pmemstream_entry_iterator_new(&new_entry_iterator, stream, region) != 0) {
throw std::runtime_error("Cannot create entry iterators");
}
it = std::shared_ptr<pmemstream_entry_iterator>(new_entry_iterator, [](pmemstream_entry_iterator *eit) {
pmemstream_entry_iterator_delete(&eit);
});
pmemstream_entry_iterator_seek_first(it.get());
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("No entries to iterate");
}
}

void operator++()
{
pmemstream_entry_iterator_next(it.get());
}

bool operator<(entry_iterator &other)
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0)
return false;

if (pmemstream_entry_iterator_is_valid(other.it.get()) != 0)
return true;

return get_timestamp() < other.get_timestamp();
}

payload get_data()
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("Invalid iterator");
}
return *reinterpret_cast<const payload *>(
pmemstream_entry_data(stream, pmemstream_entry_iterator_get(it.get())));
}

uint64_t get_timestamp()
{
if (pmemstream_entry_iterator_is_valid(it.get()) != 0) {
throw std::runtime_error("Invalid iterator");
}

auto this_entry = pmemstream_entry_iterator_get(it.get());
return pmemstream_entry_timestamp(stream, this_entry);
}

bool is_valid()
{
return pmemstream_entry_iterator_is_valid(it.get()) == 0;
}

pmemstream_entry_iterator *raw_iterator()
{
return it.get();
}

private:
pmemstream *stream;
std::shared_ptr<pmemstream_entry_iterator> it;
};

std::vector<entry_iterator> get_entry_iterators(pmemstream *stream, std::vector<pmemstream_region> regions)
{
std::vector<entry_iterator> entry_iterators;
for (auto &region : regions) {
entry_iterators.emplace_back(entry_iterator(stream, region));
}
return entry_iterators;
}

int main(int argc, char *argv[])
{
if (argc != 2) {
Expand Down Expand Up @@ -159,8 +81,8 @@ int main(int argc, char *argv[])
});

// In region monotonicity check
std::vector<entry_iterator> entry_iterators =
get_entry_iterators(stream.sut.c_ptr(), regions);
auto entry_iterators =
entry_iterator<payload>::get_entry_iterators(stream.sut.c_ptr(), regions);
uint64_t prev_timestamp = PMEMSTREAM_INVALID_TIMESTAMP;
size_t entry_counter = 0;
for (auto e_iterator : entry_iterators) {
Expand All @@ -176,15 +98,7 @@ int main(int argc, char *argv[])
UT_ASSERTeq(entry_counter, no_elements * no_regions);

// Global ordering validation
entry_iterators = get_entry_iterators(stream.sut.c_ptr(), regions);
uint64_t expected_timestamp = PMEMSTREAM_FIRST_TIMESTAMP;
for (size_t i = 0; i < no_elements * no_regions; i++) {
auto oldest_data =
std::min_element(entry_iterators.begin(), entry_iterators.end());
payload entry = oldest_data->get_data();
UT_ASSERTeq(expected_timestamp++, oldest_data->get_timestamp());
++(*oldest_data);
}
UT_ASSERTeq(stream.helpers.validate_timestamps(false), true);
});
});
}
}

0 comments on commit 49400dc

Please sign in to comment.