Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vendor DuckDB v0.9.2 #35

Merged
merged 3 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions src/duckdb/extension/icu/icu-timebucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,21 @@ struct ICUTimeBucket : public ICUDateFunc {

static inline timestamp_t WidthConvertibleToDaysCommon(int32_t bucket_width_days, const timestamp_t ts,
const timestamp_t origin, icu::Calendar *calendar) {
const auto trunc_days = TruncationFactory(DatePartSpecifier::DAY);
const auto sub_days = SubtractFactory(DatePartSpecifier::DAY);

uint64_t tmp_micros = SetTime(calendar, ts);
trunc_days(calendar, tmp_micros);
timestamp_t truncated_ts = GetTimeUnsafe(calendar, tmp_micros);

int64_t ts_days = sub_days(calendar, origin, truncated_ts);
int64_t ts_days = sub_days(calendar, origin, ts);
int64_t result_days = (ts_days / bucket_width_days) * bucket_width_days;
if (result_days < NumericLimits<int32_t>::Minimum() || result_days > NumericLimits<int32_t>::Maximum()) {
throw OutOfRangeException("Timestamp out of range");
}
if (ts_days < 0 && ts_days % bucket_width_days != 0) {
result_days =
SubtractOperatorOverflowCheck::Operation<int32_t, int32_t, int32_t>(result_days, bucket_width_days);
timestamp_t bucket = Add(calendar, origin, interval_t {0, static_cast<int32_t>(result_days), 0});
if (ts < bucket) {
D_ASSERT(ts < origin);
bucket = Add(calendar, bucket, interval_t {0, -bucket_width_days, 0});
D_ASSERT(ts > bucket);
}

return Add(calendar, origin, interval_t {0, static_cast<int32_t>(result_days), 0});
return bucket;
}

static inline timestamp_t WidthConvertibleToMonthsCommon(int32_t bucket_width_months, const timestamp_t ts,
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/extension/icu/icu-timezone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ static void ICUTimeZoneFunction(ClientContext &context, TableFunctionInput &data
break;
}

// What PG reports is the total offset for today,
// which is the ICU total offset (i.e., "raw") plus the DST offset.
raw_offset_ms += dst_offset_ms;
output.SetValue(2, index, Value::INTERVAL(Interval::FromMicro(raw_offset_ms * Interval::MICROS_PER_MSEC)));
output.SetValue(3, index, Value(dst_offset_ms != 0));
++index;
Expand Down
29 changes: 11 additions & 18 deletions src/duckdb/extension/json/buffered_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool JSONFileHandle::IsOpen() const {
}

void JSONFileHandle::Close() {
if (IsOpen() && file_handle->OnDiskFile()) {
if (IsOpen() && !file_handle->IsPipe()) {
file_handle->Close();
file_handle = nullptr;
}
Expand Down Expand Up @@ -72,30 +72,23 @@ void JSONFileHandle::ReadAtPosition(char *pointer, idx_t size, idx_t position, b
D_ASSERT(size != 0);
if (plain_file_source) {
file_handle->Read(pointer, size, position);
actual_reads++;

return;
}

if (sample_run) { // Cache the buffer
} else if (sample_run) { // Cache the buffer
file_handle->Read(pointer, size, position);
actual_reads++;

cached_buffers.emplace_back(allocator.Allocate(size));
memcpy(cached_buffers.back().get(), pointer, size);
cached_size += size;
} else {
if (!cached_buffers.empty() || position < cached_size) {
ReadFromCache(pointer, size, position);
}

return;
}

if (!cached_buffers.empty() || position < cached_size) {
ReadFromCache(pointer, size, position);
actual_reads++;
if (size != 0) {
file_handle->Read(pointer, size, position);
}
}

if (size != 0) {
file_handle->Read(pointer, size, position);
actual_reads++;
if (++actual_reads > requested_reads) {
throw InternalException("JSONFileHandle performed more actual reads than requested reads");
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/duckdb/extension/json/json_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,22 @@ unique_ptr<GlobalTableFunctionState> JSONGlobalTableFunctionState::Init(ClientCo

idx_t JSONGlobalTableFunctionState::MaxThreads() const {
auto &bind_data = state.bind_data;
if (bind_data.options.format == JSONFormat::NEWLINE_DELIMITED) {
return state.system_threads;
}

if (!state.json_readers.empty() && state.json_readers[0]->HasFileHandle()) {
// We opened and auto-detected a file, so we can get a better estimate
auto &reader = *state.json_readers[0];
if (reader.GetFormat() == JSONFormat::NEWLINE_DELIMITED) { // Auto-detected NDJSON
return state.system_threads;
if (bind_data.options.format == JSONFormat::NEWLINE_DELIMITED ||
reader.GetFormat() == JSONFormat::NEWLINE_DELIMITED) {
return MaxValue<idx_t>(state.json_readers[0]->GetFileHandle().FileSize() / bind_data.maximum_object_size,
1);
}
}

if (bind_data.options.format == JSONFormat::NEWLINE_DELIMITED) {
// We haven't opened any files, so this is our best bet
return state.system_threads;
}

// One reader per file
return bind_data.files.size();
}
Expand Down
4 changes: 2 additions & 2 deletions src/duckdb/extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ static void GetFieldIDs(const Value &field_ids_value, ChildFieldIDs &field_ids,
}
}

unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyInfo &info, vector<string> &names,
vector<LogicalType> &sql_types) {
unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo &info, const vector<string> &names,
const vector<LogicalType> &sql_types) {
D_ASSERT(names.size() == sql_types.size());
bool row_group_size_bytes_set = false;
auto bind_data = make_uniq<ParquetWriteBindData>();
Expand Down
18 changes: 6 additions & 12 deletions src/duckdb/src/catalog/catalog_entry/view_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ unique_ptr<CreateInfo> ViewCatalogEntry::GetInfo() const {
result->query = unique_ptr_cast<SQLStatement, SelectStatement>(query->Copy());
result->aliases = aliases;
result->types = types;
result->temporary = temporary;
return std::move(result);
}

Expand All @@ -58,23 +59,16 @@ string ViewCatalogEntry::ToSQL() const {
//! Return empty sql with view name so pragma view_tables don't complain
return sql;
}
return sql + "\n;";
auto info = GetInfo();
auto result = info->ToString();
return result + ";\n";
}

unique_ptr<CatalogEntry> ViewCatalogEntry::Copy(ClientContext &context) const {
D_ASSERT(!internal);
CreateViewInfo create_info(schema, name);
create_info.query = unique_ptr_cast<SQLStatement, SelectStatement>(query->Copy());
for (idx_t i = 0; i < aliases.size(); i++) {
create_info.aliases.push_back(aliases[i]);
}
for (idx_t i = 0; i < types.size(); i++) {
create_info.types.push_back(types[i]);
}
create_info.temporary = temporary;
create_info.sql = sql;
auto create_info = GetInfo();

return make_uniq<ViewCatalogEntry>(catalog, schema, create_info);
return make_uniq<ViewCatalogEntry>(catalog, schema, create_info->Cast<CreateViewInfo>());
}

} // namespace duckdb
7 changes: 3 additions & 4 deletions src/duckdb/src/catalog/catalog_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ bool CatalogSet::AlterOwnership(CatalogTransaction transaction, ChangeOwnershipI
bool CatalogSet::AlterEntry(CatalogTransaction transaction, const string &name, AlterInfo &alter_info) {
// lock the catalog for writing
lock_guard<mutex> write_lock(catalog.GetWriteLock());
// lock this catalog set to disallow reading
lock_guard<mutex> read_lock(catalog_lock);

// first check if the entry exists in the unordered set
EntryIndex entry_index;
Expand All @@ -210,9 +212,6 @@ bool CatalogSet::AlterEntry(CatalogTransaction transaction, const string &name,
throw CatalogException("Cannot alter entry \"%s\" because it is an internal system entry", entry->name);
}

// lock this catalog set to disallow reading
lock_guard<mutex> read_lock(catalog_lock);

// create a new entry and replace the currently stored one
// set the timestamp to the timestamp of the current transaction
// and point it to the updated table node
Expand Down Expand Up @@ -316,6 +315,7 @@ void CatalogSet::DropEntryInternal(CatalogTransaction transaction, EntryIndex en
bool CatalogSet::DropEntry(CatalogTransaction transaction, const string &name, bool cascade, bool allow_drop_internal) {
// lock the catalog for writing
lock_guard<mutex> write_lock(catalog.GetWriteLock());
lock_guard<mutex> read_lock(catalog_lock);
// we can only delete an entry that exists
EntryIndex entry_index;
auto entry = GetEntryInternal(transaction, name, &entry_index);
Expand All @@ -326,7 +326,6 @@ bool CatalogSet::DropEntry(CatalogTransaction transaction, const string &name, b
throw CatalogException("Cannot drop entry \"%s\" because it is an internal system entry", entry->name);
}

lock_guard<mutex> read_lock(catalog_lock);
DropEntryInternal(transaction, std::move(entry_index), *entry, cascade);
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/arrow/appender/union_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void ArrowUnionData::Append(ArrowAppendData &append_data, Vector &input, idx_t f

duckdb::vector<Vector> child_vectors;
for (const auto &child : UnionType::CopyMemberTypes(input.GetType())) {
child_vectors.emplace_back(child.second);
child_vectors.emplace_back(child.second, size);
}

for (idx_t input_idx = from; input_idx < to; input_idx++) {
Expand Down
25 changes: 17 additions & 8 deletions src/duckdb/src/common/arrow/arrow_appender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,26 +193,26 @@ static void InitializeFunctionPointers(ArrowAppendData &append_data, const Logic
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowVarcharData<string_t>>(append_data);
} else {
InitializeAppenderForType<ArrowVarcharData<string_t, ArrowVarcharConverter, uint32_t>>(append_data);
InitializeAppenderForType<ArrowVarcharData<string_t, ArrowVarcharConverter, int32_t>>(append_data);
}
break;
case LogicalTypeId::UUID:
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowVarcharData<hugeint_t, ArrowUUIDConverter>>(append_data);
} else {
InitializeAppenderForType<ArrowVarcharData<hugeint_t, ArrowUUIDConverter, uint32_t>>(append_data);
InitializeAppenderForType<ArrowVarcharData<hugeint_t, ArrowUUIDConverter, int32_t>>(append_data);
}
break;
case LogicalTypeId::ENUM:
switch (type.InternalType()) {
case PhysicalType::UINT8:
InitializeAppenderForType<ArrowEnumData<uint8_t>>(append_data);
InitializeAppenderForType<ArrowEnumData<int8_t>>(append_data);
break;
case PhysicalType::UINT16:
InitializeAppenderForType<ArrowEnumData<uint16_t>>(append_data);
InitializeAppenderForType<ArrowEnumData<int16_t>>(append_data);
break;
case PhysicalType::UINT32:
InitializeAppenderForType<ArrowEnumData<uint32_t>>(append_data);
InitializeAppenderForType<ArrowEnumData<int32_t>>(append_data);
break;
default:
throw InternalException("Unsupported internal enum type");
Expand All @@ -227,11 +227,20 @@ static void InitializeFunctionPointers(ArrowAppendData &append_data, const Logic
case LogicalTypeId::STRUCT:
InitializeAppenderForType<ArrowStructData>(append_data);
break;
case LogicalTypeId::LIST:
InitializeAppenderForType<ArrowListData>(append_data);
case LogicalTypeId::LIST: {
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowListData<int64_t>>(append_data);
} else {
InitializeAppenderForType<ArrowListData<int32_t>>(append_data);
}
break;
}
case LogicalTypeId::MAP:
InitializeAppenderForType<ArrowMapData>(append_data);
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowMapData<int64_t>>(append_data);
} else {
InitializeAppenderForType<ArrowMapData<int32_t>>(append_data);
}
break;
default:
throw NotImplementedException("Unsupported type in DuckDB -> Arrow Conversion: %s\n", type.ToString());
Expand Down
6 changes: 5 additions & 1 deletion src/duckdb/src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ void SetArrowFormat(DuckDBArrowSchemaHolder &root_holder, ArrowSchema &child, co
break;
}
case LogicalTypeId::LIST: {
child.format = "+l";
if (options.arrow_offset_size == ArrowOffsetSize::LARGE) {
child.format = "+L";
} else {
child.format = "+l";
}
child.n_children = 1;
root_holder.nested_children.emplace_back();
root_holder.nested_children.back().resize(1);
Expand Down
67 changes: 67 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "duckdb/common/types/timestamp.hpp"
#include "duckdb/common/types/vector.hpp"
#include "duckdb/common/types/vector_buffer.hpp"
#include "duckdb/core_functions/aggregate/quantile_enum.hpp"
#include "duckdb/execution/index/art/art.hpp"
#include "duckdb/execution/index/art/node.hpp"
#include "duckdb/execution/operator/scan/csv/base_csv_reader.hpp"
Expand Down Expand Up @@ -4571,6 +4572,44 @@ ProfilerPrintFormat EnumUtil::FromString<ProfilerPrintFormat>(const char *value)
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<QuantileSerializationType>(QuantileSerializationType value) {
	// Map each QuantileSerializationType member to its canonical string name.
	struct Mapping {
		QuantileSerializationType value;
		const char *name;
	};
	static constexpr Mapping kMappings[] = {
	    {QuantileSerializationType::NON_DECIMAL, "NON_DECIMAL"},
	    {QuantileSerializationType::DECIMAL_DISCRETE, "DECIMAL_DISCRETE"},
	    {QuantileSerializationType::DECIMAL_DISCRETE_LIST, "DECIMAL_DISCRETE_LIST"},
	    {QuantileSerializationType::DECIMAL_CONTINUOUS, "DECIMAL_CONTINUOUS"},
	    {QuantileSerializationType::DECIMAL_CONTINUOUS_LIST, "DECIMAL_CONTINUOUS_LIST"},
	};
	for (const auto &mapping : kMappings) {
		if (mapping.value == value) {
			return mapping.name;
		}
	}
	// Any member not listed above is a programming error.
	throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}

template<>
QuantileSerializationType EnumUtil::FromString<QuantileSerializationType>(const char *value) {
	// Resolve a textual name back to its QuantileSerializationType member.
	struct Mapping {
		const char *name;
		QuantileSerializationType value;
	};
	static constexpr Mapping kMappings[] = {
	    {"NON_DECIMAL", QuantileSerializationType::NON_DECIMAL},
	    {"DECIMAL_DISCRETE", QuantileSerializationType::DECIMAL_DISCRETE},
	    {"DECIMAL_DISCRETE_LIST", QuantileSerializationType::DECIMAL_DISCRETE_LIST},
	    {"DECIMAL_CONTINUOUS", QuantileSerializationType::DECIMAL_CONTINUOUS},
	    {"DECIMAL_CONTINUOUS_LIST", QuantileSerializationType::DECIMAL_CONTINUOUS_LIST},
	};
	for (const auto &mapping : kMappings) {
		if (StringUtil::Equals(value, mapping.name)) {
			return mapping.value;
		}
	}
	// Unrecognized names are a programming error.
	throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<QueryNodeType>(QueryNodeType value) {
switch(value) {
Expand Down Expand Up @@ -5118,6 +5157,29 @@ SinkFinalizeType EnumUtil::FromString<SinkFinalizeType>(const char *value) {
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<SinkNextBatchType>(SinkNextBatchType value) {
	// Map each SinkNextBatchType member to its canonical string name.
	if (value == SinkNextBatchType::READY) {
		return "READY";
	}
	if (value == SinkNextBatchType::BLOCKED) {
		return "BLOCKED";
	}
	// Any member not handled above is a programming error.
	throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}

template<>
SinkNextBatchType EnumUtil::FromString<SinkNextBatchType>(const char *value) {
	// Resolve a textual name back to its SinkNextBatchType member.
	struct Mapping {
		const char *name;
		SinkNextBatchType value;
	};
	static constexpr Mapping kMappings[] = {
	    {"READY", SinkNextBatchType::READY},
	    {"BLOCKED", SinkNextBatchType::BLOCKED},
	};
	for (const auto &mapping : kMappings) {
		if (StringUtil::Equals(value, mapping.name)) {
			return mapping.value;
		}
	}
	// Unrecognized names are a programming error.
	throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<SinkResultType>(SinkResultType value) {
switch(value) {
Expand Down Expand Up @@ -6010,6 +6072,8 @@ const char* EnumUtil::ToChars<UnionInvalidReason>(UnionInvalidReason value) {
return "VALIDITY_OVERLAP";
case UnionInvalidReason::TAG_MISMATCH:
return "TAG_MISMATCH";
case UnionInvalidReason::NULL_TAG:
return "NULL_TAG";
default:
throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}
Expand All @@ -6032,6 +6096,9 @@ UnionInvalidReason EnumUtil::FromString<UnionInvalidReason>(const char *value) {
if (StringUtil::Equals(value, "TAG_MISMATCH")) {
return UnionInvalidReason::TAG_MISMATCH;
}
if (StringUtil::Equals(value, "NULL_TAG")) {
return UnionInvalidReason::NULL_TAG;
}
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

Expand Down
Loading