Skip to content

Commit

Permalink
1. Move CustomStorageMergeTree.cpp/.h and StorageMergeTreeFactory.cpp…
Browse files Browse the repository at this point in the history
…/.h to Mergetree folderMove CustomStorageMergeTree.cpp/.h and StorageMergeTreeFactory.cpp/.h to Mergetree folder

2. Add CustomMergeTreeDataWriter
  • Loading branch information
baibaichen committed Aug 27, 2024
1 parent 58907cf commit bfeafec
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 238 deletions.
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/printPipeline.h>
#include <Storages/Cache/CacheManager.h>
#include <Storages/Mergetree/StorageMergeTreeFactory.h>
#include <Storages/Output/WriteBufferBuilder.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ bool sameColumns(const substrait::NamedStruct & left, const substrait::NamedStru
return true;
}

bool MergeTreeTable::sameStructWith(const MergeTreeTable & other)
bool MergeTreeTable::sameStructWith(const MergeTreeTable & other) const
{
return database == other.database &&
table == other.table &&
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/MergeTreeTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct MergeTreeTable
std::vector<MergeTreePart> parts;
std::unordered_set<String> getPartNames() const;
RangesInDataParts extractRange(DataPartsVector parts_vector) const;
bool sameStructWith(const MergeTreeTable& other);
bool sameStructWith(const MergeTreeTable& other) const;
};

std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &);
Expand Down
24 changes: 11 additions & 13 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ MergeTreeTable MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel:
return parseMergeTreeTableString(table.value());
}

CustomStorageMergeTreePtr
MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore)
CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore)
{
DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
auto names_and_types_list = header.getNamesAndTypesList();
Expand Down Expand Up @@ -99,8 +98,7 @@ MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, Contex
return global_storage;
}

CustomStorageMergeTreePtr
MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context)
CustomStorageMergeTreePtr MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context)
{
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
Expand All @@ -111,8 +109,7 @@ MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable merge_tree_table,
return parseStorage(merge_tree_table, context);
}

CustomStorageMergeTreePtr
MergeTreeRelParser::copyToVirtualStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context)
CustomStorageMergeTreePtr MergeTreeRelParser::copyToVirtualStorage(MergeTreeTable merge_tree_table, ContextMutablePtr context)
{
auto temp_uuid = UUIDHelpers::generateV4();
String temp_uuid_str = toString(temp_uuid);
Expand Down Expand Up @@ -158,8 +155,8 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
query_info->prewhere_info = parsePreWhereInfo(rel.filter(), input);
}

std::vector<DataPartPtr> selected_parts
= StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());
std::vector<DataPartPtr> selected_parts = StorageMergeTreeFactory::getDataPartsByNames(
storage->getStorageID(), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());

