From 93ab53163ac3529b0dbb3af239f02728c1040d89 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 24 Aug 2023 12:32:56 +0200 Subject: [PATCH 1/3] Revert "Revert "Planner prepare filters for analysis"" --- src/Planner/Planner.cpp | 82 +++++++++++++++++++ src/Planner/PlannerJoinTree.cpp | 1 + src/Planner/Utils.cpp | 40 +++++++-- src/Planner/Utils.h | 4 +- src/Storages/ColumnsDescription.cpp | 11 +++ src/Storages/ColumnsDescription.h | 5 ++ .../MergeTree/MergeTreeDataSelectExecutor.cpp | 6 +- src/Storages/StorageDummy.h | 4 +- src/Storages/StorageMergeTree.cpp | 7 -- src/Storages/StorageReplicatedMergeTree.cpp | 9 -- src/Storages/StorageSnapshot.cpp | 10 +++ src/Storages/StorageSnapshot.h | 2 + 12 files changed, 154 insertions(+), 27 deletions(-) diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index 7cce495dfb85..734458ed086e 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include @@ -138,6 +139,84 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context) } } +/** Storages can rely that filters that for storage will be available for analysis before + * getQueryProcessingStage method will be called. + * + * StorageDistributed skip unused shards optimization relies on this. + * + * To collect filters that will be applied to specific table in case we have JOINs requires + * to run query plan optimization pipeline. + * + * Algorithm: + * 1. Replace all table expressions in query tree with dummy tables. + * 2. Build query plan. + * 3. Optimize query plan. + * 4. Extract filters from ReadFromDummy query plan steps from query plan leaf nodes. + */ +void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context) +{ + bool collect_filters = false; + + for (auto & [table_expression, table_expression_data] : planner_context->getTableExpressionNodeToData()) + { + auto * table_node = table_expression->as(); + auto * table_function_node = table_expression->as(); + if (!table_node && !table_function_node) + continue; + + const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage(); + if (typeid_cast(storage.get())) + { + collect_filters = true; + break; + } + } + + if (!collect_filters) + return; + + ResultReplacementMap replacement_map; + auto updated_query_tree = replaceTableExpressionsWithDummyTables(query_tree, planner_context->getQueryContext(), &replacement_map); + + std::unordered_map dummy_storage_to_table_expression_data; + + for (auto & [from_table_expression, dummy_table_expression] : replacement_map) + { + auto * dummy_storage = dummy_table_expression->as().getStorage().get(); + auto * table_expression_data = &planner_context->getTableExpressionDataOrThrow(from_table_expression); + dummy_storage_to_table_expression_data.emplace(dummy_storage, table_expression_data); + } + + const auto & query_context = planner_context->getQueryContext(); + + SelectQueryOptions select_query_options; + Planner planner(updated_query_tree, select_query_options); + planner.buildQueryPlanIfNeeded(); + + auto & result_query_plan = planner.getQueryPlan(); + + auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context); + result_query_plan.optimize(optimization_settings); + + std::vector nodes_to_process; + nodes_to_process.push_back(result_query_plan.getRootNode()); + + while (!nodes_to_process.empty()) + { + const auto * node_to_process = nodes_to_process.back(); + nodes_to_process.pop_back(); + nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end()); + + auto * read_from_dummy = typeid_cast(node_to_process->step.get()); + if (!read_from_dummy) + continue; + + auto filter_actions = ActionsDAG::buildFilterActionsDAG(read_from_dummy->getFilterNodes().nodes, {}, query_context); + auto & table_expression_data = dummy_storage_to_table_expression_data.at(&read_from_dummy->getStorage()); + table_expression_data->setFilterActions(std::move(filter_actions)); + } +} + /// Extend lifetime of query context, storages, and table locks void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context) { @@ -1226,6 +1305,9 @@ void Planner::buildPlanForQueryNode() collectSets(query_tree, *planner_context); collectTableExpressionData(query_tree, planner_context); + if (!select_query_options.only_analyze) + collectFiltersForAnalysis(query_tree, planner_context); + const auto & settings = query_context->getSettingsRef(); /// Check support for JOIN for parallel replicas with custom key diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 70ff32c1d312..f9833a9fbd43 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -544,6 +544,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres auto table_expression_query_info = select_query_info; table_expression_query_info.table_expression = table_expression; + table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions(); size_t max_streams = settings.max_threads; size_t max_threads_execute_query = settings.max_threads; diff --git a/src/Planner/Utils.cpp b/src/Planner/Utils.cpp index 733db0f00bc2..9c46622f578e 100644 --- a/src/Planner/Utils.cpp +++ b/src/Planner/Utils.cpp @@ -355,24 +355,52 @@ QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, con return function_node; } -QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node, +QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node, const ContextPtr & context, ResultReplacementMap * result_replacement_map) { auto & query_node_typed = query_node->as(); auto table_expressions = extractTableExpressions(query_node_typed.getJoinTree()); std::unordered_map replacement_map; + size_t subquery_index = 0; for (auto & table_expression : table_expressions) { auto * table_node = table_expression->as(); auto * table_function_node = table_expression->as(); - if (!table_node && !table_function_node) - continue; + auto * subquery_node = table_expression->as(); + auto * union_node = table_expression->as(); + + StoragePtr storage_dummy; + + if (table_node || table_function_node) + { + const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot(); + auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals(); + + storage_dummy + = std::make_shared(storage_snapshot->storage.getStorageID(), ColumnsDescription(storage_snapshot->getColumns(get_column_options))); + } + else if (subquery_node || union_node) + { + const auto & subquery_projection_columns + = subquery_node ? subquery_node->getProjectionColumns() : union_node->computeProjectionColumns(); + + NameSet unique_column_names; + NamesAndTypes storage_dummy_columns; + storage_dummy_columns.reserve(subquery_projection_columns.size()); + + for (const auto & projection_column : subquery_projection_columns) + { + auto [_, inserted] = unique_column_names.insert(projection_column.name); + if (inserted) + storage_dummy_columns.emplace_back(projection_column); + } + + storage_dummy = std::make_shared(StorageID{"dummy", "subquery_" + std::to_string(subquery_index)}, ColumnsDescription(storage_dummy_columns)); + ++subquery_index; + } - const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot(); - auto storage_dummy = std::make_shared(storage_snapshot->storage.getStorageID(), - storage_snapshot->metadata->getColumns()); auto dummy_table_node = std::make_shared(std::move(storage_dummy), context); if (result_replacement_map) diff --git a/src/Planner/Utils.h b/src/Planner/Utils.h index d9412800e61e..1b8397f47ccd 100644 --- a/src/Planner/Utils.h +++ b/src/Planner/Utils.h @@ -65,9 +65,9 @@ bool queryHasWithTotalsInAnySubqueryInJoinTree(const QueryTreeNodePtr & query_no /// Returns `and` function node that has condition nodes as its arguments QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context); -/// Replace tables nodes and table function nodes with dummy table nodes +/// Replace table expressions from query JOIN TREE with dummy tables using ResultReplacementMap = std::unordered_map; -QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node, +QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node, const ContextPtr & context, ResultReplacementMap * result_replacement_map = nullptr); diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp index 0c918bda5fdd..9b721280addc 100644 --- a/src/Storages/ColumnsDescription.cpp +++ b/src/Storages/ColumnsDescription.cpp @@ -140,6 +140,17 @@ void ColumnDescription::readText(ReadBuffer & buf) } } +ColumnsDescription::ColumnsDescription(std::initializer_list ordinary) +{ + for (const auto & elem : ordinary) + add(ColumnDescription(elem.name, elem.type)); +} + +ColumnsDescription::ColumnsDescription(NamesAndTypes ordinary) +{ + for (auto & elem : ordinary) + add(ColumnDescription(std::move(elem.name), std::move(elem.type))); +} ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary) { diff --git a/src/Storages/ColumnsDescription.h b/src/Storages/ColumnsDescription.h index fb1eeed3127d..276968ffe4ea 100644 --- a/src/Storages/ColumnsDescription.h +++ b/src/Storages/ColumnsDescription.h @@ -102,6 +102,11 @@ class ColumnsDescription : public IHints<1, ColumnsDescription> { public: ColumnsDescription() = default; + + ColumnsDescription(std::initializer_list ordinary); + + explicit ColumnsDescription(NamesAndTypes ordinary); + explicit ColumnsDescription(NamesAndTypesList ordinary); explicit ColumnsDescription(NamesAndTypesList ordinary, NamesAndAliases aliases); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 62547ff8786b..5e9aefb7fb5e 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1304,6 +1304,10 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts( selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried); + /// Do not keep data parts in snapshot. + /// They are stored separately, and some could be released after PK analysis. + auto storage_snapshot_copy = storage_snapshot->clone(std::make_unique()); + return std::make_unique( std::move(parts), std::move(alter_conversions), @@ -1311,7 +1315,7 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts( virt_column_names, data, query_info, - storage_snapshot, + storage_snapshot_copy, context, max_block_size, num_streams, diff --git a/src/Storages/StorageDummy.h b/src/Storages/StorageDummy.h index 2f9a8beb4d04..aa2201a196ba 100644 --- a/src/Storages/StorageDummy.h +++ b/src/Storages/StorageDummy.h @@ -8,7 +8,7 @@ namespace DB { -class StorageDummy : public IStorage +class StorageDummy final : public IStorage { public: StorageDummy(const StorageID & table_id_, const ColumnsDescription & columns_, ColumnsDescription object_columns_ = {}); @@ -46,7 +46,7 @@ class StorageDummy : public IStorage const ColumnsDescription object_columns; }; -class ReadFromDummy : public SourceStepWithFilter +class ReadFromDummy final : public SourceStepWithFilter { public: explicit ReadFromDummy(const StorageDummy & storage_, diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 9d7f6903b466..c7f27c008991 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -255,13 +255,6 @@ void StorageMergeTree::read( processed_stage, nullptr, enable_parallel_reading)) query_plan = std::move(*plan); } - - /// Now, copy of parts that is required for the query, stored in the processors, - /// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning, - /// reset them to avoid holding them. - auto & snapshot_data = assert_cast(*storage_snapshot->data); - snapshot_data.parts = {}; - snapshot_data.alter_conversions = {}; } std::optional StorageMergeTree::totalRows(const Settings &) const diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 76a2ad9883c7..6f50c9c773c7 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5103,15 +5103,6 @@ void StorageReplicatedMergeTree::read( const size_t max_block_size, const size_t num_streams) { - SCOPE_EXIT({ - /// Now, copy of parts that is required for the query, stored in the processors, - /// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning, - /// reset them to avoid holding them. - auto & snapshot_data = assert_cast(*storage_snapshot->data); - snapshot_data.parts = {}; - snapshot_data.alter_conversions = {}; - }); - const auto & settings = local_context->getSettingsRef(); /// The `select_sequential_consistency` setting has two meanings: diff --git a/src/Storages/StorageSnapshot.cpp b/src/Storages/StorageSnapshot.cpp index c0e859007943..5de60f4decde 100644 --- a/src/Storages/StorageSnapshot.cpp +++ b/src/Storages/StorageSnapshot.cpp @@ -17,6 +17,16 @@ namespace ErrorCodes extern const int COLUMN_QUERIED_MORE_THAN_ONCE; } +std::shared_ptr StorageSnapshot::clone(DataPtr data_) const +{ + auto res = std::make_shared(storage, metadata, object_columns); + + res->projection = projection; + res->data = std::move(data_); + + return res; +} + void StorageSnapshot::init() { for (const auto & [name, type] : storage.getVirtuals()) diff --git a/src/Storages/StorageSnapshot.h b/src/Storages/StorageSnapshot.h index a69f9b959551..d62e118e1f28 100644 --- a/src/Storages/StorageSnapshot.h +++ b/src/Storages/StorageSnapshot.h @@ -60,6 +60,8 @@ struct StorageSnapshot init(); } + std::shared_ptr clone(DataPtr data_) const; + /// Get all available columns with types according to options. NamesAndTypesList getColumns(const GetColumnsOptions & options) const; From 6128146371bfa02384636f915b9b98f8a39bfe53 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 24 Aug 2023 10:32:13 +0000 Subject: [PATCH 2/3] Fix 02834_remote_session_log (cherry picked from commit 40a84e48aae09aff08850e19c5527fd9bcf4ab8e) --- src/Planner/PlannerJoinTree.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index f9833a9fbd43..935a1c8384a3 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -46,6 +46,8 @@ #include #include +#include + #include #include #include @@ -84,6 +86,10 @@ namespace /// Check if current user has privileges to SELECT columns from table void checkAccessRights(const TableNode & table_node, const Names & column_names, const ContextPtr & query_context) { + /// StorageDummy is created on preliminary stage, igore access check for it. + if (typeid_cast(table_node.getStorage().get())) + return; + const auto & storage_id = table_node.getStorageID(); const auto & storage_snapshot = table_node.getStorageSnapshot(); From 15182772cc860639f059a4d58d9be34ed27a8385 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 24 Aug 2023 12:54:11 +0200 Subject: [PATCH 3/3] Update src/Planner/PlannerJoinTree.cpp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raúl Marín --- src/Planner/PlannerJoinTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 935a1c8384a3..8f87b90e9494 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -86,7 +86,7 @@ namespace /// Check if current user has privileges to SELECT columns from table void checkAccessRights(const TableNode & table_node, const Names & column_names, const ContextPtr & query_context) { - /// StorageDummy is created on preliminary stage, igore access check for it. + /// StorageDummy is created on preliminary stage, ignore access check for it. if (typeid_cast(table_node.getStorage().get())) return;