Skip to content

Commit

Permalink
GH-36099: [C++] Add Utf8View and BinaryView to the c data bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Nov 1, 2023
1 parent 5d6192c commit 2d13898
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 20 deletions.
88 changes: 74 additions & 14 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
Expand All @@ -41,6 +42,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/range.h"
#include "arrow/util/small_vector.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"
Expand All @@ -51,6 +53,8 @@ namespace arrow {
using internal::checked_cast;
using internal::checked_pointer_cast;

using internal::Zip;

using internal::SmallVector;
using internal::StaticVector;

Expand Down Expand Up @@ -260,7 +264,7 @@ struct SchemaExporter {
// Dictionary type: parent struct describes index type,
// child dictionary struct describes value type.
RETURN_NOT_OK(VisitTypeInline(*dict_type.index_type(), this));
dict_exporter_.reset(new SchemaExporter());
dict_exporter_ = std::make_unique<SchemaExporter>();
RETURN_NOT_OK(dict_exporter_->ExportType(*dict_type.value_type()));
} else {
RETURN_NOT_OK(VisitTypeInline(type, this));
Expand Down Expand Up @@ -357,10 +361,14 @@ struct SchemaExporter {

Status Visit(const LargeBinaryType& type) { return SetFormat("Z"); }

Status Visit(const BinaryViewType& type) { return SetFormat("vz"); }

Status Visit(const StringType& type) { return SetFormat("u"); }

Status Visit(const LargeStringType& type) { return SetFormat("U"); }

Status Visit(const StringViewType& type) { return SetFormat("vu"); }

Status Visit(const Date32Type& type) { return SetFormat("tdD"); }

Status Visit(const Date64Type& type) { return SetFormat("tdm"); }
Expand Down Expand Up @@ -517,13 +525,14 @@ namespace {

struct ExportedArrayPrivateData : PoolAllocationMixin<ExportedArrayPrivateData> {
// The buffers are owned by the ArrayData member
StaticVector<const void*, 3> buffers_;
SmallVector<const void*, 3> buffers_;
struct ArrowArray dictionary_;
SmallVector<struct ArrowArray, 1> children_;
SmallVector<struct ArrowArray*, 4> child_pointers_;

std::shared_ptr<ArrayData> data_;
std::shared_ptr<Device::SyncEvent> sync_;
std::vector<int64_t> variadic_buffer_sizes_;

ExportedArrayPrivateData() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
Expand Down Expand Up @@ -566,15 +575,31 @@ struct ArrayExporter {
--n_buffers;
++buffers_begin;
}

bool need_variadic_buffer_sizes =
data->type->id() == Type::BINARY_VIEW || data->type->id() == Type::STRING_VIEW;
if (need_variadic_buffer_sizes) {
++n_buffers;
}

export_.buffers_.resize(n_buffers);
std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(),
[](const std::shared_ptr<Buffer>& buffer) -> const void* {
return buffer ? buffer->data() : nullptr;
});

if (need_variadic_buffer_sizes) {
auto variadic_buffers = util::span(data->buffers).subspan(2);
export_.variadic_buffer_sizes_.resize(variadic_buffers.size());
for (auto [size, buf] : Zip(export_.variadic_buffer_sizes_, variadic_buffers)) {
size = buf->size();
}
export_.buffers_.back() = export_.variadic_buffer_sizes_.data();
}

// Export dictionary
if (data->dictionary != nullptr) {
dict_exporter_.reset(new ArrayExporter());
dict_exporter_ = std::make_unique<ArrayExporter>();
RETURN_NOT_OK(dict_exporter_->Export(data->dictionary));
}

Expand Down Expand Up @@ -791,7 +816,7 @@ Status InvalidFormatString(std::string_view v) {

class FormatStringParser {
public:
FormatStringParser() {}
FormatStringParser() = default;

explicit FormatStringParser(std::string_view v) : view_(v), index_(0) {}

Expand Down Expand Up @@ -937,8 +962,6 @@ Result<DecodedMetadata> DecodeMetadata(const char* metadata) {
}

struct SchemaImporter {
SchemaImporter() : c_struct_(nullptr), guard_(nullptr) {}

Status Import(struct ArrowSchema* src) {
if (ArrowSchemaIsReleased(src)) {
return Status::Invalid("Cannot import released ArrowSchema");
Expand Down Expand Up @@ -1064,6 +1087,8 @@ struct SchemaImporter {
return ProcessPrimitive(binary());
case 'Z':
return ProcessPrimitive(large_binary());
case 'v':
return ProcessBinaryView();
case 'w':
return ProcessFixedSizeBinary();
case 'd':
Expand All @@ -1076,6 +1101,17 @@ struct SchemaImporter {
return f_parser_.Invalid();
}

Status ProcessBinaryView() {
RETURN_NOT_OK(f_parser_.CheckHasNext());
switch (f_parser_.Next()) {
case 'z':
return ProcessPrimitive(binary_view());
case 'u':
return ProcessPrimitive(utf8_view());
}
return f_parser_.Invalid();
}

Status ProcessTemporal() {
RETURN_NOT_OK(f_parser_.CheckHasNext());
switch (f_parser_.Next()) {
Expand Down Expand Up @@ -1337,8 +1373,8 @@ struct SchemaImporter {
return Status::OK();
}

struct ArrowSchema* c_struct_;
SchemaExportGuard guard_;
struct ArrowSchema* c_struct_{nullptr};
SchemaExportGuard guard_{nullptr};
FormatStringParser f_parser_;
int64_t recursion_level_;
std::vector<SchemaImporter> child_importers_;
Expand Down Expand Up @@ -1406,7 +1442,7 @@ class ImportedBuffer : public Buffer {
std::shared_ptr<ImportedArrayData> import)
: Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {}

~ImportedBuffer() override {}
~ImportedBuffer() override = default;

std::shared_ptr<Device::SyncEvent> device_sync_event() override {
return import_->device_sync_;
Expand All @@ -1418,9 +1454,7 @@ class ImportedBuffer : public Buffer {

struct ArrayImporter {
explicit ArrayImporter(const std::shared_ptr<DataType>& type)
: type_(type),
zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
device_type_(DeviceAllocationType::kCPU) {}
: type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)) {}

Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& mapper) {
ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id));
Expand Down Expand Up @@ -1568,6 +1602,10 @@ struct ArrayImporter {

Status Visit(const LargeBinaryType& type) { return ImportStringLike(type); }

Status Visit(const StringViewType& type) { return ImportBinaryView(type); }

Status Visit(const BinaryViewType& type) { return ImportBinaryView(type); }

Status Visit(const ListType& type) { return ImportListLike(type); }

Status Visit(const LargeListType& type) { return ImportListLike(type); }
Expand Down Expand Up @@ -1646,6 +1684,28 @@ struct ArrayImporter {
return Status::OK();
}

Status ImportBinaryView(const BinaryViewType&) {
RETURN_NOT_OK(CheckNoChildren());
if (c_struct_->n_buffers < 3) {
return Status::Invalid("Expected at least 3 buffers for imported type ",
type_->ToString(), ", ArrowArray struct has ",
c_struct_->n_buffers);
}
RETURN_NOT_OK(AllocateArrayData());
RETURN_NOT_OK(ImportNullBitmap());
RETURN_NOT_OK(ImportFixedSizeBuffer(1, BinaryViewType::kSize));

// The last C data buffer stores buffer sizes, and shouldn't be imported
auto* buffer_sizes =
static_cast<const int64_t*>(c_struct_->buffers[c_struct_->n_buffers - 1]);

for (int32_t buffer_id = 2; buffer_id < c_struct_->n_buffers - 1; ++buffer_id) {
RETURN_NOT_OK(ImportBuffer(buffer_id, buffer_sizes[buffer_id - 2]));
}
data_->buffers.pop_back();
return Status::OK();
}

template <typename StringType>
Status ImportStringLike(const StringType& type) {
RETURN_NOT_OK(CheckNoChildren());
Expand Down Expand Up @@ -1790,7 +1850,7 @@ struct ArrayImporter {
std::shared_ptr<Buffer> zero_size_buffer_;

std::shared_ptr<MemoryManager> memory_mgr_;
DeviceAllocationType device_type_;
DeviceAllocationType device_type_{};
};

} // namespace
Expand Down Expand Up @@ -1996,7 +2056,7 @@ class ArrayStreamBatchReader : public RecordBatchReader {
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() {
~ArrayStreamBatchReader() override {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
Expand Down
26 changes: 24 additions & 2 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/range.h"

// TODO(GH-37221): Remove these ifdef checks when compute dependency is removed
#ifdef ARROW_COMPUTE
Expand All @@ -57,6 +58,7 @@ using internal::ArrayStreamExportTraits;
using internal::checked_cast;
using internal::SchemaExportGuard;
using internal::SchemaExportTraits;
using internal::Zip;

template <typename T>
struct ExportTraits {};
Expand Down Expand Up @@ -353,6 +355,8 @@ TEST_F(TestSchemaExport, Primitive) {
TestPrimitive(large_binary(), "Z");
TestPrimitive(utf8(), "u");
TestPrimitive(large_utf8(), "U");
TestPrimitive(binary_view(), "vz");
TestPrimitive(utf8_view(), "vu");

TestPrimitive(decimal(16, 4), "d:16,4");
TestPrimitive(decimal256(16, 4), "d:16,4,256");
Expand Down Expand Up @@ -556,12 +560,24 @@ struct ArrayExportChecker {
--expected_n_buffers;
++expected_buffers;
}
ASSERT_EQ(c_export->n_buffers, expected_n_buffers);
bool has_variadic_buffer_sizes = expected_data.type->id() == Type::STRING_VIEW ||
expected_data.type->id() == Type::BINARY_VIEW;
ASSERT_EQ(c_export->n_buffers, expected_n_buffers + has_variadic_buffer_sizes);
ASSERT_NE(c_export->buffers, nullptr);
for (int64_t i = 0; i < c_export->n_buffers; ++i) {

for (int64_t i = 0; i < expected_n_buffers; ++i) {
auto expected_ptr = expected_buffers[i] ? expected_buffers[i]->data() : nullptr;
ASSERT_EQ(c_export->buffers[i], expected_ptr);
}
if (has_variadic_buffer_sizes) {
auto variadic_buffers = util::span(expected_data.buffers).subspan(2);
auto variadic_buffer_sizes = util::span(
static_cast<const int64_t*>(c_export->buffers[c_export->n_buffers - 1]),
variadic_buffers.size());
for (auto [buf, size] : Zip(variadic_buffers, variadic_buffer_sizes)) {
ASSERT_EQ(buf->size(), size);
}
}

if (expected_data.dictionary != nullptr) {
// Recurse into dictionary
Expand Down Expand Up @@ -874,6 +890,8 @@ TEST_F(TestArrayExport, Primitive) {
TestPrimitive(large_binary(), R"(["foo", "bar", null])");
TestPrimitive(utf8(), R"(["foo", "bar", null])");
TestPrimitive(large_utf8(), R"(["foo", "bar", null])");
TestPrimitive(binary_view(), R"(["foo", "bar", null])");
TestPrimitive(utf8_view(), R"(["foo", "bar", null])");

TestPrimitive(decimal(16, 4), R"(["1234.5670", null])");
TestPrimitive(decimal256(16, 4), R"(["1234.5670", null])");
Expand Down Expand Up @@ -1891,6 +1909,10 @@ TEST_F(TestSchemaImport, String) {
CheckImport(large_utf8());
FillPrimitive("Z");
CheckImport(large_binary());
FillPrimitive("vu");
CheckImport(utf8_view());
FillPrimitive("vz");
CheckImport(binary_view());

FillPrimitive("w:3");
CheckImport(fixed_size_binary(3));
Expand Down
2 changes: 1 addition & 1 deletion dev/archery/archery/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
BOOL = ArrowBool()


@click.group()
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.option("--debug", type=BOOL, is_flag=True, default=False,
help="Increase logging with debugging output.")
@click.option("--pdb", type=BOOL, is_flag=True, default=False,
Expand Down
4 changes: 1 addition & 3 deletions dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1860,9 +1860,7 @@ def _temp_path():
.skip_tester('Go')
.skip_tester('Java')
.skip_tester('JS')
.skip_tester('Rust')
.skip_format(SKIP_C_SCHEMA, 'C++')
.skip_format(SKIP_C_ARRAY, 'C++'),
.skip_tester('Rust'),

generate_extension_case()
.skip_tester('C#')
Expand Down
12 changes: 12 additions & 0 deletions docs/source/format/CDataInterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,14 @@ strings:
+-----------------+---------------------------------------------------+------------+
| ``Z`` | large binary | |
+-----------------+---------------------------------------------------+------------+
| ``vz`` | binary view | |
+-----------------+---------------------------------------------------+------------+
| ``u`` | utf-8 string | |
+-----------------+---------------------------------------------------+------------+
| ``U`` | large utf-8 string | |
+-----------------+---------------------------------------------------+------------+
| ``vu`` | utf-8 view | |
+-----------------+---------------------------------------------------+------------+
| ``d:19,10`` | decimal128 [precision 19, scale 10] | |
+-----------------+---------------------------------------------------+------------+
| ``d:19,10,NNN`` | decimal bitwidth = NNN [precision 19, scale 10] | |
Expand Down Expand Up @@ -542,6 +546,14 @@ parameterized extension types).
The ``ArrowArray`` structure exported from an extension array simply points
to the storage data of the extension array.

Binary view arrays
------------------

For binary or utf-8 view arrays, an extra buffer is appended which stores
the lengths of each variadic data buffer as ``int64_t``. This buffer is
necessary since these buffer lengths are not trivially extractable from
other data in an array of binary or utf-8 view type.

.. _c-data-interface-semantics:

Semantics
Expand Down

0 comments on commit 2d13898

Please sign in to comment.