auto read_step = storage->reader.readFromParts(
selected_parts,
Expand All @@ -182,9 +179,9 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
auto ranges = merge_tree_table.extractRange(selected_parts);
std::string ret;
if (context->getSettingsRef().tryGetString("enabled_driver_filter_mergetree_index", ret) && ret == "'true'")
storage->analysisPartsByRanges(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
CustomStorageMergeTree::analysisPartsByRanges(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
else
storage->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
CustomStorageMergeTree::wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);

steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
Expand Down Expand Up @@ -399,10 +396,11 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_
query_info->prewhere_info = parsePreWhereInfo(read_rel.filter(), input);

auto storage_factory = StorageMergeTreeFactory::instance();
std::vector<DataPartPtr> selected_parts
= storage_factory.getDataPartsByNames(StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());
std::vector<DataPartPtr> selected_parts = storage_factory.getDataPartsByNames(
StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());

auto storage_snapshot = std::make_shared<StorageSnapshot>(*custom_storage_mergetree, custom_storage_mergetree->getInMemoryMetadataPtr());
auto storage_snapshot
= std::make_shared<StorageSnapshot>(*custom_storage_mergetree, custom_storage_mergetree->getInMemoryMetadataPtr());
if (selected_parts.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
auto read_step = custom_storage_mergetree->reader.readFromParts(
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/MergeTreeRelParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/Mergetree/StorageMergeTreeFactory.h>
#include <Common/MergeTreeTool.h>


Expand Down
4 changes: 1 addition & 3 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
#include <DataTypes/Serializations/ISerialization.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/CollectJoinOnKeysVisitor.h>
Expand All @@ -60,7 +59,6 @@
#include <Parser/WriteRelParser.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ExpressionListParsers.h>
#include <Processors/Formats/Impl/ArrowBlockOutputFormat.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/LimitStep.h>
Expand All @@ -71,8 +69,8 @@
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/printPipeline.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/Mergetree/CustomStorageMergeTree.h>
#include <Storages/Output/FileWriterWrappers.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/Mergetree/CustomStorageMergeTree.h>
#include <Storages/SourceFromJavaIter.h>
#include <base/types.h>
#include <substrait/plan.pb.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace local_engine
{


void CustomStorageMergeTree::analysisPartsByRanges(DB::ReadFromMergeTree & source, DB::RangesInDataParts ranges_in_data_parts)
void CustomStorageMergeTree::analysisPartsByRanges(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges_in_data_parts)
{
ReadFromMergeTree::AnalysisResult result;
result.column_names_to_read = source.getAllColumnNames();
Expand Down Expand Up @@ -76,14 +76,13 @@ void CustomStorageMergeTree::analysisPartsByRanges(DB::ReadFromMergeTree & sourc
result.selected_rows = sum_rows;

if (source.getQueryInfo().input_order_info)
result.read_type = (source.getQueryInfo().input_order_info->direction > 0)
? MergeTreeReadType::InOrder
: MergeTreeReadType::InReverseOrder;
result.read_type
= (source.getQueryInfo().input_order_info->direction > 0) ? MergeTreeReadType::InOrder : MergeTreeReadType::InReverseOrder;

source.setAnalyzedResult(std::make_shared<ReadFromMergeTree::AnalysisResult>(std::move(result)));
}

void CustomStorageMergeTree::wrapRangesInDataParts(DB::ReadFromMergeTree & source, DB::RangesInDataParts ranges)
void CustomStorageMergeTree::wrapRangesInDataParts(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges)
{
auto result = source.getAnalysisResult();
std::unordered_map<String, std::tuple<size_t, size_t>> range_index;
Expand All @@ -107,7 +106,7 @@ void CustomStorageMergeTree::wrapRangesInDataParts(DB::ReadFromMergeTree & sourc
const size_t end = std::min(range.end, std::get<1>(expected_range));
// [1, 1) or [5, 2) are invalid.
if (begin >= end)
continue ;
continue;
MarkRange final_range(begin, end);
final_ranges.emplace_back(final_range);
}
Expand Down Expand Up @@ -138,7 +137,6 @@ CustomStorageMergeTree::CustomStorageMergeTree(
std::move(storage_settings_),
false, /// require_part_metadata
attach ? LoadingStrictnessLevel::ATTACH : LoadingStrictnessLevel::FORCE_RESTORE)
, writer(*this)
, reader(*this)
, merger_mutator(*this)
{
Expand All @@ -149,16 +147,17 @@ CustomStorageMergeTree::CustomStorageMergeTree(
std::atomic<int> CustomStorageMergeTree::part_num;



void CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set<std::string> parts)
void CustomStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string> parts) const
{
auto disk = getDisks().front();
if (!disk->isRemote()) return;
if (!disk->isRemote())
return;
std::vector<String> meta_paths;
std::ranges::for_each(parts, [&](const String & name) { meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); });
for (const auto & meta_path: meta_paths)
for (const auto & meta_path : meta_paths)
{
if (!disk->exists(meta_path)) continue;
if (!disk->exists(meta_path))
continue;
auto in = disk->readFile(meta_path);
String ignore_data;
readStringUntilEOF(ignore_data, *in);
Expand All @@ -167,10 +166,10 @@ void CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set<std::strin

std::vector<MergeTreeDataPartPtr> CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
{
prefectchMetaDataFile(parts);
prefetchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto& name : parts)
for (const auto & name : parts)
{
const auto num = part_num.fetch_add(1);
MergeTreePartInfo part_info = {"all", num, num, 0};
Expand All @@ -182,10 +181,7 @@ std::vector<MergeTreeDataPartPtr> CustomStorageMergeTree::loadDataPartsWithNames
}

MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart(
const MergeTreePartInfo & part_info,
const String & part_name,
const DiskPtr & part_disk_ptr,
MergeTreeDataPartState to_state)
const MergeTreePartInfo & part_info, const String & part_name, const DiskPtr & part_disk_ptr, MergeTreeDataPartState to_state)
{
LOG_TRACE(log, "Loading {} part {} from disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
#pragma once

#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/MutationCommands.h>
#include <Storages/StorageMergeTree.h>

namespace local_engine
Expand All @@ -37,8 +37,8 @@ class CustomStorageMergeTree final : public MergeTreeData
friend class MergeSparkMergeTreeTask;

public:
static void wrapRangesInDataParts(DB::ReadFromMergeTree & source, DB::RangesInDataParts ranges);
void analysisPartsByRanges(DB::ReadFromMergeTree & source, DB::RangesInDataParts ranges_in_data_parts);
static void wrapRangesInDataParts(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges);
static void analysisPartsByRanges(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges_in_data_parts);
CustomStorageMergeTree(
const StorageID & table_id_,
const String & relative_data_path_,
Expand All @@ -56,7 +56,7 @@ class CustomStorageMergeTree final : public MergeTreeData
std::vector<MergeTreeDataPartPtr> loadDataPartsWithNames(std::unordered_set<std::string> parts);
void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach);

MergeTreeDataWriter writer;

MergeTreeDataSelectExecutor reader;
MergeTreeDataMergerMutator merger_mutator;

Expand All @@ -65,14 +65,11 @@ class CustomStorageMergeTree final : public MergeTreeData
private:
SimpleIncrement increment;

void prefectchMetaDataFile(std::unordered_set<std::string> parts);
void prefetchMetaDataFile(std::unordered_set<std::string> parts) const;
void startBackgroundMovesIfNeeded() override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
LoadPartResult loadDataPart(
const MergeTreePartInfo & part_info,
const String & part_name,
const DiskPtr & part_disk_ptr,
MergeTreeDataPartState to_state);
const MergeTreePartInfo & part_info, const String & part_name, const DiskPtr & part_disk_ptr, MergeTreeDataPartState to_state);

protected:
void dropPartNoWaitNoThrow(const String & part_name) override;
Expand All @@ -85,7 +82,6 @@ class CustomStorageMergeTree final : public MergeTreeData
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & /*part*/) const override { return {}; }
void attachRestoredParts(MutableDataPartsVector && /*parts*/) override { throw std::runtime_error("not implement"); }

};

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/
#include "MergeSparkMergeTreeTask.h"
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/Mergetree/CustomStorageMergeTree.h>

#include <Interpreters/TransactionLog.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Interpreters/TransactionLog.h>
#include <Common/ProfileEventsScope.h>
#include <Common/ProfileEvents.h>
#include <Common/ProfileEventsScope.h>
#include <Common/ThreadFuzzer.h>
using namespace DB;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTask.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>
#include <Interpreters/MergeTreeTransactionHolder.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>

using namespace DB;

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/Mergetree/StorageMergeTreeFactory.h>
#include <Common/MergeTreeTool.h>

namespace local_engine
Expand Down
Loading

0 comments on commit bfeafec

Please sign in to comment.