Skip to content

Commit

Permalink
Vendor DuckDB v0.9.2
Browse files Browse the repository at this point in the history
  • Loading branch information
carlopi committed Nov 14, 2023
1 parent ab7dbe9 commit 2804429
Show file tree
Hide file tree
Showing 119 changed files with 1,787 additions and 652 deletions.
17 changes: 7 additions & 10 deletions src/duckdb/extension/icu/icu-timebucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,21 @@ struct ICUTimeBucket : public ICUDateFunc {

static inline timestamp_t WidthConvertibleToDaysCommon(int32_t bucket_width_days, const timestamp_t ts,
const timestamp_t origin, icu::Calendar *calendar) {
const auto trunc_days = TruncationFactory(DatePartSpecifier::DAY);
const auto sub_days = SubtractFactory(DatePartSpecifier::DAY);

uint64_t tmp_micros = SetTime(calendar, ts);
trunc_days(calendar, tmp_micros);
timestamp_t truncated_ts = GetTimeUnsafe(calendar, tmp_micros);

int64_t ts_days = sub_days(calendar, origin, truncated_ts);
int64_t ts_days = sub_days(calendar, origin, ts);
int64_t result_days = (ts_days / bucket_width_days) * bucket_width_days;
if (result_days < NumericLimits<int32_t>::Minimum() || result_days > NumericLimits<int32_t>::Maximum()) {
throw OutOfRangeException("Timestamp out of range");
}
if (ts_days < 0 && ts_days % bucket_width_days != 0) {
result_days =
SubtractOperatorOverflowCheck::Operation<int32_t, int32_t, int32_t>(result_days, bucket_width_days);
timestamp_t bucket = Add(calendar, origin, interval_t {0, static_cast<int32_t>(result_days), 0});
if (ts < bucket) {
D_ASSERT(ts < origin);
bucket = Add(calendar, bucket, interval_t {0, -bucket_width_days, 0});
D_ASSERT(ts > bucket);
}

return Add(calendar, origin, interval_t {0, static_cast<int32_t>(result_days), 0});
return bucket;
}

static inline timestamp_t WidthConvertibleToMonthsCommon(int32_t bucket_width_months, const timestamp_t ts,
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/extension/icu/icu-timezone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ static void ICUTimeZoneFunction(ClientContext &context, TableFunctionInput &data
break;
}

// What PG reports is the total offset for today,
// which is the ICU total offset (i.e., "raw") plus the DST offset.
raw_offset_ms += dst_offset_ms;
output.SetValue(2, index, Value::INTERVAL(Interval::FromMicro(raw_offset_ms * Interval::MICROS_PER_MSEC)));
output.SetValue(3, index, Value(dst_offset_ms != 0));
++index;
Expand Down
29 changes: 11 additions & 18 deletions src/duckdb/extension/json/buffered_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool JSONFileHandle::IsOpen() const {
}

void JSONFileHandle::Close() {
if (IsOpen() && file_handle->OnDiskFile()) {
if (IsOpen() && !file_handle->IsPipe()) {
file_handle->Close();
file_handle = nullptr;
}
Expand Down Expand Up @@ -72,30 +72,23 @@ void JSONFileHandle::ReadAtPosition(char *pointer, idx_t size, idx_t position, b
D_ASSERT(size != 0);
if (plain_file_source) {
file_handle->Read(pointer, size, position);
actual_reads++;

return;
}

if (sample_run) { // Cache the buffer
} else if (sample_run) { // Cache the buffer
file_handle->Read(pointer, size, position);
actual_reads++;

cached_buffers.emplace_back(allocator.Allocate(size));
memcpy(cached_buffers.back().get(), pointer, size);
cached_size += size;
} else {
if (!cached_buffers.empty() || position < cached_size) {
ReadFromCache(pointer, size, position);
}

return;
}

if (!cached_buffers.empty() || position < cached_size) {
ReadFromCache(pointer, size, position);
actual_reads++;
if (size != 0) {
file_handle->Read(pointer, size, position);
}
}

if (size != 0) {
file_handle->Read(pointer, size, position);
actual_reads++;
if (++actual_reads > requested_reads) {
throw InternalException("JSONFileHandle performed more actual reads than requested reads");
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/duckdb/extension/json/json_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,22 @@ unique_ptr<GlobalTableFunctionState> JSONGlobalTableFunctionState::Init(ClientCo

idx_t JSONGlobalTableFunctionState::MaxThreads() const {
auto &bind_data = state.bind_data;
if (bind_data.options.format == JSONFormat::NEWLINE_DELIMITED) {
return state.system_threads;
}

if (!state.json_readers.empty() && state.json_readers[0]->HasFileHandle()) {
// We opened and auto-detected a file, so we can get a better estimate
auto &reader = *state.json_readers[0];
if (reader.GetFormat() == JSONFormat::NEWLINE_DELIMITED) { // Auto-detected NDJSON
return state.system_threads;
if (bind_data.options.format == JSONFormat::NEWLINE_DELIMITED ||
reader.GetFormat() == JSONFormat::NEWLINE_DELIMITED) {
return MaxValue<idx_t>(state.json_readers[0]->GetFileHandle().FileSize() / bind_data.maximum_object_size,
1);
}
}

if (bind_data.options.format == JSONFormat::NEWLINE_DELIMITED) {
// We haven't opened any files, so this is our best bet
return state.system_threads;
}

// One reader per file
return bind_data.files.size();
}
Expand Down
4 changes: 2 additions & 2 deletions src/duckdb/extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ static void GetFieldIDs(const Value &field_ids_value, ChildFieldIDs &field_ids,
}
}

unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyInfo &info, vector<string> &names,
vector<LogicalType> &sql_types) {
unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo &info, const vector<string> &names,
const vector<LogicalType> &sql_types) {
D_ASSERT(names.size() == sql_types.size());
bool row_group_size_bytes_set = false;
auto bind_data = make_uniq<ParquetWriteBindData>();
Expand Down
18 changes: 6 additions & 12 deletions src/duckdb/src/catalog/catalog_entry/view_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ unique_ptr<CreateInfo> ViewCatalogEntry::GetInfo() const {
result->query = unique_ptr_cast<SQLStatement, SelectStatement>(query->Copy());
result->aliases = aliases;
result->types = types;
result->temporary = temporary;
return std::move(result);
}

Expand All @@ -58,23 +59,16 @@ string ViewCatalogEntry::ToSQL() const {
//! Return empty sql with view name so pragma view_tables don't complain
return sql;
}
return sql + "\n;";
auto info = GetInfo();
auto result = info->ToString();
return result + ";\n";
}

unique_ptr<CatalogEntry> ViewCatalogEntry::Copy(ClientContext &context) const {
D_ASSERT(!internal);
CreateViewInfo create_info(schema, name);
create_info.query = unique_ptr_cast<SQLStatement, SelectStatement>(query->Copy());
for (idx_t i = 0; i < aliases.size(); i++) {
create_info.aliases.push_back(aliases[i]);
}
for (idx_t i = 0; i < types.size(); i++) {
create_info.types.push_back(types[i]);
}
create_info.temporary = temporary;
create_info.sql = sql;
auto create_info = GetInfo();

return make_uniq<ViewCatalogEntry>(catalog, schema, create_info);
return make_uniq<ViewCatalogEntry>(catalog, schema, create_info->Cast<CreateViewInfo>());
}

} // namespace duckdb
7 changes: 3 additions & 4 deletions src/duckdb/src/catalog/catalog_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ bool CatalogSet::AlterOwnership(CatalogTransaction transaction, ChangeOwnershipI
bool CatalogSet::AlterEntry(CatalogTransaction transaction, const string &name, AlterInfo &alter_info) {
// lock the catalog for writing
lock_guard<mutex> write_lock(catalog.GetWriteLock());
// lock this catalog set to disallow reading
lock_guard<mutex> read_lock(catalog_lock);

// first check if the entry exists in the unordered set
EntryIndex entry_index;
Expand All @@ -210,9 +212,6 @@ bool CatalogSet::AlterEntry(CatalogTransaction transaction, const string &name,
throw CatalogException("Cannot alter entry \"%s\" because it is an internal system entry", entry->name);
}

// lock this catalog set to disallow reading
lock_guard<mutex> read_lock(catalog_lock);

// create a new entry and replace the currently stored one
// set the timestamp to the timestamp of the current transaction
// and point it to the updated table node
Expand Down Expand Up @@ -316,6 +315,7 @@ void CatalogSet::DropEntryInternal(CatalogTransaction transaction, EntryIndex en
bool CatalogSet::DropEntry(CatalogTransaction transaction, const string &name, bool cascade, bool allow_drop_internal) {
// lock the catalog for writing
lock_guard<mutex> write_lock(catalog.GetWriteLock());
lock_guard<mutex> read_lock(catalog_lock);
// we can only delete an entry that exists
EntryIndex entry_index;
auto entry = GetEntryInternal(transaction, name, &entry_index);
Expand All @@ -326,7 +326,6 @@ bool CatalogSet::DropEntry(CatalogTransaction transaction, const string &name, b
throw CatalogException("Cannot drop entry \"%s\" because it is an internal system entry", entry->name);
}

lock_guard<mutex> read_lock(catalog_lock);
DropEntryInternal(transaction, std::move(entry_index), *entry, cascade);
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/arrow/appender/union_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void ArrowUnionData::Append(ArrowAppendData &append_data, Vector &input, idx_t f

duckdb::vector<Vector> child_vectors;
for (const auto &child : UnionType::CopyMemberTypes(input.GetType())) {
child_vectors.emplace_back(child.second);
child_vectors.emplace_back(child.second, size);
}

for (idx_t input_idx = from; input_idx < to; input_idx++) {
Expand Down
25 changes: 17 additions & 8 deletions src/duckdb/src/common/arrow/arrow_appender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,26 +193,26 @@ static void InitializeFunctionPointers(ArrowAppendData &append_data, const Logic
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowVarcharData<string_t>>(append_data);
} else {
InitializeAppenderForType<ArrowVarcharData<string_t, ArrowVarcharConverter, uint32_t>>(append_data);
InitializeAppenderForType<ArrowVarcharData<string_t, ArrowVarcharConverter, int32_t>>(append_data);
}
break;
case LogicalTypeId::UUID:
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowVarcharData<hugeint_t, ArrowUUIDConverter>>(append_data);
} else {
InitializeAppenderForType<ArrowVarcharData<hugeint_t, ArrowUUIDConverter, uint32_t>>(append_data);
InitializeAppenderForType<ArrowVarcharData<hugeint_t, ArrowUUIDConverter, int32_t>>(append_data);
}
break;
case LogicalTypeId::ENUM:
switch (type.InternalType()) {
case PhysicalType::UINT8:
InitializeAppenderForType<ArrowEnumData<uint8_t>>(append_data);
InitializeAppenderForType<ArrowEnumData<int8_t>>(append_data);
break;
case PhysicalType::UINT16:
InitializeAppenderForType<ArrowEnumData<uint16_t>>(append_data);
InitializeAppenderForType<ArrowEnumData<int16_t>>(append_data);
break;
case PhysicalType::UINT32:
InitializeAppenderForType<ArrowEnumData<uint32_t>>(append_data);
InitializeAppenderForType<ArrowEnumData<int32_t>>(append_data);
break;
default:
throw InternalException("Unsupported internal enum type");
Expand All @@ -227,11 +227,20 @@ static void InitializeFunctionPointers(ArrowAppendData &append_data, const Logic
case LogicalTypeId::STRUCT:
InitializeAppenderForType<ArrowStructData>(append_data);
break;
case LogicalTypeId::LIST:
InitializeAppenderForType<ArrowListData>(append_data);
case LogicalTypeId::LIST: {
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowListData<int64_t>>(append_data);
} else {
InitializeAppenderForType<ArrowListData<int32_t>>(append_data);
}
break;
}
case LogicalTypeId::MAP:
InitializeAppenderForType<ArrowMapData>(append_data);
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowMapData<int64_t>>(append_data);
} else {
InitializeAppenderForType<ArrowMapData<int32_t>>(append_data);
}
break;
default:
throw NotImplementedException("Unsupported type in DuckDB -> Arrow Conversion: %s\n", type.ToString());
Expand Down
6 changes: 5 additions & 1 deletion src/duckdb/src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ void SetArrowFormat(DuckDBArrowSchemaHolder &root_holder, ArrowSchema &child, co
break;
}
case LogicalTypeId::LIST: {
child.format = "+l";
if (options.arrow_offset_size == ArrowOffsetSize::LARGE) {
child.format = "+L";
} else {
child.format = "+l";
}
child.n_children = 1;
root_holder.nested_children.emplace_back();
root_holder.nested_children.back().resize(1);
Expand Down
67 changes: 67 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "duckdb/common/types/timestamp.hpp"
#include "duckdb/common/types/vector.hpp"
#include "duckdb/common/types/vector_buffer.hpp"
#include "duckdb/core_functions/aggregate/quantile_enum.hpp"
#include "duckdb/execution/index/art/art.hpp"
#include "duckdb/execution/index/art/node.hpp"
#include "duckdb/execution/operator/scan/csv/base_csv_reader.hpp"
Expand Down Expand Up @@ -4571,6 +4572,44 @@ ProfilerPrintFormat EnumUtil::FromString<ProfilerPrintFormat>(const char *value)
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<QuantileSerializationType>(QuantileSerializationType value) {
switch(value) {
case QuantileSerializationType::NON_DECIMAL:
return "NON_DECIMAL";
case QuantileSerializationType::DECIMAL_DISCRETE:
return "DECIMAL_DISCRETE";
case QuantileSerializationType::DECIMAL_DISCRETE_LIST:
return "DECIMAL_DISCRETE_LIST";
case QuantileSerializationType::DECIMAL_CONTINUOUS:
return "DECIMAL_CONTINUOUS";
case QuantileSerializationType::DECIMAL_CONTINUOUS_LIST:
return "DECIMAL_CONTINUOUS_LIST";
default:
throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}
}

template<>
QuantileSerializationType EnumUtil::FromString<QuantileSerializationType>(const char *value) {
if (StringUtil::Equals(value, "NON_DECIMAL")) {
return QuantileSerializationType::NON_DECIMAL;
}
if (StringUtil::Equals(value, "DECIMAL_DISCRETE")) {
return QuantileSerializationType::DECIMAL_DISCRETE;
}
if (StringUtil::Equals(value, "DECIMAL_DISCRETE_LIST")) {
return QuantileSerializationType::DECIMAL_DISCRETE_LIST;
}
if (StringUtil::Equals(value, "DECIMAL_CONTINUOUS")) {
return QuantileSerializationType::DECIMAL_CONTINUOUS;
}
if (StringUtil::Equals(value, "DECIMAL_CONTINUOUS_LIST")) {
return QuantileSerializationType::DECIMAL_CONTINUOUS_LIST;
}
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<QueryNodeType>(QueryNodeType value) {
switch(value) {
Expand Down Expand Up @@ -5118,6 +5157,29 @@ SinkFinalizeType EnumUtil::FromString<SinkFinalizeType>(const char *value) {
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<SinkNextBatchType>(SinkNextBatchType value) {
switch(value) {
case SinkNextBatchType::READY:
return "READY";
case SinkNextBatchType::BLOCKED:
return "BLOCKED";
default:
throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}
}

template<>
SinkNextBatchType EnumUtil::FromString<SinkNextBatchType>(const char *value) {
if (StringUtil::Equals(value, "READY")) {
return SinkNextBatchType::READY;
}
if (StringUtil::Equals(value, "BLOCKED")) {
return SinkNextBatchType::BLOCKED;
}
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<SinkResultType>(SinkResultType value) {
switch(value) {
Expand Down Expand Up @@ -6010,6 +6072,8 @@ const char* EnumUtil::ToChars<UnionInvalidReason>(UnionInvalidReason value) {
return "VALIDITY_OVERLAP";
case UnionInvalidReason::TAG_MISMATCH:
return "TAG_MISMATCH";
case UnionInvalidReason::NULL_TAG:
return "NULL_TAG";
default:
throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}
Expand All @@ -6032,6 +6096,9 @@ UnionInvalidReason EnumUtil::FromString<UnionInvalidReason>(const char *value) {
if (StringUtil::Equals(value, "TAG_MISMATCH")) {
return UnionInvalidReason::TAG_MISMATCH;
}
if (StringUtil::Equals(value, "NULL_TAG")) {
return UnionInvalidReason::NULL_TAG;
}
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

Expand Down
Loading

0 comments on commit 2804429

Please sign in to comment.