Skip to content

Commit

Permalink
Merge pull request ClickHouse#53792 from ClickHouse/revert-53782-reve…
Browse files Browse the repository at this point in the history
…rt-52762-planner-prepare-filters-for-analysis

Revert "Revert "Planner prepare filters for analysis""
  • Loading branch information
Algunenano authored Oct 16, 2023
2 parents a4fb143 + 0b4d767 commit 478f635
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 27 deletions.
82 changes: 82 additions & 0 deletions src/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageDistributed.h>
#include <Storages/IStorage.h>

#include <Analyzer/Utils.h>
Expand Down Expand Up @@ -138,6 +139,84 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
}
}

/** Storages can rely that filters that for storage will be available for analysis before
* getQueryProcessingStage method will be called.
*
* StorageDistributed skip unused shards optimization relies on this.
*
* To collect filters that will be applied to specific table in case we have JOINs requires
* to run query plan optimization pipeline.
*
* Algorithm:
* 1. Replace all table expressions in query tree with dummy tables.
* 2. Build query plan.
* 3. Optimize query plan.
* 4. Extract filters from ReadFromDummy query plan steps from query plan leaf nodes.
*/
void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
{
bool collect_filters = false;

for (auto & [table_expression, table_expression_data] : planner_context->getTableExpressionNodeToData())
{
auto * table_node = table_expression->as<TableNode>();
auto * table_function_node = table_expression->as<TableFunctionNode>();
if (!table_node && !table_function_node)
continue;

const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
if (typeid_cast<const StorageDistributed *>(storage.get()))
{
collect_filters = true;
break;
}
}

if (!collect_filters)
return;

ResultReplacementMap replacement_map;
auto updated_query_tree = replaceTableExpressionsWithDummyTables(query_tree, planner_context->getQueryContext(), &replacement_map);

std::unordered_map<const IStorage *, TableExpressionData *> dummy_storage_to_table_expression_data;

for (auto & [from_table_expression, dummy_table_expression] : replacement_map)
{
auto * dummy_storage = dummy_table_expression->as<TableNode &>().getStorage().get();
auto * table_expression_data = &planner_context->getTableExpressionDataOrThrow(from_table_expression);
dummy_storage_to_table_expression_data.emplace(dummy_storage, table_expression_data);
}

const auto & query_context = planner_context->getQueryContext();

SelectQueryOptions select_query_options;
Planner planner(updated_query_tree, select_query_options);
planner.buildQueryPlanIfNeeded();

auto & result_query_plan = planner.getQueryPlan();

auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context);
result_query_plan.optimize(optimization_settings);

std::vector<QueryPlan::Node *> nodes_to_process;
nodes_to_process.push_back(result_query_plan.getRootNode());

while (!nodes_to_process.empty())
{
const auto * node_to_process = nodes_to_process.back();
nodes_to_process.pop_back();
nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end());

auto * read_from_dummy = typeid_cast<ReadFromDummy *>(node_to_process->step.get());
if (!read_from_dummy)
continue;

auto filter_actions = ActionsDAG::buildFilterActionsDAG(read_from_dummy->getFilterNodes().nodes, {}, query_context);
auto & table_expression_data = dummy_storage_to_table_expression_data.at(&read_from_dummy->getStorage());
table_expression_data->setFilterActions(std::move(filter_actions));
}
}

/// Extend lifetime of query context, storages, and table locks
void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
{
Expand Down Expand Up @@ -1227,6 +1306,9 @@ void Planner::buildPlanForQueryNode()
collectSets(query_tree, *planner_context);
collectTableExpressionData(query_tree, planner_context);

if (!select_query_options.only_analyze)
collectFiltersForAnalysis(query_tree, planner_context);

const auto & settings = query_context->getSettingsRef();

/// Check support for JOIN for parallel replicas with custom key
Expand Down
7 changes: 7 additions & 0 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/Sources/SourceFromSingleChunk.h>

#include <Storages/StorageDummy.h>

#include <Interpreters/Context.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
Expand Down Expand Up @@ -84,6 +86,10 @@ namespace
/// Check if current user has privileges to SELECT columns from table
void checkAccessRights(const TableNode & table_node, const Names & column_names, const ContextPtr & query_context)
{
/// StorageDummy is created on preliminary stage, ignore access check for it.
if (typeid_cast<const StorageDummy *>(table_node.getStorage().get()))
return;

const auto & storage_id = table_node.getStorageID();
const auto & storage_snapshot = table_node.getStorageSnapshot();

Expand Down Expand Up @@ -553,6 +559,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres

auto table_expression_query_info = select_query_info;
table_expression_query_info.table_expression = table_expression;
table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions();

size_t max_streams = settings.max_threads;
size_t max_threads_execute_query = settings.max_threads;
Expand Down
40 changes: 34 additions & 6 deletions src/Planner/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,24 +355,52 @@ QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, con
return function_node;
}

QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node,
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
const ContextPtr & context,
ResultReplacementMap * result_replacement_map)
{
auto & query_node_typed = query_node->as<QueryNode &>();
auto table_expressions = extractTableExpressions(query_node_typed.getJoinTree());
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
size_t subquery_index = 0;

for (auto & table_expression : table_expressions)
{
auto * table_node = table_expression->as<TableNode>();
auto * table_function_node = table_expression->as<TableFunctionNode>();
if (!table_node && !table_function_node)
continue;
auto * subquery_node = table_expression->as<QueryNode>();
auto * union_node = table_expression->as<UnionNode>();

StoragePtr storage_dummy;

if (table_node || table_function_node)
{
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();

storage_dummy
= std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(), ColumnsDescription(storage_snapshot->getColumns(get_column_options)));
}
else if (subquery_node || union_node)
{
const auto & subquery_projection_columns
= subquery_node ? subquery_node->getProjectionColumns() : union_node->computeProjectionColumns();

NameSet unique_column_names;
NamesAndTypes storage_dummy_columns;
storage_dummy_columns.reserve(subquery_projection_columns.size());

for (const auto & projection_column : subquery_projection_columns)
{
auto [_, inserted] = unique_column_names.insert(projection_column.name);
if (inserted)
storage_dummy_columns.emplace_back(projection_column);
}

storage_dummy = std::make_shared<StorageDummy>(StorageID{"dummy", "subquery_" + std::to_string(subquery_index)}, ColumnsDescription(storage_dummy_columns));
++subquery_index;
}

const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
auto storage_dummy = std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(),
storage_snapshot->metadata->getColumns());
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);

