Commit

Update vendored DuckDB sources to 6024b25
duckdblabs-bot committed Jul 12, 2024
1 parent 6024b25 commit 6561074
Showing 162 changed files with 2,947 additions and 1,064 deletions.
2 changes: 1 addition & 1 deletion src/duckdb/extension/json/json_functions.cpp
@@ -198,7 +198,7 @@ vector<TableFunctionSet> JSONFunctions::GetTableFunctions() {

unique_ptr<TableRef> JSONFunctions::ReadJSONReplacement(ClientContext &context, ReplacementScanInput &input,
optional_ptr<ReplacementScanData> data) {
- auto &table_name = input.table_name;
+ auto table_name = ReplacementScan::GetFullPath(input);
if (!ReplacementScan::CanReplace(table_name, {"json", "jsonl", "ndjson"})) {
return nullptr;
}
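Note on the API change above: replacement scans now resolve the path to scan via ReplacementScan::GetFullPath(input) instead of reading input.table_name directly. A minimal sketch of a custom replacement-scan callback written against the new helper is shown below; it mirrors the JSON and Parquet callbacks touched by this commit, but the "csv" suffix, the read_csv_auto call, and the function name are illustrative assumptions rather than part of this diff (the usual DuckDB parser headers are assumed to be included):

unique_ptr<TableRef> ExampleCSVReplacement(ClientContext &context, ReplacementScanInput &input,
                                           optional_ptr<ReplacementScanData> data) {
	// resolve the full path of the candidate table reference
	auto table_name = ReplacementScan::GetFullPath(input);
	if (!ReplacementScan::CanReplace(table_name, {"csv"})) {
		return nullptr;
	}
	// replace the table reference with a table-function call that reads the file
	auto table_function = make_uniq<TableFunctionRef>();
	vector<unique_ptr<ParsedExpression>> children;
	children.push_back(make_uniq<ConstantExpression>(Value(table_name)));
	table_function->function = make_uniq<FunctionExpression>("read_csv_auto", std::move(children));
	return std::move(table_function);
}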
3 changes: 2 additions & 1 deletion src/duckdb/extension/json/json_scan.cpp
@@ -980,8 +980,9 @@ void JSONScan::ComplexFilterPushdown(ClientContext &context, LogicalGet &get, Fu

SimpleMultiFileList file_list(std::move(data.files));

+ MultiFilePushdownInfo info(get);
auto filtered_list =
- MultiFileReader().ComplexFilterPushdown(context, file_list, data.options.file_options, get, filters);
+ MultiFileReader().ComplexFilterPushdown(context, file_list, data.options.file_options, info, filters);
if (filtered_list) {
MultiFileReader().PruneReaders(data, *filtered_list);
data.files = filtered_list->GetAllFiles();
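The change above is purely a call-convention update: ComplexFilterPushdown now receives a MultiFilePushdownInfo constructed from the LogicalGet rather than the LogicalGet itself. A minimal sketch of a multi-file pushdown callback under the new signature follows; ExampleBindData and its members are illustrative assumptions, while the MultiFilePushdownInfo and ComplexFilterPushdown usage matches the diff:

static void ExampleComplexFilterPushdown(ClientContext &context, LogicalGet &get, FunctionData *bind_data_p,
                                         vector<unique_ptr<Expression>> &filters) {
	auto &data = bind_data_p->Cast<ExampleBindData>();
	SimpleMultiFileList file_list(std::move(data.files));

	// wrap the LogicalGet before handing it to the multi-file reader
	MultiFilePushdownInfo info(get);
	auto filtered_list = MultiFileReader().ComplexFilterPushdown(context, file_list, data.file_options, info, filters);
	if (filtered_list) {
		// some files were pruned by the pushed-down filters
		data.files = filtered_list->GetAllFiles();
	} else {
		data.files = file_list.GetAllFiles();
	}
}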
70 changes: 50 additions & 20 deletions src/duckdb/extension/parquet/parquet_extension.cpp
@@ -132,8 +132,18 @@ struct ParquetFileReaderData {
};

struct ParquetReadGlobalState : public GlobalTableFunctionState {
+ explicit ParquetReadGlobalState(MultiFileList &file_list_p) : file_list(file_list_p) {
+ }
+ explicit ParquetReadGlobalState(unique_ptr<MultiFileList> owned_file_list_p)
+ : file_list(*owned_file_list_p), owned_file_list(std::move(owned_file_list_p)) {
+ }
+
+ //! The file list to scan
+ MultiFileList &file_list;
//! The scan over the file_list
MultiFileListScanData file_list_scan;
+ //! Owned multi file list - if filters have been dynamically pushed into the reader
+ unique_ptr<MultiFileList> owned_file_list;

unique_ptr<MultiFileReaderGlobalState> multi_file_reader_state;

@@ -156,7 +166,7 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState {
vector<idx_t> projection_ids;
vector<LogicalType> scanned_types;
vector<column_t> column_ids;
- TableFilterSet *filters;
+ optional_ptr<TableFilterSet> filters;

idx_t MaxThreads() const override {
return max_threads;
@@ -613,7 +623,7 @@ class ParquetScanFunction {
auto &bind_data = bind_data_p->Cast<ParquetReadBindData>();
auto &gstate = global_state->Cast<ParquetReadGlobalState>();

- auto total_count = bind_data.file_list->GetTotalFileCount();
+ auto total_count = gstate.file_list.GetTotalFileCount();
if (total_count == 0) {
return 100.0;
}
@@ -643,16 +653,37 @@ class ParquetScanFunction {
return std::move(result);
}

+ static unique_ptr<MultiFileList> ParquetDynamicFilterPushdown(ClientContext &context,
+ const ParquetReadBindData &data,
+ const vector<column_t> &column_ids,
+ optional_ptr<TableFilterSet> filters) {
+ if (!filters) {
+ return nullptr;
+ }
+ auto new_list = data.multi_file_reader->DynamicFilterPushdown(
+ context, *data.file_list, data.parquet_options.file_options, data.names, data.types, column_ids, *filters);
+ return new_list;
+ }
+
static unique_ptr<GlobalTableFunctionState> ParquetScanInitGlobal(ClientContext &context,
TableFunctionInitInput &input) {
auto &bind_data = input.bind_data->CastNoConst<ParquetReadBindData>();
- auto result = make_uniq<ParquetReadGlobalState>();
- bind_data.file_list->InitializeScan(result->file_list_scan);
+ unique_ptr<ParquetReadGlobalState> result;
+
+ // before instantiating a scan trigger a dynamic filter pushdown if possible
+ auto new_list = ParquetDynamicFilterPushdown(context, bind_data, input.column_ids, input.filters);
+ if (new_list) {
+ result = make_uniq<ParquetReadGlobalState>(std::move(new_list));
+ } else {
+ result = make_uniq<ParquetReadGlobalState>(*bind_data.file_list);
+ }
+ auto &file_list = result->file_list;
+ file_list.InitializeScan(result->file_list_scan);

result->multi_file_reader_state = bind_data.multi_file_reader->InitializeGlobalState(
- context, bind_data.parquet_options.file_options, bind_data.reader_bind, *bind_data.file_list,
- bind_data.types, bind_data.names, input.column_ids);
- if (bind_data.file_list->IsEmpty()) {
+ context, bind_data.parquet_options.file_options, bind_data.reader_bind, file_list, bind_data.types,
+ bind_data.names, input.column_ids);
+ if (file_list.IsEmpty()) {
result->readers = {};
} else if (!bind_data.union_readers.empty()) {
// TODO: confirm we are not changing behaviour by modifying the order here?
@@ -662,26 +693,24 @@ class ParquetScanFunction {
}
result->readers.push_back(make_uniq<ParquetFileReaderData>(std::move(reader)));
}
- if (result->readers.size() != bind_data.file_list->GetTotalFileCount()) {
+ if (result->readers.size() != file_list.GetTotalFileCount()) {
// This case happens with recursive CTEs: the first execution the readers have already
// been moved out of the bind data.
// FIXME: clean up this process and make it more explicit
result->readers = {};
}
} else if (bind_data.initial_reader) {
- // Ensure the initial reader was actually constructed from the first file
- if (bind_data.initial_reader->file_name != bind_data.file_list->GetFirstFile()) {
- throw InternalException("First file from list ('%s') does not match first reader ('%s')",
- bind_data.initial_reader->file_name, bind_data.file_list->GetFirstFile());
+ // we can only use the initial reader if it was constructed from the first file
+ if (bind_data.initial_reader->file_name == file_list.GetFirstFile()) {
+ result->readers.push_back(make_uniq<ParquetFileReaderData>(std::move(bind_data.initial_reader)));
}
- result->readers.push_back(make_uniq<ParquetFileReaderData>(std::move(bind_data.initial_reader)));
}

// Ensure all readers are initialized and FileListScan is sync with readers list
for (auto &reader_data : result->readers) {
string file_name;
idx_t file_idx = result->file_list_scan.current_file_idx;
- bind_data.file_list->Scan(result->file_list_scan, file_name);
+ file_list.Scan(result->file_list_scan, file_name);
if (reader_data->union_data) {
if (file_name != reader_data->union_data->GetFileName()) {
throw InternalException("Mismatch in filename order and union reader order in parquet scan");
@@ -829,9 +858,9 @@ class ParquetScanFunction {

// Queries the metadataprovider for another file to scan, updating the files/reader lists in the process.
// Returns true if resized
- static bool ResizeFiles(const ParquetReadBindData &bind_data, ParquetReadGlobalState &parallel_state) {
+ static bool ResizeFiles(ParquetReadGlobalState &parallel_state) {
string scanned_file;
- if (!bind_data.file_list->Scan(parallel_state.file_list_scan, scanned_file)) {
+ if (!parallel_state.file_list.Scan(parallel_state.file_list_scan, scanned_file)) {
return false;
}

@@ -852,7 +881,7 @@ class ParquetScanFunction {
return false;
}

- if (parallel_state.file_index >= parallel_state.readers.size() && !ResizeFiles(bind_data, parallel_state)) {
+ if (parallel_state.file_index >= parallel_state.readers.size() && !ResizeFiles(parallel_state)) {
return false;
}

@@ -895,8 +924,9 @@ class ParquetScanFunction {
vector<unique_ptr<Expression>> &filters) {
auto &data = bind_data_p->Cast<ParquetReadBindData>();

+ MultiFilePushdownInfo info(get);
auto new_list = data.multi_file_reader->ComplexFilterPushdown(context, *data.file_list,
- data.parquet_options.file_options, get, filters);
+ data.parquet_options.file_options, info, filters);

if (new_list) {
data.file_list = std::move(new_list);
Expand Down Expand Up @@ -938,7 +968,7 @@ class ParquetScanFunction {

for (idx_t i = parallel_state.file_index; i < file_index_limit; i++) {
// We check if we can resize files in this loop too otherwise we will only ever open 1 file ahead
- if (i >= parallel_state.readers.size() && !ResizeFiles(bind_data, parallel_state)) {
+ if (i >= parallel_state.readers.size() && !ResizeFiles(parallel_state)) {
return false;
}

@@ -1471,7 +1501,7 @@ bool ParquetWriteRotateNextFile(GlobalFunctionData &gstate, FunctionData &bind_d
//===--------------------------------------------------------------------===//
unique_ptr<TableRef> ParquetScanReplacement(ClientContext &context, ReplacementScanInput &input,
optional_ptr<ReplacementScanData> data) {
- auto &table_name = input.table_name;
+ auto table_name = ReplacementScan::GetFullPath(input);
if (!ReplacementScan::CanReplace(table_name, {"parquet"})) {
return nullptr;
}
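The substantive change in this file is that filter pushdown can now happen when the scan's global state is created: ParquetDynamicFilterPushdown asks the MultiFileReader to prune the file list against the filters supplied at init time, and if a pruned list comes back the global state owns it, otherwise the state keeps borrowing the list owned by the bind data. A distilled sketch of that own-or-borrow shape (names here are illustrative, not taken from the diff):

// Sketch of the own-or-borrow pattern used by ParquetReadGlobalState above.
// Code always goes through the `file_list` reference; `owned_list` is only
// populated when a pruned list was produced that nobody else owns.
struct ScanStateSketch {
	// borrow a list that stays owned by the bind data
	explicit ScanStateSketch(MultiFileList &borrowed) : file_list(borrowed) {
	}
	// take ownership of a freshly pruned list
	explicit ScanStateSketch(unique_ptr<MultiFileList> owned) : file_list(*owned), owned_list(std::move(owned)) {
	}
	MultiFileList &file_list;
	unique_ptr<MultiFileList> owned_list;
};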
142 changes: 142 additions & 0 deletions src/duckdb/src/common/arrow/arrow_merge_event.cpp
@@ -0,0 +1,142 @@
#include "duckdb/common/arrow/arrow_merge_event.hpp"
#include "duckdb/storage/storage_info.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Arrow Batch Task
//===--------------------------------------------------------------------===//

ArrowBatchTask::ArrowBatchTask(ArrowQueryResult &result, vector<idx_t> record_batch_indices, Executor &executor,
shared_ptr<Event> event_p, BatchCollectionChunkScanState scan_state,
vector<string> names, idx_t batch_size)
: ExecutorTask(executor, event_p), result(result), record_batch_indices(std::move(record_batch_indices)),
event(std::move(event_p)), batch_size(batch_size), names(std::move(names)), scan_state(std::move(scan_state)) {
}

void ArrowBatchTask::ProduceRecordBatches() {
auto &arrays = result.Arrays();
auto arrow_options = executor.context.GetClientProperties();
for (auto &index : record_batch_indices) {
auto &array = arrays[index];
D_ASSERT(array);
idx_t count;
count = ArrowUtil::FetchChunk(scan_state, arrow_options, batch_size, &array->arrow_array);
(void)count;
D_ASSERT(count != 0);
}
}

TaskExecutionResult ArrowBatchTask::ExecuteTask(TaskExecutionMode mode) {
ProduceRecordBatches();
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}

//===--------------------------------------------------------------------===//
// Arrow Merge Event
//===--------------------------------------------------------------------===//

ArrowMergeEvent::ArrowMergeEvent(ArrowQueryResult &result, BatchedDataCollection &batches, Pipeline &pipeline_p)
: BasePipelineEvent(pipeline_p), result(result), batches(batches) {
record_batch_size = result.BatchSize();
}

namespace {

struct BatchesForTask {
idx_t tuple_count;
BatchedChunkIteratorRange batches;
};

struct BatchesToTaskTransformer {
public:
explicit BatchesToTaskTransformer(BatchedDataCollection &batches) : batches(batches), batch_index(0) {
batch_count = batches.BatchCount();
}
idx_t GetIndex() const {
return batch_index;
}
bool TryGetNextBatchSize(idx_t &tuple_count) {
if (batch_index >= batch_count) {
return false;
}
auto internal_index = batches.IndexToBatchIndex(batch_index++);
auto tuples_in_batch = batches.BatchSize(internal_index);
tuple_count = tuples_in_batch;
return true;
}

public:
BatchedDataCollection &batches;
idx_t batch_index;
idx_t batch_count;
};

} // namespace

void ArrowMergeEvent::Schedule() {
vector<shared_ptr<Task>> tasks;

BatchesToTaskTransformer transformer(batches);
vector<BatchesForTask> task_data;
bool finished = false;
// First we convert our list of batches into units of Storage::ROW_GROUP_SIZE tuples each
while (!finished) {
idx_t tuples_for_task = 0;
idx_t start_index = transformer.GetIndex();
idx_t end_index = start_index;
while (tuples_for_task < Storage::ROW_GROUP_SIZE) {
idx_t batch_size;
if (!transformer.TryGetNextBatchSize(batch_size)) {
finished = true;
break;
}
end_index++;
tuples_for_task += batch_size;
}
if (start_index == end_index) {
break;
}
BatchesForTask batches_for_task;
batches_for_task.tuple_count = tuples_for_task;
batches_for_task.batches = batches.BatchRange(start_index, end_index);
task_data.push_back(batches_for_task);
}

// Now we produce tasks from these units
// Every task is given a scan_state created from the range of batches
// and a vector of indices indicating the arrays (record batches) they should populate
idx_t record_batch_index = 0;
for (auto &data : task_data) {
const auto tuples = data.tuple_count;

auto full_batches = tuples / record_batch_size;
auto remainder = tuples % record_batch_size;
auto total_batches = full_batches + !!remainder;

vector<idx_t> record_batch_indices(total_batches);
for (idx_t i = 0; i < total_batches; i++) {
record_batch_indices[i] = record_batch_index++;
}

BatchCollectionChunkScanState scan_state(batches, data.batches, pipeline->executor.context);
tasks.push_back(make_uniq<ArrowBatchTask>(result, std::move(record_batch_indices), pipeline->executor,
shared_from_this(), std::move(scan_state), result.names,
record_batch_size));
}

// Allocate the list of record batches inside the query result
{
vector<unique_ptr<ArrowArrayWrapper>> arrays;
arrays.resize(record_batch_index);
for (idx_t i = 0; i < record_batch_index; i++) {
arrays[i] = make_uniq<ArrowArrayWrapper>();
}
result.SetArrowData(std::move(arrays));
}
D_ASSERT(!tasks.empty());
SetTasks(std::move(tasks));
}

} // namespace duckdb
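The scheduling code above groups batches into tasks of roughly Storage::ROW_GROUP_SIZE tuples and then reserves one Arrow record batch per record_batch_size tuples within each task. A small worked example of that arithmetic, with purely hypothetical numbers:

// record_batch_size = 100000, one task covers tuples = 250000
// full_batches  = 250000 / 100000 = 2
// remainder     = 250000 % 100000 = 50000   (non-zero)
// total_batches = full_batches + !!remainder = 2 + 1 = 3
// The task is therefore assigned three record-batch indices; the last record
// batch ends up holding the remaining 50000 tuples.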
9 changes: 7 additions & 2 deletions src/duckdb/src/common/arrow/arrow_query_result.cpp
@@ -7,9 +7,10 @@
namespace duckdb {

ArrowQueryResult::ArrowQueryResult(StatementType statement_type, StatementProperties properties, vector<string> names_p,
- vector<LogicalType> types_p, ClientProperties client_properties)
+ vector<LogicalType> types_p, ClientProperties client_properties, idx_t batch_size)
: QueryResult(QueryResultType::ARROW_RESULT, statement_type, std::move(properties), std::move(types_p),
- std::move(names_p), std::move(client_properties)) {
+ std::move(names_p), std::move(client_properties)),
+ batch_size(batch_size) {
}

ArrowQueryResult::ArrowQueryResult(ErrorData error) : QueryResult(QueryResultType::ARROW_RESULT, std::move(error)) {
@@ -48,4 +49,8 @@ void ArrowQueryResult::SetArrowData(vector<unique_ptr<ArrowArrayWrapper>> arrays
this->arrays = std::move(arrays);
}

+ idx_t ArrowQueryResult::BatchSize() const {
+ return batch_size;
+ }
+
} // namespace duckdb
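With the batch size stored on the result, each entry returned by Arrays() corresponds to one Arrow record batch holding at most BatchSize() rows (the batches themselves are populated by ArrowMergeEvent above). A hedged consumer-side sketch, where the function name is illustrative and only Arrays(), BatchSize() and ArrowArrayWrapper come from the sources in this commit:

void InspectArrowResult(ArrowQueryResult &result) {
	for (auto &array : result.Arrays()) {
		// each ArrowArrayWrapper holds one ArrowArray of at most result.BatchSize() rows
		D_ASSERT(array);
		(void)array;
	}
}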
37 changes: 37 additions & 0 deletions src/duckdb/src/common/arrow/physical_arrow_batch_collector.cpp
@@ -0,0 +1,37 @@
#include "duckdb/common/arrow/physical_arrow_batch_collector.hpp"
#include "duckdb/common/types/batched_data_collection.hpp"
#include "duckdb/common/arrow/arrow_query_result.hpp"
#include "duckdb/common/arrow/arrow_merge_event.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/common/arrow/physical_arrow_collector.hpp"

namespace duckdb {

unique_ptr<GlobalSinkState> PhysicalArrowBatchCollector::GetGlobalSinkState(ClientContext &context) const {
return make_uniq<ArrowBatchGlobalState>(context, *this);
}

SinkFinalizeType PhysicalArrowBatchCollector::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
OperatorSinkFinalizeInput &input) const {
auto &gstate = input.global_state.Cast<ArrowBatchGlobalState>();

auto total_tuple_count = gstate.data.Count();
if (total_tuple_count == 0) {
// Create the result containing a single empty result conversion
gstate.result = make_uniq<ArrowQueryResult>(statement_type, properties, names, types,
context.GetClientProperties(), record_batch_size);
return SinkFinalizeType::READY;
}

// Already create the final query result
gstate.result = make_uniq<ArrowQueryResult>(statement_type, properties, names, types, context.GetClientProperties(),
record_batch_size);
// Spawn an event that will populate the conversion result
auto &arrow_result = gstate.result->Cast<ArrowQueryResult>();
auto new_event = make_shared_ptr<ArrowMergeEvent>(arrow_result, gstate.data, pipeline);
event.InsertEvent(std::move(new_event));

return SinkFinalizeType::READY;
}

} // namespace duckdb
