Skip to content

Commit

Permalink
apacheGH-36099: [C++] Add Utf8View and BinaryView to the c ABI (apach…
Browse files Browse the repository at this point in the history
…e#38443)

### Rationale for this change

Utf8View and BinaryView should be added to the c ABI spec and to the c++ library's importer/exporter.

### Are these changes tested?

Yes, minimally

### Are there any user-facing changes?

View arrays will be importable/exportable through the c ABI in c++

* Closes: apache#36099

Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
bkietz authored and dgreiss committed Feb 17, 2024
1 parent 6350026 commit 2cf041f
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 39 deletions.
87 changes: 73 additions & 14 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
Expand All @@ -41,6 +42,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/range.h"
#include "arrow/util/small_vector.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"
Expand Down Expand Up @@ -260,7 +262,7 @@ struct SchemaExporter {
// Dictionary type: parent struct describes index type,
// child dictionary struct describes value type.
RETURN_NOT_OK(VisitTypeInline(*dict_type.index_type(), this));
dict_exporter_.reset(new SchemaExporter());
dict_exporter_ = std::make_unique<SchemaExporter>();
RETURN_NOT_OK(dict_exporter_->ExportType(*dict_type.value_type()));
} else {
RETURN_NOT_OK(VisitTypeInline(type, this));
Expand Down Expand Up @@ -357,10 +359,14 @@ struct SchemaExporter {

Status Visit(const LargeBinaryType& type) { return SetFormat("Z"); }

Status Visit(const BinaryViewType& type) { return SetFormat("vz"); }

Status Visit(const StringType& type) { return SetFormat("u"); }

Status Visit(const LargeStringType& type) { return SetFormat("U"); }

Status Visit(const StringViewType& type) { return SetFormat("vu"); }

Status Visit(const Date32Type& type) { return SetFormat("tdD"); }

Status Visit(const Date64Type& type) { return SetFormat("tdm"); }
Expand Down Expand Up @@ -521,13 +527,14 @@ namespace {

struct ExportedArrayPrivateData : PoolAllocationMixin<ExportedArrayPrivateData> {
// The buffers are owned by the ArrayData member
StaticVector<const void*, 3> buffers_;
SmallVector<const void*, 3> buffers_;
struct ArrowArray dictionary_;
SmallVector<struct ArrowArray, 1> children_;
SmallVector<struct ArrowArray*, 4> child_pointers_;

std::shared_ptr<ArrayData> data_;
std::shared_ptr<Device::SyncEvent> sync_;
std::vector<int64_t> variadic_buffer_sizes_;

ExportedArrayPrivateData() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
Expand Down Expand Up @@ -570,15 +577,32 @@ struct ArrayExporter {
--n_buffers;
++buffers_begin;
}

bool need_variadic_buffer_sizes =
data->type->id() == Type::BINARY_VIEW || data->type->id() == Type::STRING_VIEW;
if (need_variadic_buffer_sizes) {
++n_buffers;
}

export_.buffers_.resize(n_buffers);
std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(),
[](const std::shared_ptr<Buffer>& buffer) -> const void* {
return buffer ? buffer->data() : nullptr;
});

if (need_variadic_buffer_sizes) {
auto variadic_buffers = util::span(data->buffers).subspan(2);
export_.variadic_buffer_sizes_.resize(variadic_buffers.size());
size_t i = 0;
for (const auto& buf : variadic_buffers) {
export_.variadic_buffer_sizes_[i++] = buf->size();
}
export_.buffers_.back() = export_.variadic_buffer_sizes_.data();
}

// Export dictionary
if (data->dictionary != nullptr) {
dict_exporter_.reset(new ArrayExporter());
dict_exporter_ = std::make_unique<ArrayExporter>();
RETURN_NOT_OK(dict_exporter_->Export(data->dictionary));
}

Expand Down Expand Up @@ -795,7 +819,7 @@ Status InvalidFormatString(std::string_view v) {

class FormatStringParser {
public:
FormatStringParser() {}
FormatStringParser() = default;

explicit FormatStringParser(std::string_view v) : view_(v), index_(0) {}

Expand Down Expand Up @@ -941,8 +965,6 @@ Result<DecodedMetadata> DecodeMetadata(const char* metadata) {
}

struct SchemaImporter {
SchemaImporter() : c_struct_(nullptr), guard_(nullptr) {}

Status Import(struct ArrowSchema* src) {
if (ArrowSchemaIsReleased(src)) {
return Status::Invalid("Cannot import released ArrowSchema");
Expand Down Expand Up @@ -1068,6 +1090,8 @@ struct SchemaImporter {
return ProcessPrimitive(binary());
case 'Z':
return ProcessPrimitive(large_binary());
case 'v':
return ProcessBinaryView();
case 'w':
return ProcessFixedSizeBinary();
case 'd':
Expand All @@ -1080,6 +1104,17 @@ struct SchemaImporter {
return f_parser_.Invalid();
}

Status ProcessBinaryView() {
RETURN_NOT_OK(f_parser_.CheckHasNext());
switch (f_parser_.Next()) {
case 'z':
return ProcessPrimitive(binary_view());
case 'u':
return ProcessPrimitive(utf8_view());
}
return f_parser_.Invalid();
}

Status ProcessTemporal() {
RETURN_NOT_OK(f_parser_.CheckHasNext());
switch (f_parser_.Next()) {
Expand Down Expand Up @@ -1360,8 +1395,8 @@ struct SchemaImporter {
return Status::OK();
}

struct ArrowSchema* c_struct_;
SchemaExportGuard guard_;
struct ArrowSchema* c_struct_{nullptr};
SchemaExportGuard guard_{nullptr};
FormatStringParser f_parser_;
int64_t recursion_level_;
std::vector<SchemaImporter> child_importers_;
Expand Down Expand Up @@ -1429,7 +1464,7 @@ class ImportedBuffer : public Buffer {
std::shared_ptr<ImportedArrayData> import)
: Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {}

~ImportedBuffer() override {}
~ImportedBuffer() override = default;

std::shared_ptr<Device::SyncEvent> device_sync_event() override {
return import_->device_sync_;
Expand All @@ -1441,9 +1476,7 @@ class ImportedBuffer : public Buffer {

struct ArrayImporter {
explicit ArrayImporter(const std::shared_ptr<DataType>& type)
: type_(type),
zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
device_type_(DeviceAllocationType::kCPU) {}
: type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)) {}

Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& mapper) {
ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id));
Expand Down Expand Up @@ -1591,6 +1624,10 @@ struct ArrayImporter {

Status Visit(const LargeBinaryType& type) { return ImportStringLike(type); }

Status Visit(const StringViewType& type) { return ImportBinaryView(type); }

Status Visit(const BinaryViewType& type) { return ImportBinaryView(type); }

Status Visit(const ListType& type) { return ImportListLike(type); }

Status Visit(const LargeListType& type) { return ImportListLike(type); }
Expand Down Expand Up @@ -1673,6 +1710,28 @@ struct ArrayImporter {
return Status::OK();
}

Status ImportBinaryView(const BinaryViewType&) {
RETURN_NOT_OK(CheckNoChildren());
if (c_struct_->n_buffers < 3) {
return Status::Invalid("Expected at least 3 buffers for imported type ",
type_->ToString(), ", ArrowArray struct has ",
c_struct_->n_buffers);
}
RETURN_NOT_OK(AllocateArrayData());
RETURN_NOT_OK(ImportNullBitmap());
RETURN_NOT_OK(ImportFixedSizeBuffer(1, BinaryViewType::kSize));

// The last C data buffer stores buffer sizes, and shouldn't be imported
auto* buffer_sizes =
static_cast<const int64_t*>(c_struct_->buffers[c_struct_->n_buffers - 1]);

for (int32_t buffer_id = 2; buffer_id < c_struct_->n_buffers - 1; ++buffer_id) {
RETURN_NOT_OK(ImportBuffer(buffer_id, buffer_sizes[buffer_id - 2]));
}
data_->buffers.pop_back();
return Status::OK();
}

template <typename StringType>
Status ImportStringLike(const StringType& type) {
RETURN_NOT_OK(CheckNoChildren());
Expand Down Expand Up @@ -1836,7 +1895,7 @@ struct ArrayImporter {
std::shared_ptr<Buffer> zero_size_buffer_;

std::shared_ptr<MemoryManager> memory_mgr_;
DeviceAllocationType device_type_;
DeviceAllocationType device_type_{DeviceAllocationType::kCPU};
};

} // namespace
Expand Down Expand Up @@ -2042,7 +2101,7 @@ class ArrayStreamBatchReader : public RecordBatchReader {
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() {
~ArrayStreamBatchReader() override {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
Expand Down
Loading

0 comments on commit 2cf041f

Please sign in to comment.