From 75ff7dd53395005ad62b9f3e3435188357487e2e Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 9 Oct 2024 14:13:11 +0100 Subject: [PATCH 1/4] Enhancement 1895: Fully parallelise processing in read_batch --- cpp/arcticdb/CMakeLists.txt | 1 + cpp/arcticdb/async/async_store.hpp | 6 - cpp/arcticdb/async/tasks.hpp | 22 - .../test/ingestion_stress_test.cpp | 12 +- cpp/arcticdb/pipeline/frame_slice.cpp | 2 +- cpp/arcticdb/pipeline/pipeline_utils.hpp | 2 +- cpp/arcticdb/pipeline/read_frame.cpp | 41 +- cpp/arcticdb/pipeline/read_frame.hpp | 6 +- cpp/arcticdb/processing/clause_utils.cpp | 25 ++ cpp/arcticdb/processing/clause_utils.hpp | 9 + cpp/arcticdb/processing/component_manager.cpp | 23 +- cpp/arcticdb/processing/component_manager.hpp | 43 +- .../test/test_parallel_processing.cpp | 166 ++++++++ cpp/arcticdb/storage/file/file_store.hpp | 9 +- cpp/arcticdb/storage/test/in_memory_store.hpp | 47 ++- .../storage/test/test_memory_storage.cpp | 2 +- cpp/arcticdb/stream/append_map.cpp | 9 +- cpp/arcticdb/stream/stream_source.hpp | 5 - .../version/local_versioned_engine.cpp | 200 +++------ .../version/local_versioned_engine.hpp | 12 +- cpp/arcticdb/version/python_bindings.cpp | 10 +- cpp/arcticdb/version/test/test_sparse.cpp | 46 +-- .../version/test/test_version_store.cpp | 19 +- cpp/arcticdb/version/version_core.cpp | 390 ++++++++---------- cpp/arcticdb/version/version_core.hpp | 38 +- .../version/version_map_batch_methods.cpp | 11 +- cpp/arcticdb/version/version_store_api.cpp | 12 +- cpp/arcticdb/version/version_store_api.hpp | 8 +- cpp/arcticdb/version/versioned_engine.hpp | 2 +- python/arcticdb/version_store/_store.py | 9 +- .../arcticdb/test_unicode_strings.py | 22 + .../version_store/test_basic_version_store.py | 92 +++-- .../version_store/test_incompletes.py | 46 +++ 33 files changed, 760 insertions(+), 587 deletions(-) create mode 100644 cpp/arcticdb/processing/test/test_parallel_processing.cpp create mode 100644 python/tests/unit/arcticdb/version_store/test_incompletes.py diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 86436146d7..4519233faf 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -910,6 +910,7 @@ if(${TEST}) processing/test/test_filter_and_project_sparse.cpp processing/test/test_has_valid_type_promotion.cpp processing/test/test_operation_dispatch.cpp + processing/test/test_parallel_processing.cpp processing/test/test_resample.cpp processing/test/test_set_membership.cpp processing/test/test_signed_unsigned_comparison.cpp diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index aea666e72c..bd712b8f06 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -251,12 +251,6 @@ folly::Future> read_timeseries_descr return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{}); } -folly::Future> read_timeseries_descriptor_for_incompletes( - const entity::VariantKey &key, - storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override { - return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorForIncompletesTask{}); -} - folly::Future key_exists(entity::VariantKey &&key) { return async::submit_io_task(KeyExistsTask{std::move(key), library_}); } diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index a15e5098b5..a24ab7f23f 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -551,28 +551,6 @@ struct DecodeTimeseriesDescriptorTask : BaseTask { } }; -struct 
DecodeTimeseriesDescriptorForIncompletesTask : BaseTask { - ARCTICDB_MOVE_ONLY_DEFAULT(DecodeTimeseriesDescriptorForIncompletesTask) - - DecodeTimeseriesDescriptorForIncompletesTask() = default; - - std::pair operator()(storage::KeySegmentPair &&ks) const { - ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorForIncompletesTask, 0) - auto key_seg = std::move(ks); - ARCTICDB_DEBUG( - log::storage(), - "DecodeTimeseriesDescriptorForIncompletesTask decoding segment with key {}", - variant_key_view(key_seg.variant_key())); - - auto maybe_desc = decode_timeseries_descriptor_for_incompletes(key_seg.segment()); - - util::check(static_cast(maybe_desc), "Failed to decode timeseries descriptor"); - return std::make_pair( - std::move(key_seg.variant_key()), - std::move(*maybe_desc)); - } -}; - struct DecodeMetadataAndDescriptorTask : BaseTask { ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataAndDescriptorTask) diff --git a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp index 3fad155a9a..105c9e37ae 100644 --- a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp +++ b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp @@ -125,8 +125,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) { ro.allow_sparse_ = true; ro.set_dynamic_schema(true); ro.set_incompletes(true); - ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data); @@ -213,8 +213,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) { read_options.set_dynamic_schema(true); read_options.set_allow_sparse(true); read_options.set_incompletes(true); - ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); @@ -266,8 +266,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) { read_options.set_dynamic_schema(true); read_options.set_allow_sparse(true); read_options.set_incompletes(true); - ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data); diff --git a/cpp/arcticdb/pipeline/frame_slice.cpp b/cpp/arcticdb/pipeline/frame_slice.cpp index a7cb0dff26..d59a66372d 100644 --- a/cpp/arcticdb/pipeline/frame_slice.cpp +++ b/cpp/arcticdb/pipeline/frame_slice.cpp @@ -20,7 +20,7 @@ namespace arcticdb::pipelines { void SliceAndKey::ensure_segment(const std::shared_ptr& store) const { if(!segment_) - segment_ = store->read(*key_).get().second; + segment_ = store->read_sync(*key_).second; } SegmentInMemory& SliceAndKey::segment(const std::shared_ptr& store) { diff --git a/cpp/arcticdb/pipeline/pipeline_utils.hpp b/cpp/arcticdb/pipeline/pipeline_utils.hpp index d07bcd93ae..ae2666ed66 100644 --- a/cpp/arcticdb/pipeline/pipeline_utils.hpp +++ b/cpp/arcticdb/pipeline/pipeline_utils.hpp @@ -46,7 +46,7 @@ 
inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de auto descriptor = std::make_shared(frame_and_desc.frame_.descriptor()); pipeline_context->begin()->set_descriptor(std::move(descriptor)); auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data); + reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get(); apply_type_handlers(frame_and_desc.frame_, handler_data); return create_python_read_result(VersionedItem{key}, std::move(frame_and_desc)); } diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp index 471313cc36..de65763748 100644 --- a/cpp/arcticdb/pipeline/read_frame.cpp +++ b/cpp/arcticdb/pipeline/read_frame.cpp @@ -630,30 +630,42 @@ struct ReduceColumnTask : async::BaseTask { } }; - void reduce_and_fix_columns( - std::shared_ptr &context, - SegmentInMemory &frame, - const ReadOptions& read_options, - std::any& handler_data +folly::Future reduce_and_fix_columns( + std::shared_ptr &context, + SegmentInMemory &frame, + const ReadOptions& read_options, + std::any& handler_data ) { ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol) ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); if(frame.empty()) - return; + return folly::Unit{}; bool dynamic_schema = opt_false(read_options.dynamic_schema_); auto slice_map = std::make_shared(context, dynamic_schema); DecodePathData shared_data; - static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100); - folly::collect(folly::window(frame.descriptor().fields().size(), [&] (size_t field) { - return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema)); - }, batch_size)).via(&async::io_executor()).get(); + // This logic mimics that in ReduceColumnTask operator() to identify whether the task will actually do any work + // This is to avoid scheduling work that is a no-op + std::vector fields_to_reduce; + for (size_t idx=0; idxcolumns_.contains(frame_field.name()) && is_sequence_type(frame_field.type().data_type()))) { + fields_to_reduce.emplace_back(idx); + } + } + static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100); + return folly::collect( + folly::window(std::move(fields_to_reduce), + [context, frame, slice_map, shared_data, dynamic_schema, &handler_data] (size_t field) mutable { + return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema)); + }, batch_size)).via(&async::io_executor()).unit(); } -folly::Future> fetch_data( - const SegmentInMemory& frame, +folly::Future fetch_data( + SegmentInMemory&& frame, const std::shared_ptr &context, const std::shared_ptr& ssource, bool dynamic_schema, @@ -662,7 +674,7 @@ folly::Future> fetch_data( ) { ARCTICDB_SAMPLE_DEFAULT(FetchSlices) if (frame.empty()) - return {std::vector{}}; + return frame; std::vector> keys_and_continuations; keys_and_continuations.reserve(context->slice_and_keys_.size()); @@ -684,7 +696,8 @@ folly::Future> fetch_data( } } ARCTICDB_SUBSAMPLE_DEFAULT(DoBatchReadCompressed) - return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{}); + return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{}) + .thenValue([frame](auto&&){ return frame; }); } } // namespace read diff --git 
a/cpp/arcticdb/pipeline/read_frame.hpp b/cpp/arcticdb/pipeline/read_frame.hpp index d178b2a6dd..b05868533b 100644 --- a/cpp/arcticdb/pipeline/read_frame.hpp +++ b/cpp/arcticdb/pipeline/read_frame.hpp @@ -70,8 +70,8 @@ void mark_index_slices( bool dynamic_schema, bool column_groups); -folly::Future> fetch_data( - const SegmentInMemory& frame, +folly::Future fetch_data( + SegmentInMemory&& frame, const std::shared_ptr &context, const std::shared_ptr& ssource, bool dynamic_schema, @@ -92,7 +92,7 @@ void decode_into_frame_dynamic( const DecodePathData& shared_data, std::any& handler_data); -void reduce_and_fix_columns( +folly::Future reduce_and_fix_columns( std::shared_ptr &context, SegmentInMemory &frame, const ReadOptions& read_options, diff --git a/cpp/arcticdb/processing/clause_utils.cpp b/cpp/arcticdb/processing/clause_utils.cpp index 854ad5e39b..6b40a64e67 100644 --- a/cpp/arcticdb/processing/clause_utils.cpp +++ b/cpp/arcticdb/processing/clause_utils.cpp @@ -81,4 +81,29 @@ std::vector flatten_entities(std::vector>&& enti return res; } +std::vector> split_futures( + std::vector>&& segment_and_slice_futures) { + std::vector> res; + res.reserve(segment_and_slice_futures.size()); + for (auto&& future: segment_and_slice_futures) { + res.emplace_back(folly::splitFuture(std::move(future))); + } + return res; +} + +std::shared_ptr> generate_segment_fetch_counts( + const std::vector>& processing_unit_indexes, + size_t num_segments) { + auto res = std::make_shared>(num_segments, 0); + for (const auto& list: processing_unit_indexes) { + for (auto idx: list) { + res->at(idx)++; + } + } + debug::check( + std::all_of(res->begin(), res->end(), [](const size_t& val) { return val != 0; }), + "All segments should be needed by at least one ProcessingUnit"); + return res; +} + } diff --git a/cpp/arcticdb/processing/clause_utils.hpp b/cpp/arcticdb/processing/clause_utils.hpp index 7bf93221d0..b0d7c044ec 100644 --- a/cpp/arcticdb/processing/clause_utils.hpp +++ b/cpp/arcticdb/processing/clause_utils.hpp @@ -12,6 +12,8 @@ #include #include +#include + #include #include #include @@ -242,4 +244,11 @@ std::vector push_entities(ComponentManager& component_manager, Process std::vector flatten_entities(std::vector>&& entity_ids_vec); +std::vector> split_futures( + std::vector>&& segment_and_slice_futures); + +std::shared_ptr> generate_segment_fetch_counts( + const std::vector>& processing_unit_indexes, + size_t num_segments); + }//namespace arcticdb diff --git a/cpp/arcticdb/processing/component_manager.cpp b/cpp/arcticdb/processing/component_manager.cpp index 5b413b6477..187ef90d1b 100644 --- a/cpp/arcticdb/processing/component_manager.cpp +++ b/cpp/arcticdb/processing/component_manager.cpp @@ -14,17 +14,26 @@ namespace arcticdb { std::vector ComponentManager::get_new_entity_ids(size_t count) { std::vector ids(count); - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); registry_.create(ids.begin(), ids.end()); return ids; } -void ComponentManager::erase_entity(EntityId id) { - // Ideally would call registry_.destroy(id), or at least registry_.erase>(id) - // at this point. 
However, they are both slower than this, so just decrement the ref count of the only - // sizeable component, so that when the shared pointer goes out of scope in the calling function, the - // memory is freed - registry_.get>(id).reset(); +void ComponentManager::decrement_entity_fetch_count(EntityId id) { + if (registry_.get>(id).fetch_sub(1) == 1) { + // This entity will never be accessed again + // Ideally would call registry_.destroy(id), or at least registry_.erase>(id) + // at this point. However, they are both slower than this, and would require taking a unique_lock on the + // shared_mutex, so just decrement the ref count of the only sizeable component, so that when the shared pointer + // goes out of scope in the calling function, the memory is freed + registry_.get>(id).reset(); + debug::check(!registry_.get>(id), + "SegmentInMemory memory retained in ComponentManager"); + } +} + +void ComponentManager::update_entity_fetch_count(EntityId id, EntityFetchCount count) { + registry_.get>(id).store(count); } diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index 406df05c10..df204f443a 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -8,6 +8,7 @@ #pragma once #include +#include #include @@ -23,8 +24,6 @@ using bucket_id = uint8_t; using namespace entt::literals; -constexpr auto remaining_entity_fetch_count_id = "remaining_entity_fetch_count"_hs; - class ComponentManager { public: ComponentManager() = default; @@ -35,16 +34,16 @@ class ComponentManager { // Add a single entity with the components defined by args template void add_entity(EntityId id, Args... args) { - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); ([&]{ registry_.emplace(id, args); // Store the initial entity fetch count component as a "first-class" entity, accessible by // registry_.get(id), as this is external facing (used by resample) // The remaining entity fetch count below will be decremented each time an entity is fetched, but is never - // accessed externally, so make this a named component. + // accessed externally. Stored as an atomic to minimise the requirement to take the shared_mutex with a + // unique_lock. if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - remaining_entity_fetch_count_registry.emplace(id, args); + registry_.emplace>(id, args); } }(), ...); } @@ -55,7 +54,7 @@ class ComponentManager { std::vector add_entities(Args... 
args) { std::vector ids; size_t entity_count{0}; - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); ([&]{ if (entity_count == 0) { // Reserve memory for the result on the first pass @@ -70,8 +69,9 @@ class ComponentManager { } registry_.insert(ids.cbegin(), ids.cend(), args.begin()); if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - remaining_entity_fetch_count_registry.insert(ids.cbegin(), ids.cend(), args.begin()); + for (auto&& [idx, id]: folly::enumerate(ids)) { + registry_.emplace>(id, args[idx]); + } } }(), ...); return ids; @@ -79,24 +79,23 @@ class ComponentManager { template void replace_entities(const std::vector& ids, T value) { + std::unique_lock lock(mtx_); for (auto id: ids) { registry_.replace(id, value); if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - // For some reason named storages don't have a replace API - remaining_entity_fetch_count_registry.patch(id, [value](EntityFetchCount& entity_fetch_count){ entity_fetch_count = value; }); + update_entity_fetch_count(id, value); } } } template void replace_entities(const std::vector& ids, const std::vector& values) { + internal::check(ids.size() == values.size(), "Received vectors of differing lengths in ComponentManager::replace_entities"); + std::unique_lock lock(mtx_); for (auto [idx, id]: folly::enumerate(ids)) { registry_.replace(id, values[idx]); if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - // For some reason named storages don't have a replace API - remaining_entity_fetch_count_registry.patch(id, [&values, idx](EntityFetchCount& entity_fetch_count){ entity_fetch_count = values[idx]; }); + update_entity_fetch_count(id, values[idx]); } } } @@ -107,8 +106,7 @@ class ComponentManager { std::vector> tuple_res; tuple_res.reserve(ids.size()); { - std::lock_guard lock(mtx_); - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); + std::shared_lock lock(mtx_); // Using view.get theoretically and empirically faster than registry_.get auto view = registry_.view(); for (auto id: ids) { @@ -116,11 +114,7 @@ class ComponentManager { } if (decrement_fetch_count) { for (auto id: ids) { - auto& remaining_entity_fetch_count = remaining_entity_fetch_count_registry.get(id); - // This entity will never be accessed again - if (--remaining_entity_fetch_count == 0) { - erase_entity(id); - } + decrement_entity_fetch_count(id); } } } @@ -138,10 +132,11 @@ class ComponentManager { } private: - void erase_entity(EntityId id); + void decrement_entity_fetch_count(EntityId id); + void update_entity_fetch_count(EntityId id, EntityFetchCount count); entt::registry registry_; - std::mutex mtx_; + std::shared_mutex mtx_; }; } // namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/processing/test/test_parallel_processing.cpp b/cpp/arcticdb/processing/test/test_parallel_processing.cpp new file mode 100644 index 0000000000..5d2cb54a1b --- /dev/null +++ b/cpp/arcticdb/processing/test/test_parallel_processing.cpp @@ -0,0 +1,166 @@ +/* Copyright 2024 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. 
+ * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace arcticdb::pipelines; + +struct RowSliceClause { + // Simple clause that accepts and produces segments partitioned by row-slice, which is representative of a lot of + // the real clauses we support. In place of doing any processing, the process method just sleeps for a random amount + // of time and then increments the stream id of each input segment. + ClauseInfo clause_info_; + std::shared_ptr component_manager_; + + RowSliceClause() = default; + ARCTICDB_MOVE_COPY_DEFAULT(RowSliceClause) + + [[nodiscard]] std::vector> structure_for_processing(std::vector& ranges_and_keys) { + return structure_by_row_slice(ranges_and_keys); + } + + [[nodiscard]] std::vector> structure_for_processing(std::vector>&& entity_ids_vec) { + log::version().warn("RowSliceClause::structure_for_processing called"); + return structure_by_row_slice(*component_manager_, std::move(entity_ids_vec)); + } + + [[nodiscard]] std::vector process(std::vector&& entity_ids) const { + std::mt19937_64 eng{std::random_device{}()}; + std::uniform_int_distribution<> dist{10, 100}; + auto sleep_ms = dist(eng); + log::version().warn("RowSliceClause::process sleeping for {}ms", sleep_ms); + std::this_thread::sleep_for(std::chrono::milliseconds{sleep_ms}); + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); + for (const auto& segment: proc.segments_.value()) { + auto id = std::get(segment->descriptor().id()); + ++id; + segment->descriptor().set_id(id); + } + return push_entities(*component_manager_, std::move(proc)); + } + + [[nodiscard]] const ClauseInfo& clause_info() const { + return clause_info_; + } + + void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig&) {} + + void set_component_manager(std::shared_ptr component_manager) { + component_manager_ = component_manager; + } +}; + +struct RestructuringClause { + // Simple clause that accepts non row-slice structured segments to stress the restructuring process (fan-in/fan-out) + // process method is the same as the RowSliceClause above + ClauseInfo clause_info_; + std::shared_ptr component_manager_; + + RestructuringClause() { + clause_info_.input_structure_ = ProcessingStructure::ALL; + }; + ARCTICDB_MOVE_COPY_DEFAULT(RestructuringClause) + + [[nodiscard]] std::vector> structure_for_processing(std::vector& ranges_and_keys) { + return structure_by_row_slice(ranges_and_keys); + } + + [[nodiscard]] std::vector> structure_for_processing(std::vector>&& entity_ids_vec) { + log::version().warn("RestructuringClause::structure_for_processing called"); + return structure_by_row_slice(*component_manager_, std::move(entity_ids_vec)); + } + + [[nodiscard]] std::vector process(std::vector&& entity_ids) const { + std::mt19937_64 eng{std::random_device{}()}; + std::uniform_int_distribution<> dist{10, 100}; + auto sleep_ms = dist(eng); + log::version().warn("RestructuringClause::process sleeping for {}ms", sleep_ms); + std::this_thread::sleep_for(std::chrono::milliseconds{sleep_ms}); + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); + for (const auto& segment: proc.segments_.value()) { + auto id = 
std::get(segment->descriptor().id()); + ++id; + segment->descriptor().set_id(id); + } + return push_entities(*component_manager_, std::move(proc)); + } + + [[nodiscard]] const ClauseInfo& clause_info() const { + return clause_info_; + } + + void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig&) {} + + void set_component_manager(std::shared_ptr component_manager) { + component_manager_ = component_manager; + } +}; + +TEST(Clause, ScheduleClauseProcessingStress) { + // Extensible stress test of schedule_clause_processing. Useful for ensuring a lack of deadlock when running on + // threadpools with 1 or multiple cores. Dummy clauses provided above used to stress the fan-in/fan-out behaviour. + // Could be extended to profile and compare different scheduling algorithms and threadpool implementations if we + // want to move away from folly. + using namespace arcticdb::version_store; + auto num_clauses = 5; + std::mt19937_64 eng{std::random_device{}()}; + std::uniform_int_distribution<> dist{0, 1}; + + auto clauses = std::make_shared>>(); + for (auto unused=0; unusedemplace_back(std::make_shared(RowSliceClause())); + } else { + clauses->emplace_back(std::make_shared(RestructuringClause())); + } + } + + auto component_manager = std::make_shared(); + for (auto& clause: *clauses) { + clause->set_component_manager(component_manager); + } + + size_t num_segments{2}; + std::vector> segment_and_slice_promises(num_segments); + std::vector> segment_and_slice_futures; + std::vector> processing_unit_indexes; + for (size_t idx = 0; idx < num_segments; ++idx) { + segment_and_slice_futures.emplace_back(segment_and_slice_promises[idx].getFuture()); + processing_unit_indexes.emplace_back(std::vector{idx}); + } + + auto processed_entity_ids_fut = schedule_clause_processing(component_manager, + std::move(segment_and_slice_futures), + std::move(processing_unit_indexes), + clauses); + + for (size_t idx = 0; idx < segment_and_slice_promises.size(); ++idx) { + SegmentInMemory segment; + segment.descriptor().set_id(static_cast(idx)); + segment_and_slice_promises[idx].setValue(SegmentAndSlice(RangesAndKey({idx, idx+1}, {0, 1}, {}), std::move(segment))); + } + + auto processed_entity_ids = std::move(processed_entity_ids_fut).get(); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + ASSERT_EQ(proc.segments_.value().size(), num_segments); + NumericId start_id{0}; + for (const auto& segment: proc.segments_.value()) { + auto id = std::get(segment->descriptor().id()); + ASSERT_EQ(id, start_id++ + num_clauses); + } +} diff --git a/cpp/arcticdb/storage/file/file_store.hpp b/cpp/arcticdb/storage/file/file_store.hpp index 32b1f59743..4b6e04e436 100644 --- a/cpp/arcticdb/storage/file/file_store.hpp +++ b/cpp/arcticdb/storage/file/file_store.hpp @@ -107,9 +107,10 @@ void write_dataframe_to_file_internal( version_store::ReadVersionOutput read_dataframe_from_file_internal( const StreamId& stream_id, const std::string& path, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, - const arcticdb::proto::encoding::VariantCodec &codec_opts) { + const arcticdb::proto::encoding::VariantCodec &codec_opts, + std::any& handler_data) { auto config = storage::file::pack_config(path, codec_opts); storage::LibraryPath lib_path{std::string{"file"}, fmt::format("{}", stream_id)}; auto library = create_library(lib_path, storage::OpenMode::WRITE, {std::move(config)}); @@ -126,8 +127,6 @@ version_store::ReadVersionOutput 
read_dataframe_from_file_internal( const auto header_offset = key_data.key_offset_ + key_data.key_size_; ARCTICDB_DEBUG(log::storage(), "Got header offset at {}", header_offset); single_file_storage->load_header(header_offset, data_end - header_offset); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - auto frame_and_descriptor = version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data); - return {std::move(versioned_item), std::move(frame_and_descriptor)}; + return version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data).get(); } } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index 68951f4397..d351e19905 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -206,15 +206,35 @@ namespace arcticdb { }); } - folly::Future read_compressed(const entity::VariantKey&, storage::ReadKeyOpts) override { - throw std::runtime_error("Not implemented"); + folly::Future read_compressed(const entity::VariantKey& key, storage::ReadKeyOpts opts) override { + // Anything read_compressed_sync() throws should be returned inside the Future, so: + return folly::makeFutureWith([&](){ return read_compressed_sync(key, opts); }); } storage::KeySegmentPair read_compressed_sync( - const entity::VariantKey&, + const entity::VariantKey& key, storage::ReadKeyOpts ) override { - throw std::runtime_error("Not implemented"); + StorageFailureSimulator::instance()->go(FailureType::READ); + std::lock_guard lock{mutex_}; + auto segment_in_memory = util::variant_match( + key, + [&] (const RefKey& ref_key) { + auto it = seg_by_ref_key_.find(ref_key); + if (it == seg_by_ref_key_.end()) + throw storage::KeyNotFoundException(Composite(ref_key)); + ARCTICDB_DEBUG(log::storage(), "Mock store returning compressed ref key {}", ref_key); + return it->second->clone(); + }, + [&] (const AtomKey& atom_key) { + auto it = seg_by_atom_key_.find(atom_key); + if (it == seg_by_atom_key_.end()) + throw storage::KeyNotFoundException(Composite(atom_key)); + ARCTICDB_DEBUG(log::storage(), "Mock store returning compressed atom key {}", atom_key); + return it->second->clone(); + }); + auto key_copy = key; + return storage::KeySegmentPair(std::move(key_copy), ::arcticdb::encode_dispatch(std::move(segment_in_memory), codec_, EncodingVersion::V1)); } folly::Future> read(const VariantKey& key, storage::ReadKeyOpts opts) override { @@ -432,24 +452,6 @@ namespace arcticdb { }); } - folly::Future, arcticdb::TimeseriesDescriptor>> - read_timeseries_descriptor_for_incompletes(const entity::VariantKey& key, - storage::ReadKeyOpts /*opts*/) override { - return util::variant_match(key, [&](const AtomKey &ak) { - auto it = seg_by_atom_key_.find(ak); - if (it == seg_by_atom_key_.end()) - throw storage::KeyNotFoundException(Composite(ak)); - ARCTICDB_DEBUG(log::storage(), "Mock store read for atom key {}", ak); - auto tsd = it->second->index_descriptor(); - tsd.set_stream_descriptor(it->second->descriptor()); - return std::make_pair(key, it->second->index_descriptor()); - }, - [&](const RefKey&) { - util::raise_rte("Not implemented"); - return std::make_pair(key, TimeseriesDescriptor{}); - }); - } - void set_failure_sim(const arcticdb::proto::storage::VersionStoreConfig::StorageFailureSimulator &) override {} std::string name() const override { @@ -475,6 +477,7 @@ namespace arcticdb { 
std::recursive_mutex mutex_; // Allow iterate_type() to be re-entrant std::unordered_map> seg_by_atom_key_; std::unordered_map> seg_by_ref_key_; + arcticdb::proto::encoding::VariantCodec codec_; }; } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/storage/test/test_memory_storage.cpp b/cpp/arcticdb/storage/test/test_memory_storage.cpp index 6e095f100f..894076118a 100644 --- a/cpp/arcticdb/storage/test/test_memory_storage.cpp +++ b/cpp/arcticdb/storage/test/test_memory_storage.cpp @@ -27,7 +27,7 @@ TEST(InMemory, ReadTwice) { auto test_frame = get_test_frame(symbol, fields, num_rows, start_val); version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result1 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); diff --git a/cpp/arcticdb/stream/append_map.cpp b/cpp/arcticdb/stream/append_map.cpp index 77df44dff2..c61267b122 100644 --- a/cpp/arcticdb/stream/append_map.cpp +++ b/cpp/arcticdb/stream/append_map.cpp @@ -5,6 +5,7 @@ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. */ +#include #include #include #include @@ -379,11 +380,13 @@ std::pair> get_descriptor_a bool load_data, storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) { if(load_data) { - auto [key, seg] = store->read_sync(k, opts); + auto seg = store->read_sync(k, opts).second; return std::make_pair(seg.index_descriptor(), std::make_optional(seg)); } else { - auto [key, tsd] = store->read_timeseries_descriptor_for_incompletes(k, opts).get(); - return std::make_pair(std::move(tsd), std::nullopt); + auto seg_ptr = store->read_compressed_sync(k, opts).segment_ptr(); + auto tsd = decode_timeseries_descriptor_for_incompletes(*seg_ptr); + internal::check(tsd.has_value(), "Failed to decode timeseries descriptor"); + return std::make_pair(std::move(*tsd), std::nullopt); } } diff --git a/cpp/arcticdb/stream/stream_source.hpp b/cpp/arcticdb/stream/stream_source.hpp index 9fa6aa8488..401fa8a4ac 100644 --- a/cpp/arcticdb/stream/stream_source.hpp +++ b/cpp/arcticdb/stream/stream_source.hpp @@ -82,11 +82,6 @@ struct StreamSource { virtual folly::Future> read_timeseries_descriptor(const entity::VariantKey& key, storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0; - - virtual folly::Future> - read_timeseries_descriptor_for_incompletes(const entity::VariantKey& key, - storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0; - }; } // namespace arcticdb::stream \ No newline at end of file diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 3cd88a7258..da52561a14 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -360,14 +360,13 @@ std::variant get_version_identifier( ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) { py::gil_scoped_release release_gil; auto version = get_version_to_read(stream_id, version_query); const auto identifier = get_version_identifier(stream_id, version_query, read_options, 
version); - auto frame_and_descriptor = read_frame_for_version(store(), identifier, read_query, read_options, handler_data); - return ReadVersionOutput{version.value_or(VersionedItem{}), std::move(frame_and_descriptor)}; + return read_frame_for_version(store(), identifier, read_query, read_options, handler_data).get(); } folly::Future LocalVersionedEngine::get_descriptor( @@ -406,14 +405,14 @@ folly::Future LocalVersionedEngine::get_descriptor( } folly::Future LocalVersionedEngine::get_descriptor_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query){ - return std::move(version_fut) - .thenValue([this, &stream_id, &version_query](std::optional&& key){ - missing_data::check(key.has_value(), + return std::move(opt_index_key_fut) + .thenValue([this, &stream_id, &version_query](std::optional&& opt_index_key){ + missing_data::check(opt_index_key.has_value(), "Unable to retrieve descriptor data. {}@{}: version not found", stream_id, version_query); - return get_descriptor(std::move(key.value())); + return get_descriptor(std::move(*opt_index_key)); }).via(&async::cpu_executor()); } @@ -437,11 +436,11 @@ std::vector> LocalVersionedEngine::batch internal::check(read_options.batch_throw_on_error_.has_value(), "ReadOptions::batch_throw_on_error_ should always be set here"); - auto version_futures = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); std::vector> descriptor_futures; - for (auto&& [idx, version_fut]: folly::enumerate(version_futures)) { + for (auto&& [idx, opt_index_key_fut]: folly::enumerate(opt_index_key_futs)) { descriptor_futures.emplace_back( - get_descriptor_async(std::move(version_fut), stream_ids[idx], version_queries[idx])); + get_descriptor_async(std::move(opt_index_key_fut), stream_ids[idx], version_queries[idx])); } auto descriptors = folly::collectAll(descriptor_futures).get(); std::vector> descriptors_or_errors; @@ -1050,77 +1049,55 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(const StreamId& strea return versioned_item; } -folly::Future async_read_direct( - const std::shared_ptr& store, - const VariantKey& index_key, - SegmentInMemory&& index_segment, - const std::shared_ptr& read_query, - DecodePathData shared_data, - std::any& handler_data, - const ReadOptions& read_options) { - return async_read_direct_impl(store, index_key, std::move(index_segment), read_query, shared_data, handler_data, read_options); -} - std::vector LocalVersionedEngine::batch_read_keys(const std::vector &keys) { auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); py::gil_scoped_release release_gil; - std::vector>> index_futures; - for (auto &index_key: keys) { - index_futures.emplace_back(store()->read(index_key)); - } - auto indexes = folly::collect(index_futures).get(); - std::vector> results_fut; - auto i = 0u; - for (auto&& [index_key, index_segment] : indexes) { - DecodePathData shared_data; - results_fut.emplace_back(async_read_direct(store(), - keys[i], - std::move(index_segment), - std::make_shared(), - shared_data, - handler_data, - ReadOptions{})); - ++i; + std::vector> res; + res.reserve(keys.size()); + for (const auto& index_key: keys) { + res.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared(), ReadOptions{}, handler_data)); } Allocator::instance()->trim(); - return folly::collect(results_fut).get(); + 
return folly::collect(res).get(); } -std::vector> LocalVersionedEngine::temp_batch_read_internal_direct( - const std::vector &stream_ids, - const std::vector &version_queries, - std::vector> &read_queries, - const ReadOptions &read_options) { - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); +std::vector> LocalVersionedEngine::batch_read_internal( + const std::vector& stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::any& handler_data) { py::gil_scoped_release release_gil; - - auto versions = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + // This read option should always be set when calling batch_read + internal::check(read_options.batch_throw_on_error_.has_value(), + "ReadOptions::batch_throw_on_error_ should always be set here"); + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); std::vector> read_versions_futs; - for (auto&& [idx, version] : folly::enumerate(versions)) { - read_versions_futs.emplace_back(std::move(version) - .thenValue([store = store()](auto&& maybe_index_key) { - missing_data::check( - maybe_index_key.has_value(), - "Version not found for symbol"); - return store->read(*maybe_index_key); - }) - .thenValue([store = store(), - read_query = read_queries.empty() ? std::make_shared(): read_queries[idx], - read_options, - &handler_data](auto&& key_segment_pair) { - auto [index_key, index_segment] = std::move(key_segment_pair); - return async_read_direct(store, - index_key, - std::move(index_segment), - read_query, - DecodePathData{}, - handler_data, - read_options); - }) + for (auto&& [idx, opt_index_key_fut] : folly::enumerate(opt_index_key_futs)) { + read_versions_futs.emplace_back( + std::move(opt_index_key_fut).thenValue([store = store(), + idx, + &stream_ids, + &version_queries, + read_query = read_queries.empty() ? 
std::make_shared(): read_queries[idx], + &read_options, + &handler_data](auto&& opt_index_key) { + std::variant version_info; + if (opt_index_key.has_value()) { + version_info = VersionedItem(std::move(*opt_index_key)); + } else { + if (opt_false(read_options.incompletes_)) { + log::version().warn("No index: Key not found for {}, will attempt to use incomplete segments.", stream_ids[idx]); + version_info = stream_ids[idx]; + } else { + missing_data::raise( + "batch_read_internal: version matching query '{}' not found for symbol '{}'", version_queries[idx], stream_ids[idx]); + } + } + return read_frame_for_version(store, version_info, read_query, read_options, handler_data); + }) ); } - // TODO: https://github.com/man-group/ArcticDB/issues/241 - // Move everything from here to the end of the function out into batch_read_internal as part of #241 auto read_versions = folly::collectAll(read_versions_futs).get(); std::vector> read_versions_or_errors; read_versions_or_errors.reserve(read_versions.size()); @@ -1135,7 +1112,8 @@ std::vector> LocalVersionedEngine::te DataError data_error(stream_ids[idx], exception.what().toStdString(), version_queries[idx].content_); if (exception.is_compatible_with()) { data_error.set_error_code(ErrorCode::E_NO_SUCH_VERSION); - } else if (exception.is_compatible_with()) { + } else if (exception.is_compatible_with() || + exception.is_compatible_with()) { data_error.set_error_code(ErrorCode::E_KEY_NOT_FOUND); } read_versions_or_errors.emplace_back(std::move(data_error)); @@ -1145,68 +1123,6 @@ std::vector> LocalVersionedEngine::te return read_versions_or_errors; } -std::vector> LocalVersionedEngine::batch_read_internal( - const std::vector& stream_ids, - const std::vector& version_queries, - std::vector>& read_queries, - const ReadOptions& read_options, - std::any& handler_data) { - // This read option should always be set when calling batch_read - internal::check(read_options.batch_throw_on_error_.has_value(), - "ReadOptions::batch_throw_on_error_ should always be set here"); - - if(std::none_of(std::begin(read_queries), std::end(read_queries), [] (const auto& read_query) { - return !read_query->clauses_.empty(); - })) { - return temp_batch_read_internal_direct(stream_ids, version_queries, read_queries, read_options); - } - - std::vector> read_versions_or_errors; - read_versions_or_errors.reserve(stream_ids.size()); - for (size_t i=0; i < stream_ids.size(); ++i) { - auto version_query = version_queries.size() > i ? version_queries[i] : VersionQuery{}; - auto read_query = read_queries.size() > i ? 
*read_queries[i] : ReadQuery{}; - // TODO: https://github.com/man-group/ArcticDB/issues/241 - // Remove this try-catch in favour of the implementation in temp_batch_read_internal_direct as part of #241 - try { - auto read_version = read_dataframe_version_internal(stream_ids[i], version_query, read_query, read_options, handler_data); - read_versions_or_errors.emplace_back(std::move(read_version)); - } catch (const NoSuchVersionException& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_, - ErrorCode::E_NO_SUCH_VERSION)); - } catch (const storage::NoDataFoundException& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_, - ErrorCode::E_KEY_NOT_FOUND)); - } catch (const storage::KeyNotFoundException& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_, - ErrorCode::E_KEY_NOT_FOUND)); - } catch (const std::exception& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_)); - } - } - return read_versions_or_errors; -} - void LocalVersionedEngine::write_version_and_prune_previous( bool prune_previous_versions, const AtomKey& new_version, @@ -1593,15 +1509,15 @@ folly::Future, std::optional>> LocalVersionedEngine::get_metadata_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query ) { - return std::move(version_fut) - .thenValue([this, &stream_id, &version_query](std::optional&& key){ - missing_data::check(key.has_value(), + return std::move(opt_index_key_fut) + .thenValue([this, &stream_id, &version_query](std::optional&& opt_index_key){ + missing_data::check(opt_index_key.has_value(), "Unable to retrieve metadata. 
{}@{}: version not found", stream_id, version_query); - return get_metadata(std::move(key)); + return get_metadata(std::move(*opt_index_key)); }) .thenValue([](auto&& metadata){ auto&& [opt_key, meta_proto] = metadata; @@ -1617,10 +1533,10 @@ std::vector(read_options.batch_throw_on_error_.has_value(), "ReadOptions::batch_throw_on_error_ should always be set here"); - auto version_futures = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); std::vector>>> metadata_futures; - for (auto&& [idx, version]: folly::enumerate(version_futures)) { - metadata_futures.emplace_back(get_metadata_async(std::move(version), stream_ids[idx], version_queries[idx])); + for (auto&& [idx, opt_index_key_fut]: folly::enumerate(opt_index_key_futs)) { + metadata_futures.emplace_back(get_metadata_async(std::move(opt_index_key_fut), stream_ids[idx], version_queries[idx])); } auto metadatas = folly::collectAll(metadata_futures).get(); diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index d450549394..da140ceec4 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -141,7 +141,7 @@ class LocalVersionedEngine : public VersionedEngine { ReadVersionOutput read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) override; @@ -192,7 +192,7 @@ class LocalVersionedEngine : public VersionedEngine { std::optional&& key); folly::Future>> get_metadata_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query); @@ -200,7 +200,7 @@ class LocalVersionedEngine : public VersionedEngine { AtomKey&& key); folly::Future get_descriptor_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query); @@ -286,12 +286,6 @@ class LocalVersionedEngine : public VersionedEngine { const ReadOptions& read_options, std::any& handler_data); - std::vector> temp_batch_read_internal_direct( - const std::vector& stream_ids, - const std::vector& version_queries, - std::vector>& read_queries, - const ReadOptions& read_options); - std::vector> batch_read_descriptor_internal( const std::vector& stream_ids, const std::vector& version_queries, diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index bd837a4e46..00a5398629 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -179,8 +179,9 @@ void register_bindings(py::module &version, py::exception read_query){ + auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); + return adapt_read_df(read_dataframe_from_file(sid, path, read_query, handler_data)); }); using FrameDataWrapper = arcticdb::pipelines::FrameDataWrapper; @@ -615,7 +616,7 @@ void register_bindings(py::module &version, py::exception(), "Write a specific version of this dataframe to the store") .def("read_dataframe_version", - [&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, ReadQuery& read_query, const ReadOptions& read_options) { + [&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, std::shared_ptr read_query, const 
ReadOptions& read_options) { auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data)); }, @@ -715,7 +716,8 @@ void register_bindings(py::module &version, py::exception& version_queries, std::vector>& read_queries, const ReadOptions& read_options){ - return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options)); + auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); + return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options, handler_data)); }, py::call_guard(), "Read a dataframe from the store") .def("batch_read_keys", diff --git a/cpp/arcticdb/version/test/test_sparse.cpp b/cpp/arcticdb/version/test/test_sparse.cpp index 92241eaa72..4182fae934 100644 --- a/cpp/arcticdb/version/test/test_sparse.cpp +++ b/cpp/arcticdb/version/test/test_sparse.cpp @@ -84,8 +84,8 @@ TEST_F(SparseTestStore, SimpleRoundtrip) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -176,8 +176,8 @@ TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -227,8 +227,8 @@ TEST_F(SparseTestStore, DenseToSparse) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -276,8 +276,8 @@ TEST_F(SparseTestStore, SimpleRoundtripStrings) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); const auto& frame = read_result.frame_data.frame();; @@ -330,8 +330,8 @@ TEST_F(SparseTestStore, Multiblock) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); 
register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -383,8 +383,8 @@ TEST_F(SparseTestStore, Segment) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -443,8 +443,8 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -503,9 +503,9 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.columns = {"time", "first"}; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->columns = {"time", "first"}; + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -560,8 +560,8 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = IndexRange(timestamp{3000}, timestamp{6999}); + auto read_query = std::make_shared(); + read_query->row_filter = IndexRange(timestamp{3000}, timestamp{6999}); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -614,8 +614,8 @@ TEST_F(SparseTestStore, Compact) { ReadOptions read_options; read_options.set_dynamic_schema(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -673,8 +673,8 @@ TEST_F(SparseTestStore, CompactWithStrings) { ReadOptions read_options; read_options.set_dynamic_schema(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); 
const auto& frame = read_result.frame_data.frame(); diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 4e2ef14ff5..060d3813a2 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -256,7 +256,7 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) { } auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -366,8 +366,9 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) { std::vector> read_queries; ReadOptions read_options; register_native_handler_data_factory(); + auto handler_data = get_type_handler_data(); read_options.set_batch_throw_on_error(true); - auto latest_versions = test_store_->batch_read(symbols, std::vector(10), read_queries, read_options); + auto latest_versions = test_store_->batch_read(symbols, std::vector(10), read_queries, read_options, handler_data); for(auto&& [idx, version] : folly::enumerate(latest_versions)) { auto expected = get_test_simple_frame(std::get(version).item.symbol(), 10, idx); bool equal = expected.segment_ == std::get(version).frame_data.frame(); @@ -531,7 +532,7 @@ TEST(VersionStore, UpdateWithin) { auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -574,7 +575,7 @@ TEST(VersionStore, UpdateBefore) { auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -617,7 +618,7 @@ TEST(VersionStore, UpdateAfter) { auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -661,7 +662,7 @@ TEST(VersionStore, UpdateIntersectBefore) { get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = 
version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -705,7 +706,7 @@ TEST(VersionStore, UpdateIntersectAfter) { get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -759,7 +760,7 @@ TEST(VersionStore, UpdateWithinSchemaChange) { ReadOptions read_options; read_options.set_dynamic_schema(true); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); @@ -822,7 +823,7 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) { ReadOptions read_options; read_options.set_dynamic_schema(true); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 14d3fbb193..0be39e11b1 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -404,54 +404,7 @@ void set_row_id_for_empty_columns_set( } } -// This is a parallelizable direct read (no processing pipeline) that is used -// for things that need to get multiple objects in their entirety, as with -// recursive normalizers. 
Outside those specific situtations it's probably -// not what you want -folly::Future async_read_direct_impl( - const std::shared_ptr& store, - const VariantKey& index_key, - SegmentInMemory&& index_segment, - const std::shared_ptr& read_query, - DecodePathData shared_data, - std::any& handler_data, - const ReadOptions& read_options) { - auto index_segment_reader = std::make_shared(std::move(index_segment)); - const auto& tsd = index_segment_reader->tsd(); - check_column_and_date_range_filterable(*index_segment_reader, *read_query); - add_index_columns_to_query(*read_query, tsd); - read_query->calculate_row_filter(static_cast(tsd.total_rows())); - - auto pipeline_context = std::make_shared(StreamDescriptor{tsd.as_stream_descriptor()}); - pipeline_context->set_selected_columns(read_query->columns); - - const bool dynamic_schema = opt_false(read_options.dynamic_schema_); - const bool bucketize_dynamic = index_segment_reader->bucketize_dynamic(); - - auto queries = get_column_bitset_and_query_functions( - *read_query, - pipeline_context, - dynamic_schema, - bucketize_dynamic); - - pipeline_context->slice_and_keys_ = filter_index(*index_segment_reader, combine_filter_functions(queries)); - - generate_filtered_field_descriptors(pipeline_context, read_query->columns); - mark_index_slices(pipeline_context, dynamic_schema, bucketize_dynamic); - auto frame = allocate_frame(pipeline_context); - - return fetch_data(frame, pipeline_context, store, dynamic_schema, shared_data, handler_data).thenValue( - [pipeline_context, frame, read_options, &handler_data](auto &&) mutable { - reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data); - }).thenValue( - [index_segment_reader, frame, index_key, shared_data, read_query, pipeline_context](auto &&) mutable { - set_row_id_for_empty_columns_set(*read_query, *pipeline_context, frame, index_segment_reader->tsd().total_rows() - 1); - return ReadVersionOutput{VersionedItem{to_atom(index_key)}, - FrameAndDescriptor{frame, std::move(index_segment_reader->mutable_tsd()), {}, shared_data.buffers()}}; - }); -} - -FrameAndDescriptor read_multi_key( +folly::Future read_multi_key( const std::shared_ptr& store, const SegmentInMemory& index_key_seg, std::any& handler_data) { @@ -461,61 +414,69 @@ FrameAndDescriptor read_multi_key( } AtomKey dup{keys[0]}; - ReadQuery read_query; VersionedItem versioned_item{std::move(dup)}; - auto res = read_frame_for_version(store, versioned_item, read_query, ReadOptions{}, handler_data); TimeseriesDescriptor multi_key_desc{index_key_seg.index_descriptor()}; - multi_key_desc.mutable_proto().mutable_normalization()->CopyFrom(res.desc_.proto().normalization()); - return {res.frame_, multi_key_desc, keys, std::shared_ptr{}}; + return read_frame_for_version(store, versioned_item, std::make_shared(), ReadOptions{}, handler_data) + .thenValue([multi_key_desc=std::move(multi_key_desc), keys=std::move(keys)](ReadVersionOutput&& read_version_output) mutable { + multi_key_desc.mutable_proto().mutable_normalization()->CopyFrom(read_version_output.frame_and_descriptor_.desc_.proto().normalization()); + read_version_output.frame_and_descriptor_.desc_ = std::move(multi_key_desc); + read_version_output.frame_and_descriptor_.keys_ = std::move(keys); + read_version_output.frame_and_descriptor_.buffers_ = std::make_shared(); + return std::move(read_version_output); + }); } -std::vector process_clauses( - std::shared_ptr component_manager, - std::vector>&& segment_and_slice_futures, - std::vector>&& processing_unit_indexes, - 
std::shared_ptr>> clauses) { - // There are some odd looking choices in this method: - // - clauses being shared_ptr> - // - segment_proc_unit_counts, entity_added_mtx, and entity_added created as shared pointers rather than just on the - // stack - // Both are for the same reason. folly::collect short-circuits and throws an exception the first time a task - // finishes due to an exception rather than cleanly exiting. However, other tasks that have already been enqueued - // continue executing, and so any variables from this scope that they depend on must be kept alive by the tasks - // themselves. - // It was considered to make the type of ReadQuery::clauses_ std::shared_ptr>. However, this - // makes all the other uses of clauses_ much less clean, so the compromise is an odd function signature here. - - std::vector> segment_and_slice_future_splitters; - segment_and_slice_future_splitters.reserve(segment_and_slice_futures.size()); - for (auto&& future: segment_and_slice_futures) { - segment_and_slice_future_splitters.emplace_back(folly::splitFuture(std::move(future))); +size_t generate_scheduling_iterations(const std::vector>& clauses) { + size_t res{1}; + auto it = std::next(clauses.cbegin()); + while (it != clauses.cend()) { + auto prev_it = std::prev(it); + if ((*prev_it)->clause_info().output_structure_ != (*it)->clause_info().input_structure_) { + ++res; + } + ++it; + } + return res; +} - // Map from index in segment_and_slice_future_splitters to the number of processing units that require that segment - auto segment_proc_unit_counts = std::make_shared>(segment_and_slice_futures.size(), 0); - for (const auto& list: processing_unit_indexes) { - for (auto idx: list) { - internal::check( - idx < segment_proc_unit_counts->size(), - "Index {} in processing_unit_indexes out of bounds >{}", idx, segment_proc_unit_counts->size() - 1); - (*segment_proc_unit_counts)[idx]++; +void remove_processed_clauses(std::vector>& clauses) { + // Erase all the clauses we have already scheduled to run + auto it = std::next(clauses.cbegin()); + while (it != clauses.cend()) { + auto prev_it = std::prev(it); + if ((*prev_it)->clause_info().output_structure_ == (*it)->clause_info().input_structure_) { + ++it; + } else { + break; + } } } - internal::check( - std::all_of(segment_proc_unit_counts->begin(), segment_proc_unit_counts->end(), [](const size_t& val) { return val != 0; }), - "All segments should be needed by at least one ProcessingUnit"); - // Map from position in segment_and_slice_futures to entity ids + clauses.erase(clauses.cbegin(), it); +} + +folly::Future> schedule_clause_processing( + std::shared_ptr component_manager, + std::vector>&& segment_and_slice_futures, + std::vector>&& processing_unit_indexes, + std::shared_ptr>> clauses) { + // All the shared pointers as arguments to this function and created within it are to ensure that resources are + // correctly kept alive after this function returns its future + auto num_segments = segment_and_slice_futures.size(); + auto segment_and_slice_future_splitters = split_futures(std::move(segment_and_slice_futures)); + + // Map from index in segment_and_slice_future_splitters to the number of calls to process in the first clause that + // will require that segment + auto segment_fetch_counts = generate_segment_fetch_counts(processing_unit_indexes, num_segments); + // Map from position in segment_and_slice_future_splitters to entity ids std::vector pos_to_id; // Map from entity id to position in segment_and_slice_futures auto id_to_pos = std::make_shared>(); -
pos_to_id.reserve(segment_and_slice_futures.size()); - auto ids = component_manager->get_new_entity_ids(segment_and_slice_futures.size()); + pos_to_id.reserve(num_segments); + auto ids = component_manager->get_new_entity_ids(num_segments); for (auto&& [idx, id]: folly::enumerate(ids)) { pos_to_id.emplace_back(id); id_to_pos->emplace(id, idx); } - // Give this a more descriptive name as we modify it between clauses std::vector> entity_ids_vec; entity_ids_vec.reserve(processing_unit_indexes.size()); for (const auto& indexes: processing_unit_indexes) { @@ -527,23 +488,21 @@ std::vector process_clauses( } // Used to make sure each entity is only added into the component manager once - auto slice_added_mtx = std::make_shared>(segment_and_slice_futures.size()); - auto slice_added = std::make_shared>(segment_and_slice_futures.size(), false); - std::vector>> futures; - bool first_clause{true}; - while (!clauses->empty()) { - for (auto&& entity_ids: entity_ids_vec) { - if (first_clause) { - std::vector> local_futs; - local_futs.reserve(entity_ids.size()); - for (auto id: entity_ids) { - local_futs.emplace_back(segment_and_slice_future_splitters[id_to_pos->at(id)].getFuture()); - } - futures.emplace_back( - folly::collect(local_futs) + auto slice_added_mtx = std::make_shared>(num_segments); + auto slice_added = std::make_shared>(num_segments, false); + auto futures = std::make_shared>>>(); + + for (auto&& entity_ids: entity_ids_vec) { + std::vector> local_futs; + local_futs.reserve(entity_ids.size()); + for (auto id: entity_ids) { + local_futs.emplace_back(segment_and_slice_future_splitters[id_to_pos->at(id)].getFuture()); + } + futures->emplace_back( + folly::collect(local_futs) .via(&async::cpu_executor()) .thenValue([component_manager, - segment_proc_unit_counts, + segment_fetch_counts, id_to_pos, slice_added_mtx, slice_added, @@ -560,41 +519,44 @@ std::vector process_clauses( std::make_shared(std::move(segment_and_slice.ranges_and_key_.row_range_)), std::make_shared(std::move(segment_and_slice.ranges_and_key_.col_range_)), std::make_shared(std::move(segment_and_slice.ranges_and_key_.key_)), - (*segment_proc_unit_counts)[pos] - ); + (*segment_fetch_counts)[pos] + ); (*slice_added)[pos] = true; } } return async::MemSegmentProcessingTask(*clauses, std::move(entity_ids))(); })); - } else { - futures.emplace_back( - async::submit_cpu_task( - async::MemSegmentProcessingTask(*clauses, - std::move(entity_ids)) - ) - ); - } - } - first_clause = false; - entity_ids_vec = folly::collect(futures).get(); - futures.clear(); - // Erase all the clauses we have already called process on - auto it = std::next(clauses->cbegin()); - while (it != clauses->cend()) { - auto prev_it = std::prev(it); - if ((*prev_it)->clause_info().output_structure_ == (*it)->clause_info().input_structure_) { - ++it; - } else { - break; - } - } - clauses->erase(clauses->cbegin(), it); - if (!clauses->empty()) { - entity_ids_vec = clauses->front()->structure_for_processing(std::move(entity_ids_vec)); + } + + auto entity_ids_vec_fut = folly::Future>>::makeEmpty(); + // The number of iterations we need to pass through the following loop to get all the work scheduled + auto scheduling_iterations = generate_scheduling_iterations(*clauses); + for (size_t i=0; i work_scheduled(folly::Unit{}); + if (i > 0) { + work_scheduled = entity_ids_vec_fut.via(&async::cpu_executor()).thenValue([clauses, futures](std::vector>&& entity_ids_vec) { + futures->clear(); + for (auto&& entity_ids: entity_ids_vec) { + 
futures->emplace_back(async::submit_cpu_task(async::MemSegmentProcessingTask(*clauses, std::move(entity_ids)))); + } + return folly::Unit{}; + }); } + + entity_ids_vec_fut = work_scheduled.via(&async::cpu_executor()).thenValue([clauses, futures](auto&&) { + return folly::collect(*futures).via(&async::cpu_executor()).thenValue([clauses](std::vector>&& entity_ids_vec) { + remove_processed_clauses(*clauses); + if (clauses->empty()) { + return entity_ids_vec; + } else { + return clauses->front()->structure_for_processing(std::move(entity_ids_vec)); + } + }); + }); } - return flatten_entities(std::move(entity_ids_vec)); + return entity_ids_vec_fut.via(&async::cpu_executor()).thenValue([](std::vector>&& entity_ids_vec) { + return flatten_entities(std::move(entity_ids_vec)); + }); } void set_output_descriptors( @@ -730,15 +692,15 @@ std::vector generate_ranges_and_keys(const std::vector read_and_process( +folly::Future> read_and_process( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, - const ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options ) { auto component_manager = std::make_shared(); ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_}; - for (auto& clause: read_query.clauses_) { + for (auto& clause: read_query->clauses_) { clause->set_processing_config(processing_config); clause->set_component_manager(component_manager); } @@ -749,39 +711,27 @@ std::vector read_and_process( // Each element of the vector corresponds to one processing unit containing the list of indexes in ranges_and_keys required for that processing unit // i.e. if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3] // then the structure will be {{0, 1}, {2, 3}} - std::vector> processing_unit_indexes = read_query.clauses_[0]->structure_for_processing(ranges_and_keys); + std::vector> processing_unit_indexes = read_query->clauses_[0]->structure_for_processing(ranges_and_keys); // Start reading as early as possible auto segment_and_slice_futures = store->batch_read_uncompressed(std::move(ranges_and_keys), columns_to_decode(pipeline_context)); - auto processed_entity_ids = process_clauses(component_manager, - std::move(segment_and_slice_futures), - std::move(processing_unit_indexes), - std::make_shared>>(read_query.clauses_)); - auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); - - if (std::any_of(read_query.clauses_.begin(), read_query.clauses_.end(), [](const std::shared_ptr& clause) { - return clause->clause_info().modifies_output_descriptor_; - })) { - set_output_descriptors(proc, read_query.clauses_, pipeline_context); - } - return collect_segments(std::move(proc)); -} - -SegmentInMemory read_direct(const std::shared_ptr& store, - const std::shared_ptr& pipeline_context, - DecodePathData shared_data, - const ReadOptions& read_options, - std::any& handler_data) { - ARCTICDB_DEBUG(log::version(), "Allocating frame"); - ARCTICDB_SAMPLE_DEFAULT(ReadDirect) - auto frame = allocate_frame(pipeline_context); - util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); - - ARCTICDB_DEBUG(log::version(), "Fetching frame data"); - fetch_data(frame, pipeline_context, store, opt_false(read_options.dynamic_schema_), shared_data, handler_data).get(); - util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); - return frame; + return 
schedule_clause_processing(component_manager, + std::move(segment_and_slice_futures), + std::move(processing_unit_indexes), + std::make_shared>>( + read_query->clauses_)) + .via(&async::cpu_executor()) + .thenValue([component_manager, read_query, pipeline_context](auto&& processed_entity_ids) { + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + + if (std::any_of(read_query->clauses_.begin(), read_query->clauses_.end(), [](const std::shared_ptr& clause) { + return clause->clause_info().modifies_output_descriptor_; + })) { + set_output_descriptors(proc, read_query->clauses_, pipeline_context); + } + return collect_segments(std::move(proc)); + }); } void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDescriptor& desc) { @@ -918,7 +868,7 @@ bool read_incompletes_to_pipeline( // In order to have the right normalization metadata and descriptor we need to find the first non-empty segment. // Picking an empty segment when there are non-empty ones will impact the index type and column namings. - // If all segments are empty we will procede as if were appending/writing and empty dataframe. + // If all segments are empty we will proceed as if were appending/writing and empty dataframe. debug::check(!incomplete_segments.empty(), "Incomplete segments must be non-empty"); const auto first_non_empty_seg = std::find_if(incomplete_segments.begin(), incomplete_segments.end(), [&](auto& slice){ return slice.segment(store).row_count() > 0; @@ -1130,7 +1080,7 @@ void copy_frame_data_to_buffer( struct CopyToBufferTask : async::BaseTask { SegmentInMemory&& source_segment_; - SegmentInMemory& target_segment_; + SegmentInMemory target_segment_; FrameSlice frame_slice_; DecodePathData shared_data_; std::any& handler_data_; @@ -1138,7 +1088,7 @@ struct CopyToBufferTask : async::BaseTask { CopyToBufferTask( SegmentInMemory&& source_segment, - SegmentInMemory& target_segment, + const SegmentInMemory& target_segment, FrameSlice frame_slice, DecodePathData shared_data, std::any& handler_data, @@ -1178,10 +1128,10 @@ struct CopyToBufferTask : async::BaseTask { } }; -void copy_segments_to_frame( +folly::Future copy_segments_to_frame( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, - SegmentInMemory& frame, + SegmentInMemory frame, std::any& handler_data) { std::vector> copy_tasks; DecodePathData shared_data; @@ -1198,10 +1148,10 @@ void copy_segments_to_frame( handler_data, context_row->fetch_index()})); } - folly::collect(copy_tasks).get(); + return folly::collect(copy_tasks).via(&async::cpu_executor()).unit(); } -SegmentInMemory prepare_output_frame( +folly::Future prepare_output_frame( std::vector&& items, const std::shared_ptr& pipeline_context, const std::shared_ptr& store, @@ -1224,9 +1174,7 @@ SegmentInMemory prepare_output_frame( } auto frame = allocate_frame(pipeline_context); - copy_segments_to_frame(store, pipeline_context, frame, handler_data); - - return frame; + return copy_segments_to_frame(store, pipeline_context, frame, handler_data).thenValue([frame](auto&&){ return frame; }); } AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key) { @@ -1251,7 +1199,7 @@ void create_column_stats_impl( log::version().warn("Cannot create empty column stats"); return; } - ReadQuery read_query({std::make_shared(std::move(*clause))}); + auto read_query = std::make_shared(std::vector>{std::make_shared(std::move(*clause))}); auto column_stats_key = 
index_key_to_column_stats_key(versioned_item.key_); std::optional old_segment; @@ -1270,7 +1218,7 @@ void create_column_stats_impl( auto pipeline_context = std::make_shared(); pipeline_context->stream_id_ = versioned_item.key_.id(); - read_indexed_keys_to_pipeline(store, pipeline_context, versioned_item, read_query, read_options); + read_indexed_keys_to_pipeline(store, pipeline_context, versioned_item, *read_query, read_options); schema::check( !pipeline_context->multi_key_, @@ -1281,7 +1229,7 @@ void create_column_stats_impl( "Cannot create column stats on pickled data" ); - auto segs = read_and_process(store, pipeline_context, read_query, read_options); + auto segs = read_and_process(store, pipeline_context, read_query, read_options).get(); schema::check(!segs.empty(), "Cannot create column stats for nonexistent columns"); // Convert SliceAndKey vector into SegmentInMemory vector @@ -1377,26 +1325,29 @@ ColumnStats get_column_stats_info_impl( } } -SegmentInMemory do_direct_read_or_process( +folly::Future do_direct_read_or_process( const std::shared_ptr& store, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, const std::shared_ptr& pipeline_context, const DecodePathData& shared_data, std::any& handler_data) { - SegmentInMemory frame; - if(!read_query.clauses_.empty()) { + if(!read_query->clauses_.empty()) { ARCTICDB_SAMPLE(RunPipelineAndOutput, 0) util::check_rte(!pipeline_context->is_pickled(),"Cannot filter pickled data"); - auto segs = read_and_process(store, pipeline_context, read_query, read_options); - frame = prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); + return read_and_process(store, pipeline_context, read_query, read_options) + .thenValue([store, pipeline_context, &read_options, &handler_data](auto&& segs) { + return prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); + }); } else { ARCTICDB_SAMPLE(MarkAndReadDirect, 0) - util::check_rte(!(pipeline_context->is_pickled() && std::holds_alternative(read_query.row_filter)), "Cannot use head/tail/row_range with pickled data, use plain read instead"); + util::check_rte(!(pipeline_context->is_pickled() && std::holds_alternative(read_query->row_filter)), "Cannot use head/tail/row_range with pickled data, use plain read instead"); mark_index_slices(pipeline_context, opt_false(read_options.dynamic_schema_), pipeline_context->bucketize_dynamic_); - frame = read_direct(store, pipeline_context, shared_data, read_options, handler_data); + auto frame = allocate_frame(pipeline_context); + util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); + ARCTICDB_DEBUG(log::version(), "Fetching frame data"); + return fetch_data(std::move(frame), pipeline_context, store, opt_false(read_options.dynamic_schema_), shared_data, handler_data); } - return frame; } VersionedItem collate_and_write( @@ -1496,14 +1447,14 @@ VersionedItem sort_merge_impl( auto pipeline_context = std::make_shared(); pipeline_context->stream_id_ = stream_id; pipeline_context->version_id_ = update_info.next_version_id_; - ReadQuery read_query; + auto read_query = std::make_shared(); std::optional previous_sorted_value; const IncompleteKeysRAII incomplete_keys_raii = options.delete_staged_data_on_failure_ ? 
IncompleteKeysRAII{pipeline_context, store, &options} : IncompleteKeysRAII{}; if(options.append_ && update_info.previous_index_key_.has_value()) { - read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, ReadOptions{}); + read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), *read_query, ReadOptions{}); if (!write_options.dynamic_schema) { user_input::check( pipeline_context->slice_and_keys_.front().slice().columns() == pipeline_context->slice_and_keys_.back().slice().columns(), @@ -1517,7 +1468,7 @@ VersionedItem sort_merge_impl( const bool has_incomplete_segments = read_incompletes_to_pipeline( store, pipeline_context, - read_query, + *read_query, ReadOptions{}, options.convert_int_to_float_, options.via_iteration_, @@ -1534,19 +1485,19 @@ VersionedItem sort_merge_impl( auto index = stream::index_type_from_descriptor(pipeline_context->descriptor()); util::variant_match(index, [&](const stream::TimeseriesIndex ×eries_index) { - read_query.clauses_.emplace_back(std::make_shared(SortClause{timeseries_index.name(), pipeline_context->incompletes_after()})); - read_query.clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{})); + read_query->clauses_.emplace_back(std::make_shared(SortClause{timeseries_index.name(), pipeline_context->incompletes_after()})); + read_query->clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{})); //const auto split_size = ConfigsMap::instance()->get_int("Split.RowCount", 10000); - //read_query.clauses_.emplace_back(std::make_shared(SplitClause{static_cast(split_size)})); + //read_query->clauses_.emplace_back(std::make_shared(SplitClause{static_cast(split_size)})); - read_query.clauses_.emplace_back(std::make_shared(MergeClause{ + read_query->clauses_.emplace_back(std::make_shared(MergeClause{ timeseries_index, SparseColumnPolicy{}, stream_id, pipeline_context->descriptor(), write_options.dynamic_schema })); - auto segments = read_and_process(store, pipeline_context, read_query, ReadOptions{}); + auto segments = read_and_process(store, pipeline_context, read_query, ReadOptions{}).get(); if (options.append_ && update_info.previous_index_key_ && !segments.empty()) { const timestamp last_index_on_disc = update_info.previous_index_key_->end_time() - 1; const timestamp incomplete_start = @@ -1715,8 +1666,8 @@ PredefragmentationInfo get_pre_defragmentation_info( pipeline_context->stream_id_ = stream_id; pipeline_context->version_id_ = update_info.next_version_id_; - ReadQuery read_query; - read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, defragmentation_read_options_generator(options)); + auto read_query = std::make_shared(); + read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), *read_query, defragmentation_read_options_generator(options)); using CompactionStartInfo = std::pair;//row, segment_append_after std::vector first_col_segment_idx; @@ -1776,8 +1727,8 @@ VersionedItem defragment_symbol_data_impl( util::variant_match(std::move(policies), [ &fut_vec, &slices, &store, &options, &pre_defragmentation_info, segment_size=segment_size] (auto &&idx, auto &&schema) { - pre_defragmentation_info.read_query.clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{pre_defragmentation_info.append_after.value()})); - auto segments = read_and_process(store, pre_defragmentation_info.pipeline_context, pre_defragmentation_info.read_query, 
defragmentation_read_options_generator(options)); + pre_defragmentation_info.read_query->clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{pre_defragmentation_info.append_after.value()})); + auto segments = read_and_process(store, pre_defragmentation_info.pipeline_context, pre_defragmentation_info.read_query, defragmentation_read_options_generator(options)).get(); using IndexType = std::remove_reference_t; using SchemaType = std::remove_reference_t; do_compact( @@ -1817,33 +1768,44 @@ void set_row_id_if_index_only( // This is the main user-facing read method that either returns all or // part of a dataframe as-is, or transforms it via a processing pipeline -FrameAndDescriptor read_frame_for_version( +folly::Future read_frame_for_version( const std::shared_ptr& store, const std::variant& version_info, - ReadQuery& read_query, + const std::shared_ptr& read_query , const ReadOptions& read_options, std::any& handler_data) { using namespace arcticdb::pipelines; auto pipeline_context = std::make_shared(); + VersionedItem res_versioned_item; if(std::holds_alternative(version_info)) { pipeline_context->stream_id_ = std::get(version_info); + // This isn't ideal. It would be better if the version() and timestamp() methods on the C++ VersionedItem class + // returned optionals, but this change would bubble up to the Python VersionedItem class defined in _store.py. + // This class is very hard to change at this point, as users do things like pickling them to pass them around. + // This at least gets the symbol attribute of VersionedItem correct. The creation timestamp will be zero, which + // corresponds to 1970, and so with this obviously ridiculous version ID, it should be clear to users that these + // values are meaningless before an indexed version exists. 
+ res_versioned_item = VersionedItem(AtomKeyBuilder() + .version_id(std::numeric_limits::max()) + .build(std::get(version_info))); } else { pipeline_context->stream_id_ = std::get(version_info).key_.id(); - read_indexed_keys_to_pipeline(store, pipeline_context, std::get(version_info), read_query, read_options); + read_indexed_keys_to_pipeline(store, pipeline_context, std::get(version_info), *read_query, read_options); + res_versioned_item = std::get(version_info); } if(pipeline_context->multi_key_) { - check_multi_key_is_not_index_only(*pipeline_context, read_query); + check_multi_key_is_not_index_only(*pipeline_context, *read_query); return read_multi_key(store, *pipeline_context->multi_key_, handler_data); } if(opt_false(read_options.incompletes_)) { - util::check(std::holds_alternative(read_query.row_filter), "Streaming read requires date range filter"); - const auto& query_range = std::get(read_query.row_filter); + util::check(std::holds_alternative(read_query->row_filter), "Streaming read requires date range filter"); + const auto& query_range = std::get(read_query->row_filter); const auto existing_range = pipeline_context->index_range(); if(!existing_range.specified_ || query_range.end_ > existing_range.end_) - read_incompletes_to_pipeline(store, pipeline_context, read_query, read_options, false, false, false, opt_false(read_options.dynamic_schema_)); + read_incompletes_to_pipeline(store, pipeline_context, *read_query, read_options, false, false, false, opt_false(read_options.dynamic_schema_)); } if(std::holds_alternative(version_info) && !pipeline_context->incompletes_after_) { @@ -1852,16 +1814,24 @@ FrameAndDescriptor read_frame_for_version( } modify_descriptor(pipeline_context, read_options); - generate_filtered_field_descriptors(pipeline_context, read_query.columns); + generate_filtered_field_descriptors(pipeline_context, read_query->columns); ARCTICDB_DEBUG(log::version(), "Fetching data to frame"); DecodePathData shared_data; - auto frame = version_store::do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data); - - ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); - reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data); - set_row_id_if_index_only(*pipeline_context, frame, read_query); - return {frame, timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_), {}, shared_data.buffers()}; + return version_store::do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data) + .thenValue([res_versioned_item, pipeline_context, &read_options, &handler_data, read_query, shared_data](auto&& frame) mutable { + ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); + return reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data) + .via(&async::cpu_executor()) + .thenValue([res_versioned_item, pipeline_context, frame, read_query, shared_data](auto&&) mutable { + set_row_id_if_index_only(*pipeline_context, frame, *read_query); + return ReadVersionOutput{std::move(res_versioned_item), + {frame, + timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_), + {}, + shared_data.buffers()}}; + }); + }); } } //namespace arcticdb::version_store diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index a465b9c576..7e04f29e7f 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -125,11 
+125,17 @@ ColumnStats get_column_stats_info_impl( const std::shared_ptr& store, const VersionedItem& versioned_item); -FrameAndDescriptor read_multi_key( +folly::Future read_multi_key( const std::shared_ptr& store, const SegmentInMemory& index_key_seg, std::any& handler_data); +folly::Future> schedule_clause_processing( + std::shared_ptr component_manager, + std::vector>&& segment_and_slice_futures, + std::vector>&& processing_unit_indexes, + std::shared_ptr>> clauses); + FrameAndDescriptor read_segment_impl( const std::shared_ptr& store, const VariantKey& key); @@ -148,35 +154,11 @@ VersionedItem compact_incomplete_impl( struct PredefragmentationInfo{ std::shared_ptr pipeline_context; - ReadQuery read_query; + std::shared_ptr read_query; size_t segments_need_compaction; std::optional append_after; }; -folly::Future async_read_direct_impl( - const std::shared_ptr& store, - const VariantKey& index_key, - SegmentInMemory&& index_segment, - const std::shared_ptr& read_query, - DecodePathData shared_data, - std::any& handler_data, - const ReadOptions& read_options); - -SegmentInMemory prepare_output_frame( - std::vector&& items, - const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, - const ReadOptions& read_options, - std::any& handler_data); - -SegmentInMemory do_direct_read_or_process( - const std::shared_ptr& store, - ReadQuery& read_query, - const ReadOptions& read_options, - const std::shared_ptr& pipeline_context, - const DecodePathData& shared_data, - std::any& handler_data); - PredefragmentationInfo get_pre_defragmentation_info( const std::shared_ptr& store, const StreamId& stream_id, @@ -217,10 +199,10 @@ void add_index_columns_to_query( const ReadQuery& read_query, const TimeseriesDescriptor& desc); -FrameAndDescriptor read_frame_for_version( +folly::Future read_frame_for_version( const std::shared_ptr& store, const std::variant& version_info, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data ); diff --git a/cpp/arcticdb/version/version_map_batch_methods.cpp b/cpp/arcticdb/version/version_map_batch_methods.cpp index a6d0b31ddb..6010b2a784 100644 --- a/cpp/arcticdb/version/version_map_batch_methods.cpp +++ b/cpp/arcticdb/version/version_map_batch_methods.cpp @@ -267,14 +267,19 @@ std::vector>> batch_get_versions_async( }); output.push_back(std::move(version_entry_fut) + .via(&async::cpu_executor()) .thenValue([vq = version_query, sid = *symbol](auto version_or_snapshot) { return util::variant_match(version_or_snapshot, [&vq](const std::shared_ptr &version_map_entry) { return get_key_for_version_query(version_map_entry, vq); }, - [&sid](std::optional snapshot) { - if (!snapshot) - return std::make_optional(); + [&vq, &sid](std::optional snapshot) -> std::optional { + missing_data::check( + snapshot, + "batch_get_versions_async: version matching query '{}' not found for symbol '{}'", + vq, + sid + ); auto [snap_key, snap_segment] = std::move(*snapshot); auto opt_id = row_id_for_stream_in_snapshot_segment( diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index aa394087e8..bbb1d10b5e 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -769,8 +769,8 @@ std::vector> PythonVersionStore::batch_read( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, - const ReadOptions& read_options) { - auto handler_data = 
TypeHandlerRegistry::instance()->get_handler_data(); + const ReadOptions& read_options, + std::any& handler_data) { auto read_versions_or_errors = batch_read_internal(stream_ids, version_queries, read_queries, read_options, handler_data); std::vector> res; for (auto&& [idx, read_version_or_error]: folly::enumerate(read_versions_or_errors)) { @@ -834,7 +834,7 @@ void PythonVersionStore::delete_snapshot_sync(const SnapshotId& snap_name, const ReadResult PythonVersionStore::read_dataframe_version( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) { @@ -1151,13 +1151,15 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId &stream_id, const std::string& path, - ReadQuery& read_query) { + const std::shared_ptr& read_query, + std::any& handler_data) { auto opt_version_and_frame = read_dataframe_from_file_internal( stream_id, path, read_query, ReadOptions{}, - codec::default_lz4_codec()); + codec::default_lz4_codec(), + handler_data); return create_python_read_result(opt_version_and_frame.versioned_item_, std::move(opt_version_and_frame.frame_and_descriptor_)); } diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index d1607eca27..0ace64bf73 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -167,7 +167,7 @@ class PythonVersionStore : public LocalVersionedEngine { ReadResult read_dataframe_version( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data); @@ -297,7 +297,8 @@ class PythonVersionStore : public LocalVersionedEngine { const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, - const ReadOptions& read_options); + const ReadOptions& read_options, + std::any& handler_data); std::vector, DataError>> batch_read_metadata( const std::vector& stream_ids, @@ -349,7 +350,8 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId &stream_id, const std::string& path, - ReadQuery& read_query); + const std::shared_ptr& read_query, + std::any& handler_data); struct ManualClockVersionStore : PythonVersionStore { ManualClockVersionStore(const std::shared_ptr& library) : diff --git a/cpp/arcticdb/version/versioned_engine.hpp b/cpp/arcticdb/version/versioned_engine.hpp index 2e47689dd5..f38f302a02 100644 --- a/cpp/arcticdb/version/versioned_engine.hpp +++ b/cpp/arcticdb/version/versioned_engine.hpp @@ -101,7 +101,7 @@ class VersionedEngine { virtual ReadVersionOutput read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) = 0; diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index f3e4dbc313..b13b577fd0 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -94,13 +94,18 @@ class VersionedItem: For data retrieval (read) operations, contains the data read. For data modification operations, the value might not be populated. version: int - For data retrieval operations, the version the `as_of` argument resolved to. + For data retrieval operations, the version the `as_of` argument resolved to. 
In the special case where no + versions have been written yet, but data is being read exclusively from incomplete segments, this will be + 2^64-1. For data modification operations, the version the data has been written under. metadata: Any The metadata saved alongside `data`. Availability depends on the method used and may be different from that of `data`. host: Optional[str] Informational / for backwards compatibility. + timestamp: Optional[int] + The time in nanoseconds since epoch that this version was written. In the special case where no versions have + been written yet, but data is being read exclusively from incomplete segments, this will be 0. """ symbol: str = attr.ib() @@ -1859,7 +1864,7 @@ def _post_process_dataframe(self, read_result, read_query, implement_read_index= original_data = Flattener().create_original_obj_from_metastruct_new(meta_struct, key_map) return VersionedItem( - symbol=vitem.symbol, + symbol=meta_struct["symbol"], library=vitem.library, data=original_data, version=vitem.version, diff --git a/python/tests/integration/arcticdb/test_unicode_strings.py b/python/tests/integration/arcticdb/test_unicode_strings.py index 020add623f..771ab50a54 100644 --- a/python/tests/integration/arcticdb/test_unicode_strings.py +++ b/python/tests/integration/arcticdb/test_unicode_strings.py @@ -1,8 +1,12 @@ +import copy import os from pandas.testing import assert_frame_equal import pandas as pd import numpy as np +from arcticdb import QueryBuilder + + def read_strings(): script_directory = os.path.dirname(os.path.abspath(__file__)) file_path = "{}/blns.txt".format(script_directory) @@ -65,6 +69,24 @@ def test_update_blns(lmdb_version_store): assert_frame_equal(df, vit.data) +def test_batch_read_blns(lmdb_version_store): + lib = lmdb_version_store + strings = read_strings() + num_symbols = 10 + symbols = [f"blns_batch_read_{idx}" for idx in range(num_symbols)] + dfs = [create_dataframe(strings) for _ in range(num_symbols)] + lib.batch_write(symbols, dfs) + q = QueryBuilder() + q = q[q["ints"] > 50] + qbs = (num_symbols // 2) * [None, copy.deepcopy(q)] + res = lib.batch_read(symbols, query_builder=qbs) + for idx, sym in enumerate(symbols): + expected = dfs[idx] + if idx % 2 == 1: + expected = expected[expected["ints"] > 50] + assert_frame_equal(expected, res[sym].data) + + def assert_dicts_of_dfs_equal(dict1, dict2): assert dict1.keys() == dict2.keys(), "Dictionary keys do not match" diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 0ead498da6..963ec7b371 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -1274,7 +1274,8 @@ def equals(x, y): assert x == y -def test_recursively_written_data(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursively_written_data(basic_store, batch): samples = [ {"a": np.arange(5), "b": np.arange(8)}, # dict of np arrays (np.arange(5), np.arange(6)), # tuple of np arrays @@ -1283,41 +1284,64 @@ def test_recursively_written_data(basic_store): ] for idx, sample in enumerate(samples): - basic_store.write("sym_recursive" + str(idx), sample, recursive_normalizers=True) - basic_store.write("sym_pickle" + str(idx), sample) # pickled writes - recursive_data = basic_store.read("sym_recursive" + str(idx)).data - pickled_data = basic_store.read("sym_pickle" + str(idx)).data - equals(sample, 
recursive_data) - equals(pickled_data, recursive_data) + recursive_sym = "sym_recursive" + str(idx) + pickled_sym = "sym_pickled" + str(idx) + basic_store.write(recursive_sym, sample, recursive_normalizers=True) + basic_store.write(pickled_sym, sample) # pickled writes + if batch: + recursive_vit = basic_store.batch_read([recursive_sym])[recursive_sym] + pickled_vit = basic_store.batch_read([pickled_sym])[pickled_sym] + else: + recursive_vit = basic_store.read(recursive_sym) + pickled_vit = basic_store.read(pickled_sym) + equals(sample, recursive_vit.data) + equals(pickled_vit.data, recursive_vit.data) + assert recursive_vit.symbol == recursive_sym + assert pickled_vit.symbol == pickled_sym -def test_recursively_written_data_with_metadata(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursively_written_data_with_metadata(basic_store, batch): samples = [ {"a": np.arange(5), "b": np.arange(8)}, # dict of np arrays (np.arange(5), np.arange(6)), # tuple of np arrays ] for idx, sample in enumerate(samples): - vit = basic_store.write( - "sym_recursive" + str(idx), sample, metadata={"something": 1}, recursive_normalizers=True - ) - recursive_data = basic_store.read("sym_recursive" + str(idx)).data - equals(sample, recursive_data) - assert vit.metadata == {"something": 1} + sym = "sym_recursive" + str(idx) + metadata = {"something": 1} + basic_store.write(sym, sample, metadata=metadata, recursive_normalizers=True) + if batch: + vit = basic_store.batch_read([sym])[sym] + else: + vit = basic_store.read(sym) + equals(sample, vit.data) + assert vit.symbol == sym + assert vit.metadata == metadata -def test_recursively_written_data_with_nones(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursively_written_data_with_nones(basic_store, batch): sample = {"a": np.arange(5), "b": np.arange(8), "c": None} - - basic_store.write("sym_recursive", sample, recursive_normalizers=True) - basic_store.write("sym_pickle", sample) # pickled writes - recursive_data = basic_store.read("sym_recursive").data - pickled_data = basic_store.read("sym_recursive").data - equals(sample, recursive_data) - equals(pickled_data, recursive_data) + recursive_sym = "sym_recursive" + pickled_sym = "sym_pickled" + basic_store.write(recursive_sym, sample, recursive_normalizers=True) + basic_store.write(pickled_sym, sample) # pickled writes + if batch: + recursive_vit = basic_store.batch_read([recursive_sym])[recursive_sym] + pickled_vit = basic_store.batch_read([pickled_sym])[pickled_sym] + else: + recursive_vit = basic_store.read(recursive_sym) + pickled_vit = basic_store.read(pickled_sym) + equals(sample, recursive_vit.data) + equals(pickled_vit.data, recursive_vit.data) + assert recursive_vit.symbol == recursive_sym + assert pickled_vit.symbol == pickled_sym -def test_recursive_nested_data(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursive_nested_data(basic_store, batch): + sym = "test_recursive_nested_data" sample_data = {"a": {"b": {"c": {"d": np.arange(24)}}}} fl = Flattener() assert fl.can_flatten(sample_data) @@ -1326,8 +1350,13 @@ def test_recursive_nested_data(basic_store): assert len(to_write) == 1 equals(list(to_write.values())[0], np.arange(24)) - basic_store.write("s", sample_data, recursive_normalizers=True) - equals(basic_store.read("s").data, sample_data) + basic_store.write(sym, sample_data, recursive_normalizers=True) + if batch: + vit = basic_store.batch_read([sym])[sym] + else: + vit = basic_store.read(sym) + equals(vit.data, 
sample_data) + assert vit.symbol == sym def test_named_tuple_flattening_rejected(): @@ -1374,13 +1403,20 @@ def test_recursive_normalizer_with_custom_class(): assert fl.is_normalizable_to_nested_structure(list_like_obj) -def test_really_large_symbol_for_recursive_data(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_really_large_symbol_for_recursive_data(basic_store, batch): + sym = "s" * 100 data = {"a" * 100: {"b" * 100: {"c" * 1000: {"d": np.arange(5)}}}} - basic_store.write("s" * 100, data, recursive_normalizers=True) + basic_store.write(sym, data, recursive_normalizers=True) fl = Flattener() metastruct, to_write = fl.create_meta_structure(data, "s" * 100) assert len(list(to_write.keys())[0]) < fl.MAX_KEY_LENGTH - equals(basic_store.read("s" * 100).data, data) + if batch: + vit = basic_store.batch_read([sym])[sym] + else: + vit = basic_store.read(sym) + equals(vit.data, data) + assert vit.symbol == sym def test_too_much_recursive_metastruct_data(monkeypatch, lmdb_version_store_v1): diff --git a/python/tests/unit/arcticdb/version_store/test_incompletes.py b/python/tests/unit/arcticdb/version_store/test_incompletes.py new file mode 100644 index 0000000000..522c96d126 --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_incompletes.py @@ -0,0 +1,46 @@ +""" +Copyright 2024 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. +""" +import numpy as np +import pandas as pd +import pytest +from arcticdb.util.test import assert_frame_equal + + +@pytest.mark.parametrize("batch", (True, False)) +def test_read_incompletes_with_indexed_data(lmdb_version_store_v1, batch): + lib = lmdb_version_store_v1 + sym = "test_read_incompletes_with_indexed_data" + num_rows = 10 + df = pd.DataFrame({"col": np.arange(num_rows)}, pd.date_range("2024-01-01", periods=num_rows)) + lib.write(sym, df.iloc[:num_rows // 2]) + for idx in range(num_rows // 2, num_rows): + lib.write(sym, df.iloc[idx: idx+1], incomplete=True) + assert lib.has_symbol(sym) + if batch: + received_vit = lib.batch_read([sym], date_ranges=[(df.index[1], df.index[-2])], incomplete=True)[sym] + else: + received_vit = lib.read(sym, date_range=(df.index[1], df.index[-2]), incomplete=True) + assert received_vit.symbol == sym + assert_frame_equal(df.iloc[1:-1], received_vit.data) + + +@pytest.mark.parametrize("batch", (True, False)) +def test_read_incompletes_no_indexed_data(lmdb_version_store_v1, batch): + lib = lmdb_version_store_v1 + sym = "test_read_incompletes_no_indexed_data" + num_rows = 10 + df = pd.DataFrame({"col": np.arange(num_rows)}, pd.date_range("2024-01-01", periods=num_rows)) + for idx in range(num_rows): + lib.write(sym, df.iloc[idx: idx+1], incomplete=True) + assert not lib.has_symbol(sym) + if batch: + received_vit = lib.batch_read([sym], date_ranges=[(df.index[1], df.index[-2])], incomplete=True)[sym] + else: + received_vit = lib.read(sym, date_range=(df.index[1], df.index[-2]), incomplete=True) + assert received_vit.symbol == sym + assert_frame_equal(df.iloc[1:-1], received_vit.data) From ded29e33c2e6acedfd30c5091a9cf1c8258d4bc6 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 6 Nov 2024 12:36:42 +0000 Subject: [PATCH 2/4] Use lib tool append_incomplete --- .../tests/unit/arcticdb/version_store/test_incompletes.py | 6 ++++-- 1 file 
changed, 4 insertions(+), 2 deletions(-) diff --git a/python/tests/unit/arcticdb/version_store/test_incompletes.py b/python/tests/unit/arcticdb/version_store/test_incompletes.py index 522c96d126..e9ed2bacf6 100644 --- a/python/tests/unit/arcticdb/version_store/test_incompletes.py +++ b/python/tests/unit/arcticdb/version_store/test_incompletes.py @@ -14,12 +14,13 @@ @pytest.mark.parametrize("batch", (True, False)) def test_read_incompletes_with_indexed_data(lmdb_version_store_v1, batch): lib = lmdb_version_store_v1 + lib_tool = lib.library_tool() sym = "test_read_incompletes_with_indexed_data" num_rows = 10 df = pd.DataFrame({"col": np.arange(num_rows)}, pd.date_range("2024-01-01", periods=num_rows)) lib.write(sym, df.iloc[:num_rows // 2]) for idx in range(num_rows // 2, num_rows): - lib.write(sym, df.iloc[idx: idx+1], incomplete=True) + lib_tool.append_incomplete(sym, df.iloc[idx: idx+1]) assert lib.has_symbol(sym) if batch: received_vit = lib.batch_read([sym], date_ranges=[(df.index[1], df.index[-2])], incomplete=True)[sym] @@ -32,11 +33,12 @@ def test_read_incompletes_with_indexed_data(lmdb_version_store_v1, batch): @pytest.mark.parametrize("batch", (True, False)) def test_read_incompletes_no_indexed_data(lmdb_version_store_v1, batch): lib = lmdb_version_store_v1 + lib_tool = lib.library_tool() sym = "test_read_incompletes_no_indexed_data" num_rows = 10 df = pd.DataFrame({"col": np.arange(num_rows)}, pd.date_range("2024-01-01", periods=num_rows)) for idx in range(num_rows): - lib.write(sym, df.iloc[idx: idx+1], incomplete=True) + lib_tool.append_incomplete(sym, df.iloc[idx: idx+1]) assert not lib.has_symbol(sym) if batch: received_vit = lib.batch_read([sym], date_ranges=[(df.index[1], df.index[-2])], incomplete=True)[sym] From 62fd6f3447e2a3c37827d55a42325eb41901ffe9 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 6 Nov 2024 16:24:07 +0000 Subject: [PATCH 3/4] Skip test on Linux Python 3.6 that segfaults during setup --- python/tests/integration/arcticdb/test_s3.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/tests/integration/arcticdb/test_s3.py b/python/tests/integration/arcticdb/test_s3.py index 0494f60c9c..dbb2e07a3a 100644 --- a/python/tests/integration/arcticdb/test_s3.py +++ b/python/tests/integration/arcticdb/test_s3.py @@ -9,6 +9,7 @@ import pytest import pandas as pd +import sys from arcticdb_ext.exceptions import StorageException from arcticdb_ext import set_config_string @@ -53,6 +54,8 @@ def test_s3_running_on_aws_fast_check(lib_name, s3_storage_factory, run_on_aws): assert lib_tool.inspect_env_variable("AWS_EC2_METADATA_DISABLED") == "true" +@pytest.mark.skipif(sys.version_info.major == 3 and sys.version_info.minor == 6 and sys.platform == "linux", + reason="Test setup segfaults") def test_nfs_backed_s3_storage(lib_name, nfs_backed_s3_storage): # Given lib = nfs_backed_s3_storage.create_version_store_factory(lib_name)() From 75581f6820a0c46ed65661a50b7fee695b060f1f Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 6 Nov 2024 17:23:27 +0000 Subject: [PATCH 4/4] More skipping --- python/tests/integration/arcticdb/test_s3.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/tests/integration/arcticdb/test_s3.py b/python/tests/integration/arcticdb/test_s3.py index dbb2e07a3a..d33e4205ac 100644 --- a/python/tests/integration/arcticdb/test_s3.py +++ b/python/tests/integration/arcticdb/test_s3.py @@ -19,6 +19,12 @@ from arcticdb.storage_fixtures.s3 import MotoS3StorageFixtureFactory +pytestmark = pytest.mark.skipif( + 
sys.version_info.major == 3 and sys.version_info.minor == 6 and sys.platform == "linux", + reason="Test setup segfaults" +) + + def test_s3_storage_failures(mock_s3_store_with_error_simulation): lib = mock_s3_store_with_error_simulation symbol_fail_write = "symbol#Failure_Write_99_0" @@ -54,8 +60,6 @@ def test_s3_running_on_aws_fast_check(lib_name, s3_storage_factory, run_on_aws): assert lib_tool.inspect_env_variable("AWS_EC2_METADATA_DISABLED") == "true" -@pytest.mark.skipif(sys.version_info.major == 3 and sys.version_info.minor == 6 and sys.platform == "linux", - reason="Test setup segfaults") def test_nfs_backed_s3_storage(lib_name, nfs_backed_s3_storage): # Given lib = nfs_backed_s3_storage.create_version_store_factory(lib_name)()
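
For reviewers who want a quick end-to-end check of the user-facing behaviour exercised by the tests above, here is a minimal sketch that combines their patterns: per-symbol QueryBuilder objects passed to batch_read, incompletes staged via the library tool's append_incomplete, and the 2^64-1 sentinel version documented in the _store.py docstring for reads served purely from incomplete segments. This is not part of the patch; it assumes the lmdb_version_store_v1 pytest fixture used elsewhere in the test suite, and the symbol names are illustrative.

```python
import numpy as np
import pandas as pd
import pytest

from arcticdb import QueryBuilder
from arcticdb.util.test import assert_frame_equal


@pytest.mark.parametrize("batch", (True, False))
def test_read_batch_parallel_smoke(lmdb_version_store_v1, batch):
    # Sketch only: exercises the batch_read and incomplete-read paths touched by this series.
    lib = lmdb_version_store_v1
    lib_tool = lib.library_tool()

    df = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-01", periods=10))
    lib.write("filtered_sym", df)
    lib.write("plain_sym", df)

    q = QueryBuilder()
    q = q[q["col"] > 4]

    if batch:
        # A None entry means "no query" for that symbol; queries are matched positionally.
        res = lib.batch_read(["filtered_sym", "plain_sym"], query_builder=[q, None])
        filtered_vit, plain_vit = res["filtered_sym"], res["plain_sym"]
    else:
        filtered_vit = lib.read("filtered_sym", query_builder=q)
        plain_vit = lib.read("plain_sym")

    assert_frame_equal(df[df["col"] > 4], filtered_vit.data)
    assert_frame_equal(df, plain_vit.data)

    # Incompletes-only read: no indexed version exists yet, so per the docstring added
    # in _store.py the resolved version is the 2^64-1 sentinel.
    sym = "incompletes_only_sym"
    for idx in range(10):
        lib_tool.append_incomplete(sym, df.iloc[idx:idx + 1])
    vit = lib.read(sym, date_range=(df.index[0], df.index[-1]), incomplete=True)
    assert vit.version == 2 ** 64 - 1
    assert_frame_equal(df, vit.data)
```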