if (result_replacement_map)
Expand Down
4 changes: 2 additions & 2 deletions src/Planner/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ bool queryHasWithTotalsInAnySubqueryInJoinTree(const QueryTreeNodePtr & query_no
/// Returns `and` function node that has condition nodes as its arguments
QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context);

/// Replace tables nodes and table function nodes with dummy table nodes
/// Replace table expressions from query JOIN TREE with dummy tables
using ResultReplacementMap = std::unordered_map<QueryTreeNodePtr, QueryTreeNodePtr>;
QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node,
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
const ContextPtr & context,
ResultReplacementMap * result_replacement_map = nullptr);

Expand Down
11 changes: 11 additions & 0 deletions src/Storages/ColumnsDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ void ColumnDescription::readText(ReadBuffer & buf)
}
}

ColumnsDescription::ColumnsDescription(std::initializer_list<NameAndTypePair> ordinary)
{
for (const auto & elem : ordinary)
add(ColumnDescription(elem.name, elem.type));
}

ColumnsDescription::ColumnsDescription(NamesAndTypes ordinary)
{
for (auto & elem : ordinary)
add(ColumnDescription(std::move(elem.name), std::move(elem.type)));
}

ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary)
{
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/ColumnsDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class ColumnsDescription : public IHints<>
{
public:
ColumnsDescription() = default;

ColumnsDescription(std::initializer_list<NameAndTypePair> ordinary);

explicit ColumnsDescription(NamesAndTypes ordinary);

explicit ColumnsDescription(NamesAndTypesList ordinary);

explicit ColumnsDescription(NamesAndTypesList ordinary, NamesAndAliases aliases);
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1335,14 +1335,18 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(

selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);

/// Do not keep data parts in snapshot.
/// They are stored separately, and some could be released after PK analysis.
auto storage_snapshot_copy = storage_snapshot->clone(std::make_unique<MergeTreeData::SnapshotData>());

return std::make_unique<ReadFromMergeTree>(
std::move(parts),
std::move(alter_conversions),
real_column_names,
virt_column_names,
data,
query_info,
storage_snapshot,
storage_snapshot_copy,
context,
max_block_size,
num_streams,
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageDummy.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace DB
{

class StorageDummy : public IStorage
class StorageDummy final : public IStorage
{
public:
StorageDummy(const StorageID & table_id_, const ColumnsDescription & columns_, ColumnsDescription object_columns_ = {});
Expand Down Expand Up @@ -46,7 +46,7 @@ class StorageDummy : public IStorage
const ColumnsDescription object_columns;
};

class ReadFromDummy : public SourceStepWithFilter
class ReadFromDummy final : public SourceStepWithFilter
{
public:
explicit ReadFromDummy(const StorageDummy & storage_,
Expand Down
7 changes: 0 additions & 7 deletions src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,6 @@ void StorageMergeTree::read(
processed_stage, nullptr, enable_parallel_reading))
query_plan = std::move(*plan);
}

/// Now, copy of parts that is required for the query, stored in the processors,
/// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning,
/// reset them to avoid holding them.
auto & snapshot_data = assert_cast<MergeTreeData::SnapshotData &>(*storage_snapshot->data);
snapshot_data.parts = {};
snapshot_data.alter_conversions = {};
}

std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
Expand Down
9 changes: 0 additions & 9 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5134,15 +5134,6 @@ void StorageReplicatedMergeTree::read(
const size_t max_block_size,
const size_t num_streams)
{
SCOPE_EXIT({
/// Now, copy of parts that is required for the query, stored in the processors,
/// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning,
/// reset them to avoid holding them.
auto & snapshot_data = assert_cast<MergeTreeData::SnapshotData &>(*storage_snapshot->data);
snapshot_data.parts = {};
snapshot_data.alter_conversions = {};
});

const auto & settings = local_context->getSettingsRef();

/// The `select_sequential_consistency` setting has two meanings:
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/StorageSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ namespace ErrorCodes
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
}

std::shared_ptr<StorageSnapshot> StorageSnapshot::clone(DataPtr data_) const
{
auto res = std::make_shared<StorageSnapshot>(storage, metadata, object_columns);

res->projection = projection;
res->data = std::move(data_);

return res;
}

void StorageSnapshot::init()
{
for (const auto & [name, type] : storage.getVirtuals())
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/StorageSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ struct StorageSnapshot
init();
}

std::shared_ptr<StorageSnapshot> clone(DataPtr data_) const;

/// Get all available columns with types according to options.
NamesAndTypesList getColumns(const GetColumnsOptions & options) const;

Expand Down

0 comments on commit 478f635

Please sign in to comment.