diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java index 864cc4eb70ace..edf7e9c2b2055 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java @@ -108,8 +108,6 @@ public native long nativeMakeForRSS( Object pusher, boolean forceMemorySort); - public native void split(long splitterId, long block); - public native CHSplitResult stop(long splitterId) throws IOException; public native void close(long splitterId); diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHSplitResult.java b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHSplitResult.java index ea6f756cd5f23..b739aed3c5c2a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHSplitResult.java +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHSplitResult.java @@ -20,6 +20,9 @@ public class CHSplitResult extends SplitResult { private final long splitTime; private final long diskWriteTime; private final long serializationTime; + private final long totalRows; + private final long totalBatches; + private final long wallTime; public CHSplitResult(long totalComputePidTime, long totalWriteTime, @@ -31,7 +34,10 @@ public CHSplitResult(long totalComputePidTime, long[] rawPartitionLengths, long splitTime, long diskWriteTime, - long serializationTime) { + long serializationTime, + long totalRows, + long totalBatches, + long wallTime) { super(totalComputePidTime, totalWriteTime, totalEvictTime, @@ -43,6 +49,9 @@ public CHSplitResult(long totalComputePidTime, this.splitTime = splitTime; this.diskWriteTime = diskWriteTime; this.serializationTime = serializationTime; + this.totalRows = totalRows; + this.totalBatches = totalBatches; + this.wallTime = wallTime; } public long getSplitTime() { @@ -56,4 +65,16 @@ public long getDiskWriteTime() { public long getSerializationTime() { return serializationTime; } + + public long getTotalRows() { + return totalRows; + } + + public long getTotalBatches() { + return totalBatches; + } + + public long getWallTime() { + return wallTime; + } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index db9bba5f170a3..b991128a9ebed 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -24,7 +24,6 @@ import org.apache.gluten.vectorized._ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.scheduler.MapStatus -import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.{SparkDirectoryUtil, Utils} import java.io.IOException @@ -80,17 +79,7 @@ class CHColumnarShuffleWriter[K, V]( private def internalCHWrite(records: Iterator[Product2[K, V]]): Unit = { val splitterJniWrapper: CHShuffleSplitterJniWrapper = jniWrapper - if (!records.hasNext) { - partitionLengths = new Array[Long](dep.partitioner.numPartitions) - shuffleBlockResolver.writeMetadataFileAndCommit( - dep.shuffleId, - mapId, - partitionLengths, - Array[Long](), - null) - mapStatus = 
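// The JVM-side empty-input early return above is removed together with the
// per-batch split() JNI call: the native pipeline now consumes every batch
// itself, and stop() reports totalRows/totalBatches back, so the writer can
// branch after the fact. Rough shape of the new flow (names from this patch):
//   splitResult = splitterJniWrapper.stop(nativeSplitter)
//   if (splitResult.getTotalRows > 0) { /* commit data file + metrics */ }
//   else { /* commit all-zero partition lengths */ }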
MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) - return - } + val dataTmp = Utils.tempFileWith(shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)) if (nativeSplitter == 0) { nativeSplitter = splitterJniWrapper.make( @@ -108,50 +97,54 @@ class CHColumnarShuffleWriter[K, V]( forceMemorySortShuffle ) } - while (records.hasNext) { - val cb = records.next()._2.asInstanceOf[ColumnarBatch] - if (cb.numRows == 0 || cb.numCols == 0) { - logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols") - } else { - firstRecordBatch = false - val col = cb.column(0).asInstanceOf[CHColumnVector] - val block = col.getBlockAddress - splitterJniWrapper - .split(nativeSplitter, block) - dep.metrics("numInputRows").add(cb.numRows) - dep.metrics("inputBatches").add(1) - writeMetrics.incRecordsWritten(cb.numRows) + + splitResult = splitterJniWrapper.stop(nativeSplitter) + if (splitResult.getTotalRows > 0) + { + dep.metrics("numInputRows").add(splitResult.getTotalRows) + dep.metrics("inputBatches").add(splitResult.getTotalBatches) + writeMetrics.incRecordsWritten(splitResult.getTotalRows) + dep.metrics("splitTime").add(splitResult.getSplitTime) + dep.metrics("IOTime").add(splitResult.getDiskWriteTime) + dep.metrics("serializeTime").add(splitResult.getSerializationTime) + dep.metrics("spillTime").add(splitResult.getTotalSpillTime) + dep.metrics("compressTime").add(splitResult.getTotalCompressTime) + dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime) + dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled) + dep.metrics("dataSize").add(splitResult.getTotalBytesWritten) + dep.metrics("shuffleWallTime").add(splitResult.getWallTime) + writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten) + writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime) + + partitionLengths = splitResult.getPartitionLengths + rawPartitionLengths = splitResult.getRawPartitionLengths + + try { + shuffleBlockResolver.writeMetadataFileAndCommit( + dep.shuffleId, + mapId, + partitionLengths, + Array[Long](), + dataTmp) + } finally { + if (dataTmp.exists() && !dataTmp.delete()) { + logError(s"Error while deleting temp file ${dataTmp.getAbsolutePath}") + } } + mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) } - splitResult = splitterJniWrapper.stop(nativeSplitter) - - dep.metrics("splitTime").add(splitResult.getSplitTime) - dep.metrics("IOTime").add(splitResult.getDiskWriteTime) - dep.metrics("serializeTime").add(splitResult.getSerializationTime) - dep.metrics("spillTime").add(splitResult.getTotalSpillTime) - dep.metrics("compressTime").add(splitResult.getTotalCompressTime) - dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime) - dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled) - dep.metrics("dataSize").add(splitResult.getTotalBytesWritten) - writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten) - writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime) - - partitionLengths = splitResult.getPartitionLengths - rawPartitionLengths = splitResult.getRawPartitionLengths - try { + else + { + partitionLengths = new Array[Long](dep.partitioner.numPartitions) shuffleBlockResolver.writeMetadataFileAndCommit( dep.shuffleId, mapId, partitionLengths, Array[Long](), - dataTmp) - } finally { - if (dataTmp.exists() && !dataTmp.delete()) { - logError(s"Error while deleting temp file ${dataTmp.getAbsolutePath}") - } + null) + mapStatus = 
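// Empty-input case: no data temp file exists (the tmp argument is null), so
// only the index/metadata is committed with an all-zero partitionLengths
// array, and reducers observe numPartitions empty blocks instead of a
// missing map output.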
MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) } - - mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) + closeCHSplitter() } override def stop(success: Boolean): Option[MapStatus] = { @@ -166,15 +159,16 @@ class CHColumnarShuffleWriter[K, V]( None } } finally { - if (nativeSplitter != 0) { - closeCHSplitter() - nativeSplitter = 0 - } + closeCHSplitter() } } private def closeCHSplitter(): Unit = { - jniWrapper.close(nativeSplitter) + if (nativeSplitter != 0) + { + jniWrapper.close(nativeSplitter) + nativeSplitter = 0 + } } // VisibleForTesting diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index 782df7f5413d4..abb7295adc0d0 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -38,7 +38,7 @@ struct MemoryConfig MemoryConfig config; config.extra_memory_hard_limit = context->getConfigRef().getUInt64(EXTRA_MEMORY_HARD_LIMIT, 0); config.off_heap_per_task = context->getConfigRef().getUInt64(CH_TASK_MEMORY, 0); - config.spill_mem_ratio = context->getConfigRef().getUInt64(SPILL_MEM_RATIO, 0.9); + config.spill_mem_ratio = context->getConfigRef().getDouble(SPILL_MEM_RATIO, 0.9); return config; } }; diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index 2d5780a6e35b2..9d70ec658bf0f 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -125,10 +125,7 @@ void QueryContextManager::finalizeQuery(int64_t id) { throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found."); } - std::shared_ptr context; - { - context = query_map.get(id); - } + std::shared_ptr context = query_map.get(id); auto query_context = context->thread_status->getQueryContext(); if (!query_context) { @@ -138,7 +135,7 @@ void QueryContextManager::finalizeQuery(int64_t id) context->thread_status->finalizePerformanceCounters(); LOG_INFO(logger, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id)); - if (currentThreadGroupMemoryUsage() > 1_MiB) + if (currentThreadGroupMemoryUsage() > 2_MiB) { LOG_WARNING(logger, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage()); } diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index d0924a745716f..87c7943ac414e 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -87,6 +87,8 @@ #include #include #include +#include +#include namespace DB { @@ -1321,16 +1323,14 @@ std::unique_ptr SerializedPlanParser::createExecutor(DB::QueryPla if (root_rel.root().input().has_write()) addSinkTransfrom(context, root_rel.root().input().write(), builder); /// - QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); - auto * logger = &Poco::Logger::get("SerializedPlanParser"); LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0); LOG_DEBUG( logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, PlanUtil::explainPlan(*query_plan)); - LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline)); + // LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline)); auto config = ExecutorConfig::loadFromContext(context); - return std::make_unique(std::move(query_plan), std::move(pipeline), config.dump_pipeline); + 
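// createExecutor now hands over the still-open QueryPipelineBuilder instead of
// a completed QueryPipeline, so sinks can be attached later (push mode) or a
// pulling executor can be built lazily on first hasNext() (pull mode). The
// template argument lost on the next line should be LocalExecutor, i.e.
// presumably:
//   return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);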
return std::make_unique(std::move(query_plan), std::move(builder), config.dump_pipeline); } SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : context(context_) @@ -1597,8 +1597,19 @@ std::unique_ptr LocalExecutor::writeBlockToSparkRow(const Block & return ch_column_to_spark_row->convertCHColumnToSparkRow(block); } +void LocalExecutor::initPullingPipelineExecutor() +{ + if (!executor) + { + query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_pipeline_builder)); + query_pipeline.setNumThreads(1); + executor = std::make_unique(query_pipeline); + } +} + bool LocalExecutor::hasNext() { + initPullingPipelineExecutor(); size_t columns = currentBlock().columns(); if (columns == 0 || isConsumed()) { @@ -1651,21 +1662,29 @@ void LocalExecutor::cancel() executor->cancel(); } +void LocalExecutor::execute() +{ + chassert(query_pipeline_builder); + push_executor = query_pipeline_builder->execute(); + push_executor->execute(1, false); +} + Block & LocalExecutor::getHeader() { return header; } -LocalExecutor::LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_) - : query_pipeline(std::move(pipeline)) - , executor(std::make_unique(query_pipeline)) +LocalExecutor::LocalExecutor(QueryPlanPtr query_plan, QueryPipelineBuilderPtr pipeline_builder, bool dump_pipeline_) + : query_pipeline_builder(std::move(pipeline_builder)) , header(query_plan->getCurrentDataStream().header.cloneEmpty()) , dump_pipeline(dump_pipeline_) , ch_column_to_spark_row(std::make_unique()) , current_query_plan(std::move(query_plan)) { + chassert(!current_executor); + current_executor = this; } - +thread_local LocalExecutor * LocalExecutor::current_executor = nullptr; std::string LocalExecutor::dumpPipeline() const { const auto & processors = query_pipeline.getProcessors(); @@ -1674,7 +1693,7 @@ std::string LocalExecutor::dumpPipeline() const WriteBufferFromOwnString buffer; auto data_stats = processor->getProcessorDataStats(); buffer << "("; - buffer << "\nexcution time: " << processor->getElapsedNs() / 1000U << " us."; + buffer << "\nexecution time: " << processor->getElapsedNs() / 1000U << " us."; buffer << "\ninput wait time: " << processor->getInputWaitElapsedNs() / 1000U << " us."; buffer << "\noutput wait time: " << processor->getOutputWaitElapsedNs() / 1000U << " us."; buffer << "\ninput rows: " << data_stats.input_rows; diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h index e44a7f657a205..2f3c365ebd3ea 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include #include @@ -31,6 +31,7 @@ #include #include #include +#include namespace local_engine { @@ -210,7 +211,10 @@ struct SparkBuffer class LocalExecutor : public BlockIterator { public: - LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_ = false); + static thread_local LocalExecutor * current_executor; + static LocalExecutor * getCurrentExecutor() { return current_executor; } + static void resetCurrentExecutor() { current_executor = nullptr; } + LocalExecutor(QueryPlanPtr query_plan, QueryPipelineBuilderPtr pipeline, bool dump_pipeline_ = false); ~LocalExecutor(); SparkRowInfoPtr next(); @@ -219,7 +223,11 @@ class LocalExecutor : public BlockIterator /// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal void cancel(); - + 
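// setSinks exposes the builder so the shuffle layer can attach exchange sinks
// before the pipeline is completed; the setter parameter is presumably a
// std::function<void(DB::QueryPipelineBuilder &)>. A sketch of the expected
// call site, using names from this patch:
//   executor->setSinks([&](DB::QueryPipelineBuilder & b) { exchange_manager->setSinksToPipeline(b); });
//   executor->execute(); // push mode: a PipelineExecutor drives blocks into the sinks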
void setSinks(std::function setter) + { + setter(*query_pipeline_builder); + } + void execute(); Block & getHeader(); RelMetricPtr getMetric() const { return metric; } void setMetric(const RelMetricPtr & metric_) { metric = metric_; } @@ -227,12 +235,14 @@ class LocalExecutor : public BlockIterator private: std::unique_ptr writeBlockToSparkRow(const DB::Block & block) const; - + void initPullingPipelineExecutor(); /// Dump processor runtime information to log std::string dumpPipeline() const; + QueryPipelineBuilderPtr query_pipeline_builder; QueryPipeline query_pipeline; - std::unique_ptr executor; + std::unique_ptr executor = nullptr; + PipelineExecutorPtr push_executor = nullptr; Block header; bool dump_pipeline; std::unique_ptr ch_column_to_spark_row; diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp deleted file mode 100644 index 1ab95abcca48d..0000000000000 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "CachedShuffleWriter.h" -#include -#include -#include -#include -#include -#include - - -namespace DB -{ -namespace ErrorCodes -{ -extern const int BAD_ARGUMENTS; -} -} - - -namespace local_engine -{ -using namespace DB; - -CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitOptions & options_, jobject rss_pusher) - : options(options_) -{ - if (short_name == "rr") - { - partitioner = std::make_unique(options.partition_num); - } - else if (short_name == "hash") - { - Poco::StringTokenizer expr_list(options_.hash_exprs, ","); - std::vector hash_fields; - for (const auto & expr : expr_list) - { - hash_fields.push_back(std::stoi(expr)); - } - partitioner = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm); - } - else if (short_name == "single") - { - options.partition_num = 1; - partitioner = std::make_unique(options.partition_num); - } - else if (short_name == "range") - partitioner = std::make_unique(options.hash_exprs, options.partition_num); - else - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "unsupported splitter {}", short_name); - - Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ","); - for (const auto & iter : output_column_tokenizer) - output_columns_indicies.push_back(std::stoi(iter)); - - if (rss_pusher) - { - GET_JNIENV(env) - jclass celeborn_partition_pusher_class = - CreateGlobalClassReference(env, "Lorg/apache/spark/shuffle/CelebornPartitionPusher;"); - jmethodID celeborn_push_partition_data_method = - GetMethodID(env, celeborn_partition_pusher_class, "pushPartitionData", "(I[BI)I"); - CLEAN_JNIENV - celeborn_client = std::make_unique(rss_pusher, celeborn_push_partition_data_method); - } - - - split_result.partition_lengths.resize(options.partition_num, 0); - split_result.raw_partition_lengths.resize(options.partition_num, 0); -} - -void CachedShuffleWriter::split(DB::Block & block) -{ - lazyInitPartitionWriter(block); - auto block_info = block.info; - initOutputIfNeeded(block); - - Stopwatch split_time_watch; - if (!sort_shuffle) - block = convertAggregateStateInBlock(block); - split_result.total_split_time += split_time_watch.elapsedNanoseconds(); - - Stopwatch compute_pid_time_watch; - PartitionInfo partition_info = partitioner->build(block); - split_result.total_compute_pid_time += compute_pid_time_watch.elapsedNanoseconds(); - - DB::Block out_block; - for (size_t col_i = 0; col_i < output_header.columns(); ++col_i) - { - out_block.insert(block.getByPosition(output_columns_indicies[col_i])); - } - out_block.info = block_info; - partition_writer->write(partition_info, out_block); -} - -void CachedShuffleWriter::initOutputIfNeeded(Block & block) -{ - if (!output_header) - { - if (output_columns_indicies.empty()) - { - output_header = block.cloneEmpty(); - for (size_t i = 0; i < block.columns(); ++i) - output_columns_indicies.push_back(i); - } - else - { - ColumnsWithTypeAndName cols; - for (const auto & index : output_columns_indicies) - cols.push_back(block.getByPosition(index)); - - output_header = DB::Block(std::move(cols)); - } - } -} - -void CachedShuffleWriter::lazyInitPartitionWriter(Block & input_sample) -{ - if (partition_writer) - return; - - auto avg_row_size = input_sample.allocatedBytes() / input_sample.rows(); - auto overhead_memory = std::max(avg_row_size, input_sample.columns() * 16) * options.split_size * options.partition_num; - auto use_sort_shuffle = overhead_memory > options.spill_threshold * 0.5 || options.partition_num >= 300; - sort_shuffle = 
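// (removed heuristic) The old writer sampled the first real block:
//   overhead = max(avg_row_size, columns * 16) * split_size * partition_num
// Its replacement, the SparkExechangeManager constructor, must choose a writer
// before any block exists, so it keeps only the header-derived term
// (columns * 16 * split_size * partition_num) with the same
// partition_num >= 300 and force_memory_sort triggers.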
use_sort_shuffle || options.force_memory_sort; - if (celeborn_client) - { - if (sort_shuffle) - partition_writer = std::make_unique(this, std::move(celeborn_client)); - else - partition_writer = std::make_unique(this, std::move(celeborn_client)); - } - else - { - if (sort_shuffle) - partition_writer = std::make_unique(this); - else - partition_writer = std::make_unique(this); - } - partitioner->setUseSortShuffle(sort_shuffle); - LOG_INFO(logger, "Use Partition Writer {}", partition_writer->getName()); -} - -SplitResult CachedShuffleWriter::stop() -{ - if (partition_writer) - partition_writer->stop(); - LOG_INFO(logger, "CachedShuffleWriter stop, split result: {}", split_result.toString()); - return split_result; -} - -} \ No newline at end of file diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h deleted file mode 100644 index 6de22f35d9bff..0000000000000 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#pragma once -#include -#include -#include -#include -#include -#include - -namespace local_engine -{ - class CelebornClient; - class PartitionWriter; - class LocalPartitionWriter; - class CelebornPartitionWriter; - -class CachedShuffleWriter : public ShuffleWriterBase -{ -public: - friend class PartitionWriter; - friend class LocalPartitionWriter; - friend class CelebornPartitionWriter; - friend class SortBasedPartitionWriter; - friend class MemorySortLocalPartitionWriter; - friend class MemorySortCelebornPartitionWriter; - friend class ExternalSortLocalPartitionWriter; - friend class ExternalSortCelebornPartitionWriter; - friend class Spillable; - - explicit CachedShuffleWriter(const String & short_name, const SplitOptions & options, jobject rss_pusher = nullptr); - ~CachedShuffleWriter() override = default; - - void split(DB::Block & block) override; - SplitResult stop() override; - -private: - void initOutputIfNeeded(DB::Block & block); - void lazyInitPartitionWriter(DB::Block & input_sample); - - bool stopped = false; - DB::Block output_header; - SplitOptions options; - SplitResult split_result; - std::unique_ptr partitioner; - std::vector output_columns_indicies; - std::unique_ptr partition_writer; - std::unique_ptr celeborn_client; - bool sort_shuffle = false; - Poco::Logger* logger = &Poco::Logger::get("CachedShuffleWriter"); -}; -} - - - diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index a2ef0888aeff5..a0d00c0916245 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -23,8 +23,6 @@ #include #include #include -#include -#include #include #include #include @@ -45,6 +43,7 @@ extern const int LOGICAL_ERROR; } using namespace DB; + namespace local_engine { static const String PARTITION_COLUMN_NAME = "partition"; @@ -68,12 +67,13 @@ int64_t searchLastPartitionIdIndex(ColumnPtr column, size_t start, size_t partit bool PartitionWriter::worthToSpill(size_t cache_size) const { - return (options->spill_threshold > 0 && cache_size >= options->spill_threshold) || + return (options.spill_threshold > 0 && cache_size >= options.spill_threshold) || currentThreadGroupMemoryUsageRatio() > settings.spill_mem_ratio; } void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & block) { + chassert(init); /// PartitionWriter::write is alwasy the top frame who occupies evicting_or_writing Stopwatch watch; size_t current_cached_bytes = bytes(); @@ -85,7 +85,7 @@ void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & bl /// Make sure buffer size is no greater than split_size auto & block_buffer = partition_block_buffer[partition_id]; auto & buffer = partition_buffer[partition_id]; - if (block_buffer->size() && block_buffer->size() + length >= shuffle_writer->options.split_size) + if (!block_buffer->empty() && block_buffer->size() + length >= options.split_size) buffer->addBlock(block_buffer->releaseColumns()); current_cached_bytes -= block_buffer->bytes(); @@ -99,8 +99,8 @@ void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & bl /// Calculate average rows of each partition block buffer size_t avg_size = 0; size_t cnt = 0; - for (size_t i = (last_partition_id + 1) % options->partition_num; i != (partition_id + 1) % options->partition_num; - i = (i + 1) % options->partition_num) + for (size_t i = (last_partition_id + 1) % options.partition_num; i != (partition_id + 1) % options.partition_num; + i = (i + 1) % 
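// This scan treats the partition buffers as a ring, walking from the slot
// after the partition touched last in the previous call up to and including
// the current one; the modulo stepping expresses the wrap-around:
//   for (i = (last + 1) % n; i != (cur + 1) % n; i = (i + 1) % n) { ... }
// The first pass computes the average buffered rows over that range, the
// second pass evicts every partition in it.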
options.partition_num) { avg_size += partition_block_buffer[i]->size(); ++cnt; @@ -108,12 +108,13 @@ void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & bl avg_size /= cnt; - for (size_t i = (last_partition_id + 1) % options->partition_num; i != (partition_id + 1) % options->partition_num; - i = (i + 1) % options->partition_num) + for (size_t i = (last_partition_id + 1) % options.partition_num; i != (partition_id + 1) % options.partition_num; + i = (i + 1) % options.partition_num) { bool flush_block_buffer = partition_block_buffer[i]->size() >= avg_size; - current_cached_bytes -= flush_block_buffer ? partition_block_buffer[i]->bytes() + partition_buffer[i]->bytes() - : partition_buffer[i]->bytes(); + current_cached_bytes -= flush_block_buffer + ? partition_block_buffer[i]->bytes() + partition_buffer[i]->bytes() + : partition_buffer[i]->bytes(); evictSinglePartition(i); } // std::cout << "current cached bytes after evict partitions is " << current_cached_bytes << " partition from " @@ -127,7 +128,7 @@ void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & bl if (!supportsEvictSinglePartition() && worthToSpill(current_cached_bytes)) evictPartitions(); - shuffle_writer->split_result.total_split_time += watch.elapsedNanoseconds(); + split_result->total_split_time += watch.elapsedNanoseconds(); } size_t LocalPartitionWriter::evictPartitions() @@ -138,10 +139,10 @@ size_t LocalPartitionWriter::evictPartitions() auto spill_to_file = [this, &res, &spilled_bytes]() { auto file = getNextSpillFile(); - WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); - CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); - NativeWriter writer(compressed_output, shuffle_writer->output_header); + WriteBufferFromFile output(file, options.io_buffer_size); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {}); + CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size); + NativeWriter writer(compressed_output, output_header); SpillInfo info; info.spilled_file = file; @@ -167,113 +168,39 @@ size_t LocalPartitionWriter::evictPartitions() compressed_output.sync(); offsets.second = output.count() - offsets.first; - shuffle_writer->split_result.raw_partition_lengths[partition_id] += written_bytes; + split_result->raw_partition_lengths[partition_id] += written_bytes; info.partition_spill_infos[partition_id] = offsets; } spill_infos.emplace_back(info); - shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_write_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); + split_result->total_compress_time += compressed_output.getCompressTime(); + split_result->total_write_time += compressed_output.getWriteTime(); + split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; Stopwatch spill_time_watch; spill_to_file(); - shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_spilled += spilled_bytes; + split_result->total_spill_time += spill_time_watch.elapsedNanoseconds(); + split_result->total_bytes_spilled += spilled_bytes; LOG_INFO(logger, "spill shuffle 
data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds()); return res; } String Spillable::getNextSpillFile() { - auto file_name = std::to_string(static_cast(split_options.shuffle_id)) + "_" + std::to_string(static_cast(split_options.map_id)) + "_" + std::to_string(spill_infos.size()); + auto file_name = std::to_string(static_cast(spill_options.shuffle_id)) + "_" + std::to_string( + static_cast(spill_options.map_id)) + "_" + std::to_string(reinterpret_cast(this)) + "_" + std::to_string( + spill_infos.size()); std::hash hasher; auto hash = hasher(file_name); - auto dir_id = hash % split_options.local_dirs_list.size(); - auto sub_dir_id = (hash / split_options.local_dirs_list.size()) % split_options.num_sub_dirs; + auto dir_id = hash % spill_options.local_dirs_list.size(); + auto sub_dir_id = (hash / spill_options.local_dirs_list.size()) % spill_options.num_sub_dirs; - std::string dir = std::filesystem::path(split_options.local_dirs_list[dir_id]) / std::format("{:02x}", sub_dir_id); + std::string dir = std::filesystem::path(spill_options.local_dirs_list[dir_id]) / std::format("{:02x}", sub_dir_id); if (!std::filesystem::exists(dir)) std::filesystem::create_directories(dir); return std::filesystem::path(dir) / file_name; } -std::vector Spillable::mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data) -{ - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); - - CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size); - NativeWriter writer(compressed_output, shuffle_writer->output_header); - - std::vector partition_length(shuffle_writer->options.partition_num, 0); - - std::vector> spill_inputs; - spill_inputs.reserve(spill_infos.size()); - for (const auto & spill : spill_infos) - { - // only use readBig - spill_inputs.emplace_back(std::make_shared(spill.spilled_file, 0)); - } - - Stopwatch write_time_watch; - Stopwatch io_time_watch; - Stopwatch serialization_time_watch; - size_t merge_io_time = 0; - String buffer; - for (size_t partition_id = 0; partition_id < split_options.partition_num; ++partition_id) - { - auto size_before = data_file.count(); - - io_time_watch.restart(); - for (size_t i = 0; i < spill_infos.size(); ++i) - { - if (!spill_infos[i].partition_spill_infos.contains(partition_id)) - { - continue; - } - size_t size = spill_infos[i].partition_spill_infos[partition_id].second; - size_t offset = spill_infos[i].partition_spill_infos[partition_id].first; - if (!size) - { - continue; - } - buffer.reserve(size); - auto count = spill_inputs[i]->readBigAt(buffer.data(), size, offset, nullptr); - - chassert(count == size); - data_file.write(buffer.data(), count); - } - merge_io_time += io_time_watch.elapsedNanoseconds(); - - serialization_time_watch.restart(); - if (!extra_data.partition_block_buffer.empty() && !extra_data.partition_block_buffer[partition_id]->empty()) - { - Block block = extra_data.partition_block_buffer[partition_id]->releaseColumns(); - extra_data.partition_buffer[partition_id]->addBlock(std::move(block)); - } - if (!extra_data.partition_buffer.empty()) - { - size_t raw_size = extra_data.partition_buffer[partition_id]->spill(writer); - shuffle_writer->split_result.raw_partition_lengths[partition_id] += raw_size; - } - compressed_output.sync(); - partition_length[partition_id] = data_file.count() - size_before; - shuffle_writer->split_result.total_serialize_time += 
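// Serialization time here is wall time around the whole merge loop; the
// compress and write components are subtracted again at the end of this
// function, so the final figure is effectively serialize = measured - io - compress.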
serialization_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_written += partition_length[partition_id]; - } - - shuffle_writer->split_result.total_write_time += write_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_io_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_serialize_time = shuffle_writer->split_result.total_serialize_time - - shuffle_writer->split_result.total_io_time - shuffle_writer->split_result.total_compress_time; - shuffle_writer->split_result.total_io_time += merge_io_time; - - for (const auto & spill : spill_infos) - std::filesystem::remove(spill.spilled_file); - return partition_length; -} - void SortBasedPartitionWriter::write(const PartitionInfo & info, DB::Block & block) { Stopwatch write_time_watch; @@ -295,36 +222,30 @@ void SortBasedPartitionWriter::write(const PartitionInfo & info, DB::Block & blo accumulated_blocks.emplace_back(std::move(chunk)); current_accumulated_bytes += accumulated_blocks.back().allocatedBytes(); current_accumulated_rows += accumulated_blocks.back().getNumRows(); - shuffle_writer->split_result.total_write_time += write_time_watch.elapsedNanoseconds(); + split_result->total_write_time += write_time_watch.elapsedNanoseconds(); if (worthToSpill(current_accumulated_bytes)) evictPartitions(); } -LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter * shuffle_writer_) : PartitionWriter(shuffle_writer_, getLogger("LocalPartitionWriter")), Spillable(shuffle_writer_->options) +LocalPartitionWriter::LocalPartitionWriter(const SplitOptions & options) + : PartitionWriter(options, getLogger("LocalPartitionWriter")) + , Spillable(options) { } -void LocalPartitionWriter::stop() -{ - WriteBufferFromFile output(options->data_file, options->io_buffer_size); - auto offsets = mergeSpills(shuffle_writer, output, {partition_block_buffer, partition_buffer}); - shuffle_writer->split_result.partition_lengths = offsets; -} - -PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_, LoggerPtr logger_) - : shuffle_writer(shuffle_writer_) - , options(&shuffle_writer->options) - , partition_block_buffer(options->partition_num) - , partition_buffer(options->partition_num) - , last_partition_id(options->partition_num - 1) +PartitionWriter::PartitionWriter(const SplitOptions & options, LoggerPtr logger_) + : options(options) + , partition_block_buffer(options.partition_num) + , partition_buffer(options.partition_num) + , last_partition_id(options.partition_num - 1) , logger(logger_) { - for (size_t partition_id = 0; partition_id < options->partition_num; ++partition_id) + for (size_t partition_id = 0; partition_id < options.partition_num; ++partition_id) { - partition_block_buffer[partition_id] = std::make_shared(options->split_size); + partition_block_buffer[partition_id] = std::make_shared(options.split_size); partition_buffer[partition_id] = std::make_shared(); } - settings.loadFromContext(SerializedPlanParser::global_context); + settings = MemoryConfig::loadFromContext(SerializedPlanParser::global_context); } size_t PartitionWriter::bytes() const @@ -351,9 +272,9 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() if (accumulated_blocks.empty()) return; auto file = getNextSpillFile(); - WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); - auto codec = 
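// The codec is resolved by name: ClickHouse registers codec names in upper
// case ("LZ4", "ZSTD"), while the compress_method option usually arrives from
// the Spark side in lower case, hence the boost::to_upper_copy.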
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); - CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); + WriteBufferFromFile output(file, options.io_buffer_size); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {}); + CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size); NativeWriter writer(compressed_output, output_header); SpillInfo info; @@ -362,7 +283,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() Stopwatch serialization_time_watch; MergeSorter sorter(sort_header, std::move(accumulated_blocks), sort_description, adaptiveBlockSize(), 0); size_t cur_partition_id = 0; - info.partition_spill_infos[cur_partition_id] = {0,0}; + info.partition_spill_infos[cur_partition_id] = {0, 0}; while (auto data = sorter.read()) { Block serialized_block = sort_header.cloneWithColumns(data.detachColumns()); @@ -374,7 +295,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() auto last_idx = searchLastPartitionIdIndex(partitions, row_offset, cur_partition_id); if (last_idx < 0) { - auto& last = info.partition_spill_infos[cur_partition_id]; + auto & last = info.partition_spill_infos[cur_partition_id]; compressed_output.sync(); last.second = output.count() - last.first; cur_partition_id++; @@ -385,7 +306,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() if (row_offset == 0 && last_idx == serialized_block.rows() - 1) { auto count = writer.write(serialized_block); - shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count; + split_result->raw_partition_lengths[cur_partition_id] += count; break; } else @@ -393,11 +314,11 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() auto cut_block = serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1); auto count = writer.write(cut_block); - shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count; + split_result->raw_partition_lengths[cur_partition_id] += count; row_offset = last_idx + 1; if (last_idx != serialized_block.rows() - 1) { - auto& last = info.partition_spill_infos[cur_partition_id]; + auto & last = info.partition_spill_infos[cur_partition_id]; compressed_output.sync(); last.second = output.count() - last.first; cur_partition_id++; @@ -407,39 +328,33 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() } } compressed_output.sync(); - auto& last = info.partition_spill_infos[cur_partition_id]; + auto & last = info.partition_spill_infos[cur_partition_id]; last.second = output.count() - last.first; spilled_bytes = current_accumulated_bytes; res = current_accumulated_bytes; current_accumulated_bytes = 0; current_accumulated_rows = 0; - std::erase_if(info.partition_spill_infos, [](const auto & item) - { - auto const& [key, value] = item; - return value.second == 0; - }); + std::erase_if( + info.partition_spill_infos, + [](const auto & item) + { + auto const & [key, value] = item; + return value.second == 0; + }); spill_infos.emplace_back(info); - shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_io_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); + split_result->total_compress_time += compressed_output.getCompressTime(); + split_result->total_io_time += compressed_output.getWriteTime(); + 
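// Counters now accumulate into the SplitResult* injected through
// PartitionWriter::initialize() rather than reaching back into the deleted
// CachedShuffleWriter; each pipeline sink therefore owns an independent
// counter set, and SparkExechangeManager::mergeSplitResult() sums them after
// the pipeline finishes.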
split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; Stopwatch spill_time_watch; spill_to_file(); - shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_spilled += spilled_bytes; + split_result->total_spill_time += spill_time_watch.elapsedNanoseconds(); + split_result->total_bytes_spilled += spilled_bytes; LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds()); return res; } -void MemorySortLocalPartitionWriter::stop() -{ - evictPartitions(); - WriteBufferFromFile output(options->data_file, options->io_buffer_size); - auto offsets = mergeSpills(shuffle_writer, output); - shuffle_writer->split_result.partition_lengths = offsets; -} - size_t MemorySortCelebornPartitionWriter::evictPartitions() { size_t res = 0; @@ -453,22 +368,22 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions() return; WriteBufferFromOwnString output; - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); - CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); - NativeWriter writer(compressed_output, shuffle_writer->output_header); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {}); + CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size); + NativeWriter writer(compressed_output, output_header); MergeSorter sorter(sort_header, std::move(accumulated_blocks), sort_description, adaptiveBlockSize(), 0); size_t cur_partition_id = 0; auto push_to_celeborn = [&]() { compressed_output.sync(); - auto& data = output.str(); + auto & data = output.str(); if (!data.empty()) { Stopwatch push_time_watch; celeborn_client->pushPartitionData(cur_partition_id, data.data(), data.size()); - shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size(); + split_result->total_io_time += push_time_watch.elapsedNanoseconds(); + split_result->partition_lengths[cur_partition_id] += data.size(); } output.restart(); }; @@ -492,12 +407,12 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions() if (row_offset == 0 && last_idx == serialized_block.rows() - 1) { auto count = writer.write(serialized_block); - shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count; + split_result->raw_partition_lengths[cur_partition_id] += count; break; } auto cut_block = serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1); auto count = writer.write(cut_block); - shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count; + split_result->raw_partition_lengths[cur_partition_id] += count; row_offset = last_idx + 1; if (last_idx != serialized_block.rows() - 1) { @@ -512,33 +427,29 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions() current_accumulated_bytes = 0; current_accumulated_rows = 0; - shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_io_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); + split_result->total_compress_time += compressed_output.getCompressTime(); + split_result->total_io_time += compressed_output.getWriteTime(); + 
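// In the Celeborn path there is no local data file to merge: every flush
// pushes the compressed bytes of one partition through pushPartitionData(),
// the push time is booked as IO time, and partition_lengths grows by the
// pushed size.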
split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; Stopwatch spill_time_watch; spill_to_celeborn(); - shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_spilled += spilled_bytes; + split_result->total_spill_time += spill_time_watch.elapsedNanoseconds(); + split_result->total_bytes_spilled += spilled_bytes; LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds()); return res; } -void MemorySortCelebornPartitionWriter::stop() -{ - evictPartitions(); -} - -CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client_) - : PartitionWriter(shuffleWriter, getLogger("CelebornPartitionWriter")), celeborn_client(std::move(celeborn_client_)) +CelebornPartitionWriter::CelebornPartitionWriter(const SplitOptions & options, std::unique_ptr celeborn_client_) + : PartitionWriter(options, getLogger("CelebornPartitionWriter")) + , celeborn_client(std::move(celeborn_client_)) { } size_t CelebornPartitionWriter::evictPartitions() { size_t res = 0; - for (size_t partition_id = 0; partition_id < options->partition_num; ++partition_id) + for (size_t partition_id = 0; partition_id < options.partition_num; ++partition_id) res += evictSinglePartition(partition_id); return res; } @@ -564,9 +475,9 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id) return; WriteBufferFromOwnString output; - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); - CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); - NativeWriter writer(compressed_output, shuffle_writer->output_header); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {}); + CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size); + NativeWriter writer(compressed_output, output_header); spilled_bytes += buffer->bytes(); size_t written_bytes = buffer->spill(writer); @@ -579,30 +490,23 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id) Stopwatch push_time_watch; celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size()); - shuffle_writer->split_result.partition_lengths[partition_id] += output.str().size(); - shuffle_writer->split_result.raw_partition_lengths[partition_id] += written_bytes; - shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_write_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_write_time += push_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); + split_result->partition_lengths[partition_id] += output.str().size(); + split_result->raw_partition_lengths[partition_id] += written_bytes; + split_result->total_compress_time += compressed_output.getCompressTime(); + split_result->total_write_time += compressed_output.getWriteTime(); + split_result->total_write_time += push_time_watch.elapsedNanoseconds(); + split_result->total_io_time += push_time_watch.elapsedNanoseconds(); + split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; Stopwatch 
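// Celeborn supports evicting one partition at a time
// (supportsEvictSinglePartition() returns true), so under memory pressure
// PartitionWriter::write() can free individual buffers incrementally, while
// the local writer has to spill all partitions to a file in one pass.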
spill_time_watch; spill_to_celeborn(); - shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_spilled += spilled_bytes; + split_result->total_spill_time += spill_time_watch.elapsedNanoseconds(); + split_result->total_bytes_spilled += spilled_bytes; LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds()); return res; } -void CelebornPartitionWriter::stop() -{ - evictPartitions(); - for (const auto & length : shuffle_writer->split_result.partition_lengths) - shuffle_writer->split_result.total_bytes_written += length; -} - void Partition::addBlock(DB::Block block) { /// Do not insert empty blocks, otherwise will cause the shuffle read terminate early. @@ -628,7 +532,4 @@ size_t Partition::spill(NativeWriter & writer) cached_bytes = 0; return written_bytes; } - - - -} +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 78eb845e1db12..3ca251abea957 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -64,17 +63,27 @@ class CachedShuffleWriter; using PartitionPtr = std::shared_ptr; class PartitionWriter : boost::noncopyable { +friend class Spillable; public: - explicit PartitionWriter(CachedShuffleWriter * shuffle_writer_, LoggerPtr logger_); + PartitionWriter(const SplitOptions& options, LoggerPtr logger_); virtual ~PartitionWriter() = default; + void initialize(SplitResult * split_result_, const Block & output_header_) + { + chassert(split_result); + split_result = split_result_; + split_result->partition_lengths.resize(options.partition_num); + split_result->raw_partition_lengths.resize(options.partition_num); + output_header = output_header_; + init = true; + } virtual String getName() const = 0; virtual void write(const PartitionInfo & info, DB::Block & block); - virtual void stop() = 0; + virtual bool useRSSPusher() const = 0; + virtual size_t evictPartitions() = 0; protected: - virtual size_t evictPartitions() = 0; size_t bytes() const; @@ -87,8 +96,7 @@ class PartitionWriter : boost::noncopyable throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Evict single partition is not supported for {}", getName()); } - CachedShuffleWriter * shuffle_writer; - const SplitOptions * options; + const SplitOptions & options; MemoryConfig settings; std::vector partition_block_buffer; @@ -96,7 +104,10 @@ class PartitionWriter : boost::noncopyable /// Only valid in celeborn partition writer size_t last_partition_id; + SplitResult* split_result = nullptr; + Block output_header; LoggerPtr logger = nullptr; + bool init = false; }; class Spillable @@ -108,38 +119,41 @@ class Spillable std::vector partition_buffer; }; - Spillable(SplitOptions options_) : split_options(std::move(options_)) {} + Spillable(const SplitOptions& options_) : spill_options(options_) {} virtual ~Spillable() = default; + const std::vector & getSpillInfos() const + { + return spill_infos; + } protected: String getNextSpillFile(); - std::vector mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data = {}); std::vector spill_infos; - -private: - const SplitOptions split_options; + const SplitOptions& spill_options; }; class LocalPartitionWriter : public PartitionWriter, public Spillable { public: - explicit 
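// Writers are now built from plain SplitOptions with no back pointer into a
// shuffle writer. A construction sketch using this header's API:
//   auto writer = std::make_shared<LocalPartitionWriter>(options);
//   writer->initialize(&split_result, output_header); // must precede the first write()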
LocalPartitionWriter(CachedShuffleWriter * shuffle_writer); + explicit LocalPartitionWriter(const SplitOptions& options); ~LocalPartitionWriter() override = default; String getName() const override { return "LocalPartitionWriter"; } - + ExtraData getExtraData() + { + return {partition_block_buffer, partition_buffer}; + } size_t evictPartitions() override; - void stop() override; - + bool useRSSPusher() const override { return false; } }; class SortBasedPartitionWriter : public PartitionWriter { protected: - explicit SortBasedPartitionWriter(CachedShuffleWriter * shuffle_writer_, LoggerPtr logger) : PartitionWriter(shuffle_writer_, logger) + explicit SortBasedPartitionWriter(const SplitOptions& options, LoggerPtr logger) : PartitionWriter(options, logger) { - max_merge_block_size = options->split_size; - max_sort_buffer_size = options->max_sort_buffer_size; + max_merge_block_size = options.split_size; + max_sort_buffer_size = options.max_sort_buffer_size; max_merge_block_bytes = SerializedPlanParser::global_context->getSettingsRef().prefer_external_sort_block_bytes; } public: @@ -170,8 +184,8 @@ class SortBasedPartitionWriter : public PartitionWriter class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public Spillable { public: - explicit MemorySortLocalPartitionWriter(CachedShuffleWriter* shuffle_writer_) - : SortBasedPartitionWriter(shuffle_writer_, getLogger("MemorySortLocalPartitionWriter")), Spillable(shuffle_writer_->options) + explicit MemorySortLocalPartitionWriter(const SplitOptions& options) + : SortBasedPartitionWriter(options, getLogger("MemorySortLocalPartitionWriter")), Spillable(options) { } @@ -179,23 +193,22 @@ class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public S String getName() const override { return "MemorySortLocalPartitionWriter"; } size_t evictPartitions() override; - void stop() override; + bool useRSSPusher() const override { return false; } }; class MemorySortCelebornPartitionWriter : public SortBasedPartitionWriter { public: - explicit MemorySortCelebornPartitionWriter(CachedShuffleWriter* shuffle_writer_, std::unique_ptr celeborn_client_) - : SortBasedPartitionWriter(shuffle_writer_, getLogger("MemorySortCelebornPartitionWriter")), celeborn_client(std::move(celeborn_client_)) + explicit MemorySortCelebornPartitionWriter(const SplitOptions& options, std::unique_ptr celeborn_client_) + : SortBasedPartitionWriter(options, getLogger("MemorySortCelebornPartitionWriter")), celeborn_client(std::move(celeborn_client_)) { } String getName() const override { return "MemorySortCelebornPartitionWriter"; } ~MemorySortCelebornPartitionWriter() override = default; - void stop() override; + bool useRSSPusher() const override { return true; } -protected: size_t evictPartitions() override; private: std::unique_ptr celeborn_client; @@ -204,13 +217,14 @@ class MemorySortCelebornPartitionWriter : public SortBasedPartitionWriter class CelebornPartitionWriter : public PartitionWriter { public: - CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client); + CelebornPartitionWriter(const SplitOptions& options, std::unique_ptr celeborn_client); ~CelebornPartitionWriter() override = default; String getName() const override { return "CelebornPartitionWriter"; } - void stop() override; -protected: + bool useRSSPusher() const override { return true; } size_t evictPartitions() override; + +protected: bool supportsEvictSinglePartition() const override { return true; } size_t evictSinglePartition(size_t 
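// useRSSPusher() takes over the dispatch role of the deleted stop()
// overrides: it lets the caller tell writers that push bytes to an RSS
// service (Celeborn) apart from local writers whose spill files still have to
// be merged into the shuffle data file by SparkExechangeManager::finish().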
partition_id) override; private: diff --git a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h index d398362aa4b64..85442b01fab34 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h @@ -19,16 +19,16 @@ #include #include #include -#include -#include #include -#include -#include #include -#include #include +namespace local_engine +{ + class SparkExechangeManager; +} + namespace local_engine { struct SplitOptions @@ -94,6 +94,9 @@ struct SplitResult UInt64 total_split_time = 0; // Total nanoseconds to execute CachedShuffleWriter::split, excluding total_compute_pid_time UInt64 total_io_time = 0; // Total nanoseconds to write data to local/celeborn, excluding the time writing to buffer UInt64 total_serialize_time = 0; // Total nanoseconds to execute spill_to_file/spill_to_celeborn. Bad naming, it works not as the name suggests. + UInt64 total_rows = 0; + UInt64 total_blocks = 0; + UInt64 wall_time = 0; // Wall nanoseconds time of shuffle. String toString() const { @@ -114,7 +117,7 @@ struct SplitResult struct SplitterHolder { - std::unique_ptr splitter; + std::unique_ptr exechange_manager; }; diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h b/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h deleted file mode 100644 index 4c2eab853febd..0000000000000 --- a/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once -#include - -namespace local_engine -{ -struct SplitResult; -class ShuffleWriterBase -{ -public: - virtual ~ShuffleWriterBase() = default; - - virtual void split(DB::Block & block) = 0; - virtual size_t evictPartitions() { return 0; } - virtual SplitResult stop() = 0; -}; -} diff --git a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp new file mode 100644 index 0000000000000..ee476b2bda8ba --- /dev/null +++ b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "SparkExchangeSink.h" + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ +extern const int BAD_ARGUMENTS; +} +} + +using namespace DB; + +namespace local_engine +{ +void SparkExchangeSink::consume(Chunk chunk) +{ + Stopwatch wall_time; + if (chunk.getNumRows() == 0) + return; + split_result.total_blocks += 1; + split_result.total_rows += chunk.getNumRows(); + auto aggregate_info = chunk.getChunkInfos().get(); + auto intput = inputs.front().getHeader().cloneWithColumns(chunk.getColumns()); + Stopwatch split_time_watch; + split_result.total_split_time += split_time_watch.elapsedNanoseconds(); + + Stopwatch compute_pid_time_watch; + PartitionInfo partition_info = partitioner->build(intput); + split_result.total_compute_pid_time += compute_pid_time_watch.elapsedNanoseconds(); + + Block out_block; + for (size_t col_i = 0; col_i < output_header.columns(); ++col_i) + { + out_block.insert(intput.getByPosition(output_columns_indicies[col_i])); + } + if (aggregate_info) + { + out_block.info.is_overflows = aggregate_info->is_overflows; + out_block.info.bucket_num = aggregate_info->bucket_num; + } + partition_writer->write(partition_info, out_block); + split_result.wall_time += wall_time.elapsedNanoseconds(); +} + +void SparkExchangeSink::onFinish() +{ + Stopwatch wall_time; + if (!dynamic_cast(partition_writer.get())) + { + partition_writer->evictPartitions(); + } + split_result.wall_time += wall_time.elapsedNanoseconds(); +} + +void SparkExchangeSink::initOutputHeader(const Block & block) +{ + if (!output_header) + { + if (output_columns_indicies.empty()) + { + output_header = block.cloneEmpty(); + for (size_t i = 0; i < block.columns(); ++i) + output_columns_indicies.push_back(i); + } + else + { + ColumnsWithTypeAndName cols; + for (const auto & index : output_columns_indicies) + cols.push_back(block.getByPosition(index)); + + output_header = Block(std::move(cols)); + } + } +} + +SparkExechangeManager::SparkExechangeManager(const Block& header, const String & short_name, const SplitOptions & options_, jobject rss_pusher): input_header(header), options(options_) +{ + if (rss_pusher) + { + GET_JNIENV(env) + jclass celeborn_partition_pusher_class = + CreateGlobalClassReference(env, "Lorg/apache/spark/shuffle/CelebornPartitionPusher;"); + jmethodID celeborn_push_partition_data_method = + GetMethodID(env, celeborn_partition_pusher_class, "pushPartitionData", "(I[BI)I"); + CLEAN_JNIENV + celeborn_client = std::make_unique(rss_pusher, celeborn_push_partition_data_method); + use_rss = true; + } + if (!partitioner_creators.contains(short_name)) + { + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "unsupported splitter {}", short_name); + } + partitioner_creator = partitioner_creators[short_name]; + Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ","); + for (const auto & iter : output_column_tokenizer) + output_columns_indicies.push_back(std::stoi(iter)); + auto overhead_memory = header.columns() * 16 * options.split_size * options.partition_num; + use_sort_shuffle = overhead_memory > options.spill_threshold * 0.5 || options.partition_num >= 300 || options.force_memory_sort; + + split_result.partition_lengths.resize(options.partition_num, 0); + split_result.raw_partition_lengths.resize(options.partition_num, 0); +} + +std::shared_ptr createPartitionWriter(const SplitOptions& options, bool 
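// Factory matrix of this helper (its elided return type is presumably
// std::shared_ptr<PartitionWriter>):
//   celeborn + sort -> MemorySortCelebornPartitionWriter
//   celeborn        -> CelebornPartitionWriter
//   local + sort    -> MemorySortLocalPartitionWriter
//   local           -> LocalPartitionWriter
// The unique_ptr<CelebornClient> is moved into the chosen writer, which is
// why initSinks() rejects more than one sink when a Celeborn client is present.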
+std::shared_ptr<PartitionWriter> createPartitionWriter(const SplitOptions & options, bool use_sort_shuffle, std::unique_ptr<CelebornClient> celeborn_client)
+{
+    if (celeborn_client)
+    {
+        if (use_sort_shuffle)
+            return std::make_shared<MemorySortCelebornPartitionWriter>(options, std::move(celeborn_client));
+        return std::make_shared<CelebornPartitionWriter>(options, std::move(celeborn_client));
+    }
+    if (use_sort_shuffle)
+        return std::make_shared<MemorySortLocalPartitionWriter>(options);
+    return std::make_shared<LocalPartitionWriter>(options);
+}
+
+void SparkExechangeManager::initSinks(size_t num)
+{
+    if (num > 1 && celeborn_client)
+    {
+        throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "CelebornClient can't be used with multiple sinks");
+    }
+    sinks.resize(num);
+    partition_writers.resize(num);
+    for (size_t i = 0; i < num; ++i)
+    {
+        partition_writers[i] = createPartitionWriter(options, use_sort_shuffle, std::move(celeborn_client));
+        sinks[i] = std::make_shared<SparkExchangeSink>(input_header, partitioner_creator(options), partition_writers[i], output_columns_indicies);
+    }
+}
+
+void SparkExechangeManager::setSinksToPipeline(DB::QueryPipelineBuilder & pipeline) const
+{
+    size_t count = 0;
+    Pipe::ProcessorGetterWithStreamKind getter = [&](const Block & header, Pipe::StreamType stream_type) -> ProcessorPtr
+    {
+        if (stream_type == Pipe::StreamType::Main)
+        {
+            return std::dynamic_pointer_cast<IProcessor>(sinks[count++]);
+        }
+        return std::make_shared<NullSink>(header);
+    };
+    pipeline.setSinks(getter);
+}
+
+SelectBuilderPtr SparkExechangeManager::createRoundRobinSelectorBuilder(const SplitOptions & options_)
+{
+    return std::make_unique<RoundRobinSelectorBuilder>(options_.partition_num);
+}
+
+SelectBuilderPtr SparkExechangeManager::createHashSelectorBuilder(const SplitOptions & options_)
+{
+    Poco::StringTokenizer expr_list(options_.hash_exprs, ",");
+    std::vector<size_t> hash_fields;
+    for (const auto & expr : expr_list)
+    {
+        hash_fields.push_back(std::stoi(expr));
+    }
+    return std::make_unique<HashSelectorBuilder>(options_.partition_num, hash_fields, options_.hash_algorithm);
+}
+
+SelectBuilderPtr SparkExechangeManager::createSingleSelectorBuilder(const SplitOptions & options_)
+{
+    chassert(options_.partition_num == 1);
+    return std::make_unique<RoundRobinSelectorBuilder>(options_.partition_num);
+}
+
+SelectBuilderPtr SparkExechangeManager::createRangeSelectorBuilder(const SplitOptions & options_)
+{
+    return std::make_unique<RangeSelectorBuilder>(options_.hash_exprs, options_.partition_num);
+}
+
+void SparkExechangeManager::finish()
+{
+    Stopwatch wall_time;
+    mergeSplitResult();
+    auto infos = gatherAllSpillInfo();
+    WriteBufferFromFile output(options.data_file, options.io_buffer_size);
+    std::vector<Spillable::ExtraData> extra_datas;
+    for (const auto & writer : partition_writers)
+    {
+        LocalPartitionWriter * local_partition_writer = dynamic_cast<LocalPartitionWriter *>(writer.get());
+        if (local_partition_writer)
+        {
+            extra_datas.emplace_back(local_partition_writer->getExtraData());
+        }
+    }
+    if (!extra_datas.empty())
+        chassert(extra_datas.size() == partition_writers.size());
+    split_result.partition_lengths = mergeSpills(output, infos, extra_datas);
+    split_result.wall_time += wall_time.elapsedNanoseconds();
+}
+
+void SparkExechangeManager::mergeSplitResult()
+{
+    for (const auto & sink : sinks)
+    {
+        auto split_result = sink->getSplitResultCopy();
+        this->split_result.total_bytes_written += split_result.total_bytes_written;
+        this->split_result.total_bytes_spilled += split_result.total_bytes_spilled;
+        this->split_result.total_compress_time += split_result.total_compress_time;
+        this->split_result.total_spill_time += split_result.total_spill_time;
+        this->split_result.total_write_time += split_result.total_write_time;
+        this->split_result.total_compute_pid_time += split_result.total_compute_pid_time;
+        this->split_result.total_split_time += split_result.total_split_time;
+        this->split_result.total_io_time += split_result.total_io_time;
+        this->split_result.total_serialize_time += split_result.total_serialize_time;
+        this->split_result.total_rows += split_result.total_rows;
+        this->split_result.total_blocks += split_result.total_blocks;
+        this->split_result.wall_time += split_result.wall_time;
+    }
+}
+
+std::vector<SpillInfo> SparkExechangeManager::gatherAllSpillInfo()
+{
+    std::vector<SpillInfo> res;
+    for (const auto & writer : partition_writers)
+    {
+        if (Spillable * spillable = dynamic_cast<Spillable *>(writer.get()))
+        {
+            for (const auto & info : spillable->getSpillInfos())
+                res.emplace_back(info);
+        }
+    }
+    return res;
+}
+
+std::vector<UInt64> SparkExechangeManager::mergeSpills(DB::WriteBuffer & data_file, const std::vector<SpillInfo> & spill_infos, const std::vector<Spillable::ExtraData> & extra_datas)
+{
+    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {});
+
+    CompressedWriteBuffer compressed_output(data_file, codec, options.io_buffer_size);
+    NativeWriter writer(compressed_output, sinks.front()->getOutputHeaderCopy());
+
+    std::vector<UInt64> partition_length(options.partition_num, 0);
+
+    std::vector<std::shared_ptr<ReadBufferFromFile>> spill_inputs;
+    spill_inputs.reserve(spill_infos.size());
+    for (const auto & spill : spill_infos)
+    {
+        // only use readBig
+        spill_inputs.emplace_back(std::make_shared<ReadBufferFromFile>(spill.spilled_file, 0));
+    }
+
+    Stopwatch write_time_watch;
+    Stopwatch io_time_watch;
+    Stopwatch serialization_time_watch;
+    size_t merge_io_time = 0;
+    String buffer;
+    for (size_t partition_id = 0; partition_id < options.partition_num; ++partition_id)
+    {
+        auto size_before = data_file.count();
+
+        io_time_watch.restart();
+        for (size_t i = 0; i < spill_infos.size(); ++i)
+        {
+            if (!spill_infos[i].partition_spill_infos.contains(partition_id))
+            {
+                continue;
+            }
+            size_t size = spill_infos[i].partition_spill_infos.at(partition_id).second;
+            size_t offset = spill_infos[i].partition_spill_infos.at(partition_id).first;
+            if (!size)
+            {
+                continue;
+            }
+            buffer.resize(size); // resize, not reserve: readBigAt writes into the buffer directly
+            auto count = spill_inputs[i]->readBigAt(buffer.data(), size, offset, nullptr);
+
+            chassert(count == size);
+            data_file.write(buffer.data(), count);
+        }
+        merge_io_time += io_time_watch.elapsedNanoseconds();
+
+        serialization_time_watch.restart();
+        if (!extra_datas.empty())
+        {
+            for (const auto & extra_data : extra_datas)
+            {
+                if (!extra_data.partition_block_buffer.empty() && !extra_data.partition_block_buffer[partition_id]->empty())
+                {
+                    Block block = extra_data.partition_block_buffer[partition_id]->releaseColumns();
+                    extra_data.partition_buffer[partition_id]->addBlock(std::move(block));
+                }
+                if (!extra_data.partition_buffer.empty())
+                {
+                    size_t raw_size = extra_data.partition_buffer[partition_id]->spill(writer);
+                    split_result.raw_partition_lengths[partition_id] += raw_size;
+                }
+            }
+        }
+
+        compressed_output.sync();
+        partition_length[partition_id] = data_file.count() - size_before;
+        split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds();
+        split_result.total_bytes_written += partition_length[partition_id];
+    }
+    split_result.total_write_time += write_time_watch.elapsedNanoseconds();
+    split_result.total_compress_time += compressed_output.getCompressTime();
+    split_result.total_io_time += compressed_output.getWriteTime();
+    split_result.total_serialize_time = split_result.total_serialize_time
+        - split_result.total_io_time - split_result.total_compress_time;
+    split_result.total_io_time += merge_io_time;
+
+    for (const auto & spill : spill_infos)
+        std::filesystem::remove(spill.spilled_file);
+    return partition_length;
+}
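mergeSpills above concatenates, partition by partition, the byte ranges every writer spilled to disk, using the (offset, size) pairs recorded in each SpillInfo, and returns the per-partition lengths the JVM needs for the shuffle index. A self-contained model of that layout logic, with plain strings standing in for the spill files and the final data file (compression, ClickHouse buffers, and the in-memory extra_datas path are omitted):

// Self-contained model of the per-partition spill merge above. Spill files are
// modeled as byte strings and partition ranges as (offset, size) pairs.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct SpillInfoModel
{
    std::string bytes; // stands in for the spilled file on disk
    std::map<size_t, std::pair<size_t, size_t>> partition_ranges; // partition -> (offset, size)
};

int main()
{
    // Two spills, each holding data for partitions 0 and 1.
    std::vector<SpillInfoModel> spills = {
        {"AAABB", {{0, {0, 3}}, {1, {3, 2}}}},
        {"CDD", {{0, {0, 1}}, {1, {1, 2}}}},
    };

    std::string data_file;
    std::vector<uint64_t> partition_length(2, 0);
    for (size_t partition_id = 0; partition_id < 2; ++partition_id)
    {
        size_t size_before = data_file.size();
        for (const auto & spill : spills)
        {
            auto it = spill.partition_ranges.find(partition_id);
            if (it == spill.partition_ranges.end() || it->second.second == 0)
                continue; // this spill holds nothing for this partition
            data_file.append(spill.bytes, it->second.first, it->second.second);
        }
        partition_length[partition_id] = data_file.size() - size_before;
    }
    std::cout << data_file << '\n';                                           // AAACBBDD
    std::cout << partition_length[0] << ' ' << partition_length[1] << '\n';   // 4 4
}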
+
+std::unordered_map<String, SelectBuilderCreator> SparkExechangeManager::partitioner_creators = {
+    {"rr", createRoundRobinSelectorBuilder},
+    {"hash", createHashSelectorBuilder},
+    {"single", createSingleSelectorBuilder},
+    {"range", createRangeSelectorBuilder},
+};
+}
\ No newline at end of file
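partitioner_creators maps the short name passed over JNI ("rr", "hash", "single", "range") to a selector-builder factory; the manager constructor resolves it once and rejects unknown names. A minimal sketch of the same lookup-or-throw dispatch, with stand-in builder types:

// Minimal model of the short-name -> selector-builder dispatch; the builder
// type is a stand-in, only the lookup-or-throw shape mirrors the real code.
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct SelectorBuilderModel
{
    explicit SelectorBuilderModel(std::string name_) : name(std::move(name_)) {}
    std::string name;
};

using BuilderPtr = std::unique_ptr<SelectorBuilderModel>;
using BuilderCreator = std::function<BuilderPtr(size_t /*partition_num*/)>;

static std::unordered_map<std::string, BuilderCreator> creators = {
    {"rr", [](size_t) { return std::make_unique<SelectorBuilderModel>("round-robin"); }},
    {"hash", [](size_t) { return std::make_unique<SelectorBuilderModel>("hash"); }},
};

BuilderPtr create(const std::string & short_name, size_t partition_num)
{
    auto it = creators.find(short_name);
    if (it == creators.end())
        throw std::invalid_argument("unsupported splitter " + short_name);
    return it->second(partition_num);
}

int main()
{
    std::cout << create("hash", 200)->name << '\n'; // hash
}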
diff --git a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h
new file mode 100644
index 0000000000000..971846816bac2
--- /dev/null
+++ b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+    class QueryPipelineBuilder;
+}
+
+namespace local_engine
+{
+class CelebornClient;
+class PartitionWriter;
+
+class SparkExchangeSink : public DB::ISink
+{
+public:
+    SparkExchangeSink(const DB::Block & header, std::unique_ptr<SelectorBuilder> partitioner_, std::shared_ptr<PartitionWriter> partition_writer_,
+        const std::vector<size_t> & output_columns_indicies_)
+        : DB::ISink(header)
+        , partitioner(std::move(partitioner_))
+        , partition_writer(partition_writer_)
+        , output_columns_indicies(output_columns_indicies_)
+    {
+        initOutputHeader(header);
+        partition_writer->initialize(&split_result, output_header);
+    }
+
+    String getName() const override
+    {
+        return "SparkExchangeSink";
+    }
+
+    SplitResult getSplitResultCopy() const
+    {
+        return split_result;
+    }
+
+    DB::Block getOutputHeaderCopy() const
+    {
+        return output_header.cloneEmpty();
+    }
+
+protected:
+    void consume(DB::Chunk chunk) override;
+    void onFinish() override;
+
+private:
+    void initOutputHeader(const DB::Block & block);
+
+    DB::Block output_header;
+    std::unique_ptr<SelectorBuilder> partitioner;
+    std::shared_ptr<PartitionWriter> partition_writer;
+    std::vector<size_t> output_columns_indicies;
+    SplitResult split_result;
+};
+
+using SelectBuilderPtr = std::unique_ptr<SelectorBuilder>;
+using SelectBuilderCreator = std::function<SelectBuilderPtr(const SplitOptions &)>;
+
+class SparkExechangeManager
+{
+public:
+    SparkExechangeManager(const DB::Block & header, const String & short_name, const SplitOptions & options_, jobject rss_pusher = nullptr);
+    void initSinks(size_t num);
+    void setSinksToPipeline(DB::QueryPipelineBuilder & pipeline) const;
+    void finish();
+    [[nodiscard]] SplitResult getSplitResult() const
+    {
+        return split_result;
+    }
+
+private:
+    static SelectBuilderPtr createRoundRobinSelectorBuilder(const SplitOptions & options_);
+    static SelectBuilderPtr createHashSelectorBuilder(const SplitOptions & options_);
+    static SelectBuilderPtr createSingleSelectorBuilder(const SplitOptions & options_);
+    static SelectBuilderPtr createRangeSelectorBuilder(const SplitOptions & options_);
+    static std::unordered_map<String, SelectBuilderCreator> partitioner_creators;
+
+    void mergeSplitResult();
+    std::vector<SpillInfo> gatherAllSpillInfo();
+    std::vector<UInt64> mergeSpills(DB::WriteBuffer & data_file, const std::vector<SpillInfo> & spill_infos, const std::vector<Spillable::ExtraData> & extra_datas = {});
+
+    DB::Block input_header;
+    std::vector<std::shared_ptr<SparkExchangeSink>> sinks;
+    std::vector<std::shared_ptr<PartitionWriter>> partition_writers;
+    std::unique_ptr<CelebornClient> celeborn_client = nullptr;
+    SplitOptions options;
+    SelectBuilderCreator partitioner_creator;
+    std::vector<size_t> output_columns_indicies;
+    bool use_sort_shuffle = false;
+    bool use_rss = false;
+    SplitResult split_result;
+};
+}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index c4e8ec67b106a..af4ba410c3c3c 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -28,14 +28,13 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -124,7 +123,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
     block_stripes_constructor = local_engine::GetMethodID(env, block_stripes_class, "<init>", "(J[J[II)V");
 
     split_result_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/CHSplitResult;");
-    split_result_constructor = local_engine::GetMethodID(env, split_result_class, "<init>", "(JJJJJJ[J[JJJJ)V");
+    split_result_constructor = local_engine::GetMethodID(env, split_result_class, "<init>", "(JJJJJJ[J[JJJJJJJ)V");
 
     local_engine::ShuffleReader::input_stream_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/ShuffleInputStream;");
@@ -300,6 +299,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEn
     LOCAL_ENGINE_JNI_METHOD_START
     auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
     LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
+    local_engine::LocalExecutor::resetCurrentExecutor();
     delete executor;
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
@@ -590,7 +590,13 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
         .force_memory_sort = static_cast<bool>(force_memory_sort)};
     auto name = jstring2string(env, short_name);
-    local_engine::SplitterHolder * splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options)};
+    auto * current_executor = local_engine::LocalExecutor::getCurrentExecutor();
+    chassert(current_executor);
+    local_engine::SplitterHolder * splitter = new local_engine::SplitterHolder{.exechange_manager = std::make_unique<local_engine::SparkExechangeManager>(current_executor->getHeader(), name, options)};
+    splitter->exechange_manager->initSinks(local_engine::QueryContextManager::instance().currentQueryContext()->getSettingsRef().max_threads);
+    current_executor->setSinks([&](auto & pipeline_builder) { splitter->exechange_manager->setSinksToPipeline(pipeline_builder); });
+    // execute pipeline
+    current_executor->execute();
     return reinterpret_cast<jlong>(splitter);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
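With this change nativeMake no longer just allocates a writer: it builds one sink (and one partition writer) per pipeline thread, wires the sinks into the current LocalExecutor's pipeline, and runs the pipeline to completion, so by the time Java calls stop() all data has already been shuffled. A toy model of that lifecycle; every type here is a stand-in for the SplitterHolder/SparkExechangeManager pair:

// Toy model of the new JNI lifecycle: make() wires sinks into a pipeline and
// runs it eagerly; stop() only finalizes and reads the aggregated result.
#include <cstdint>
#include <iostream>
#include <memory>

struct ResultModel { uint64_t total_rows = 0; };

struct ManagerModel
{
    void initSinks(size_t n) { sinks = n; }
    void runPipeline() { result.total_rows = 1000 * sinks; } // pretend each sink consumed rows
    void finish() { finished = true; }
    ResultModel getSplitResult() const { return result; }

    size_t sinks = 0;
    bool finished = false;
    ResultModel result;
};

struct HolderModel { std::unique_ptr<ManagerModel> manager; };

int64_t nativeMake(size_t max_threads)
{
    auto * holder = new HolderModel{std::make_unique<ManagerModel>()};
    holder->manager->initSinks(max_threads); // one sink per pipeline thread
    holder->manager->runPipeline();          // the whole upstream plan executes here
    return reinterpret_cast<int64_t>(holder);
}

ResultModel stop(int64_t id)
{
    auto * holder = reinterpret_cast<HolderModel *>(id);
    holder->manager->finish(); // merge per-sink results, merge spills
    return holder->manager->getSplitResult();
}

void close(int64_t id) { delete reinterpret_cast<HolderModel *>(id); }

int main()
{
    int64_t id = nativeMake(4);
    std::cout << stop(id).total_rows << '\n'; // 4000
    close(id);
}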
@@ -641,27 +647,22 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .force_memory_sort = static_cast<bool>(force_memory_sort)};
     auto name = jstring2string(env, short_name);
-    local_engine::SplitterHolder * splitter;
-    splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
+    auto * current_executor = local_engine::LocalExecutor::getCurrentExecutor();
+    chassert(current_executor);
+    local_engine::SplitterHolder * splitter = new local_engine::SplitterHolder{.exechange_manager = std::make_unique<local_engine::SparkExechangeManager>(current_executor->getHeader(), name, options, pusher)};
+    splitter->exechange_manager->initSinks(local_engine::QueryContextManager::instance().currentQueryContext()->getSettingsRef().max_threads);
+    current_executor->setSinks([&](auto & pipeline_builder) { splitter->exechange_manager->setSinksToPipeline(pipeline_builder); });
+    current_executor->execute();
     return reinterpret_cast<jlong>(splitter);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
-JNIEXPORT void Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv * env, jobject, jlong splitterId, jlong block)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
-    DB::Block * data = reinterpret_cast<DB::Block *>(block);
-    splitter->splitter->split(*data);
-    LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
 JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
-    auto result = splitter->splitter->stop();
+    splitter->exechange_manager->finish();
+    auto result = splitter->exechange_manager->getSplitResult();
 
     const auto & partition_lengths = result.partition_lengths;
     auto * partition_length_arr = env->NewLongArray(partition_lengths.size());
@@ -686,7 +687,11 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
         raw_partition_length_arr,
         result.total_split_time,
         result.total_io_time,
-        result.total_serialize_time);
+        result.total_serialize_time,
+        result.total_rows,
+        result.total_blocks,
+        result.wall_time);
 
     return split_result;
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
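The widened constructor signature has to stay in lockstep with the NewObject call above: six leading longs forwarded to the SplitResult base (their names are not visible in this diff), two long[] for the partition and raw partition lengths, then splitTime, diskWriteTime, serializationTime and the new totalRows, totalBatches, wallTime. A tiny self-contained checker for the parameter count:

// Counts JNI parameters in a "(...)V" signature: "[J" is one long[] parameter,
// a bare "J" is one long parameter. Forgetting to grow the string makes
// GetMethodID return null and the library fail at load time.
#include <iostream>
#include <string>

int countParams(const std::string & sig)
{
    int params = 0;
    for (size_t i = sig.find('(') + 1; i < sig.find(')'); ++i)
    {
        ++params;
        if (sig[i] == '[')
            ++i; // array parameter: skip its element type character
    }
    return params;
}

int main()
{
    std::cout << countParams("(JJJJJJ[J[JJJJ)V") << '\n';    // 11: the old constructor
    std::cout << countParams("(JJJJJJ[J[JJJJJJJ)V") << '\n'; // 14: adds totalRows, totalBatches, wallTime
}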
dep.metrics("inputBatches").add(1) - // This metric is important, AQE use it to decide if EliminateLimit - writeMetrics.incRecordsWritten(cb.numRows()) - } - } - - // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1 - if (nativeShuffleWriter == -1L) { - handleEmptyIterator() - return - } - - val startTime = System.nanoTime() - splitResult = jniWrapper.stop(nativeShuffleWriter) - - dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) - dep.metrics("splitTime").add(splitResult.getSplitTime) - dep.metrics("IOTime").add(splitResult.getDiskWriteTime) - dep.metrics("serializeTime").add(splitResult.getSerializationTime) - dep.metrics("spillTime").add(splitResult.getTotalSpillTime) - dep.metrics("compressTime").add(splitResult.getTotalCompressTime) - dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime) - dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled) - dep.metrics("dataSize").add(splitResult.getTotalBytesWritten) - writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten) - writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime) - - partitionLengths = splitResult.getPartitionLengths - pushMergedDataToCeleborn() - mapStatus = MapStatus(blockManager.shuffleServerId, splitResult.getRawPartitionLengths, mapId) - } - - override def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = { + CHThreadGroup.registerNewThreadGroup() nativeShuffleWriter = jniWrapper.makeForRSS( dep.nativePartitioning, shuffleId, @@ -112,9 +68,41 @@ class CHCelebornColumnarShuffleWriter[K, V]( GlutenConfig.getConf.chColumnarForceMemorySortShuffle || ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType) ) + + val startTime = System.nanoTime() + splitResult = jniWrapper.stop(nativeShuffleWriter) + // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1 + if (splitResult.getTotalRows == 0) { + handleEmptyIterator() + } + else { + dep.metrics("numInputRows").add(splitResult.getTotalRows) + dep.metrics("inputBatches").add(splitResult.getTotalBatches) + writeMetrics.incRecordsWritten(splitResult.getTotalRows) + dep.metrics("splitTime").add(splitResult.getSplitTime) + dep.metrics("IOTime").add(splitResult.getDiskWriteTime) + dep.metrics("serializeTime").add(splitResult.getSerializationTime) + dep.metrics("spillTime").add(splitResult.getTotalSpillTime) + dep.metrics("compressTime").add(splitResult.getTotalCompressTime) + dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime) + dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled) + dep.metrics("dataSize").add(splitResult.getTotalBytesWritten) + dep.metrics("shuffleWallTime").add(splitResult.getWallTime) + writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten) + writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime) + + partitionLengths = splitResult.getPartitionLengths + pushMergedDataToCeleborn() + mapStatus = MapStatus(blockManager.shuffleServerId, splitResult.getRawPartitionLengths, mapId) + } + closeShuffleWriter() } override def closeShuffleWriter(): Unit = { - jniWrapper.close(nativeShuffleWriter) + if (nativeShuffleWriter != 0) + { + jniWrapper.close(nativeShuffleWriter) + nativeShuffleWriter = 0 + } } }