Fixes build failure with apache-arrow 14 #1609

Merged (1 commit, Nov 2, 2023)
14 changes: 6 additions & 8 deletions .github/workflows/build-test-graph.yml
@@ -105,14 +105,12 @@ jobs:
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt update
-sudo apt install -y libarrow-dev=12.0.1-1 \
-libarrow-dataset-dev=12.0.1-1 \
-libparquet-dev=12.0.1-1 \
-libarrow-acero-dev=12.0.1-1 \
-libarrow-flight-dev=12.0.1-1 \
-libgandiva-dev=12.0.1-1 \
-libparquet-dev=12.0.1-1 \
-libarrow-cuda-dev=12.0.1-1
+sudo apt install -y libarrow-dev=14.0.0-1 \
+libarrow-dataset-dev=14.0.0-1 \
+libarrow-acero-dev=14.0.0-1 \
+libarrow-flight-dev=14.0.0-1 \
+libgandiva-dev=14.0.0-1 \
+libparquet-dev=14.0.0-1

# install clang-format
sudo curl -L https://github.com/muttleyxd/clang-tools-static-binaries/releases/download/master-1d7ec53d/clang-format-11_linux-amd64 --output /usr/bin/clang-format
13 changes: 6 additions & 7 deletions .github/workflows/build-test.yml
@@ -124,13 +124,12 @@ jobs:
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt update
-sudo apt install -y libarrow-dev=11.0.0-1 \
-libarrow-dataset-dev=11.0.0-1 \
-libarrow-flight-dev=11.0.0-1 \
-libgandiva-dev=11.0.0-1 \
-libparquet-dev=11.0.0-1 \
-libplasma-dev=11.0.0-1 \
-libarrow-cuda-dev=11.0.0-1
+sudo apt install -y libarrow-dev=14.0.0-1 \
+libarrow-dataset-dev=14.0.0-1 \
+libarrow-acero-dev=14.0.0-1 \
+libarrow-flight-dev=14.0.0-1 \
+libgandiva-dev=14.0.0-1 \
+libparquet-dev=14.0.0-1

# install deps for java
sudo apt install -y default-jdk-headless maven
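Note: both workflows now pin the same set of Arrow packages at 14.0.0-1. Compared with the old lists, the duplicated libparquet-dev pin is gone, and libarrow-cuda-dev and libplasma-dev are no longer installed; Plasma in particular appears to have been removed from upstream Arrow, so a 14.0.0-1 package for it would not exist.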
8 changes: 8 additions & 0 deletions modules/basic/ds/arrow_utils.cc
@@ -395,7 +395,11 @@ Status DeserializeRecordBatches(
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
RETURN_ON_ARROW_ERROR_AND_ASSIGN(
batch_reader, arrow::ipc::RecordBatchStreamReader::Open(&reader));
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(batch_reader->ReadAll(batches));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(*batches, batch_reader->ToRecordBatches());
+#endif
return Status::OK();
}

@@ -567,7 +571,11 @@ Status DeserializeTable(const std::shared_ptr<arrow::Buffer> buffer,
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
RETURN_ON_ARROW_ERROR_AND_ASSIGN(
batch_reader, arrow::ipc::RecordBatchStreamReader::Open(&reader));
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(batch_reader->ReadAll(table));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(*table, batch_reader->ToTable());
+#endif
return Status::OK();
}
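Both DeserializeRecordBatches and DeserializeTable now branch on ARROW_VERSION, which Arrow encodes as major * 1000000 + minor * 1000 + patch (so 9000000 is 9.0.0): old releases keep the Status-returning ReadAll() overloads, newer ones use the Result-returning ToRecordBatches() / ToTable() that Arrow 14 expects. A minimal sketch of the same pattern using plain Arrow error macros instead of vineyard's RETURN_ON_ARROW_ERROR* wrappers (the helper name is illustrative, not part of this PR):

```cpp
#include <memory>
#include <vector>

#include "arrow/api.h"
#include "arrow/ipc/api.h"
#include "arrow/util/config.h"  // defines ARROW_VERSION

// Hypothetical helper: drain a RecordBatchReader into a vector of batches.
arrow::Status ReadAllBatches(
    const std::shared_ptr<arrow::RecordBatchReader>& reader,
    std::vector<std::shared_ptr<arrow::RecordBatch>>* batches) {
#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
  // Old API: fills the output vector through a pointer and returns a Status.
  return reader->ReadAll(batches);
#else
  // New API: returns arrow::Result<>, unwrapped here by ARROW_ASSIGN_OR_RAISE.
  ARROW_ASSIGN_OR_RAISE(*batches, reader->ToRecordBatches());
  return arrow::Status::OK();
#endif
}
```

DeserializeTable follows the same shape, with ToTable() assigned into the table pointer.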

6 changes: 5 additions & 1 deletion modules/fuse/adaptors/arrow_ipc/serializer_registry.cc
@@ -98,7 +98,11 @@ static void from_arrow_view(Client* client, std::string const& path,
std::shared_ptr<arrow::Table> table;
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;

-VINEYARD_CHECK_OK(reader->ReadAll(&batches));
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
+CHECK_ARROW_ERROR(reader->ReadAll(&batches));
+#else
+CHECK_ARROW_ERROR_AND_ASSIGN(batches, reader->ToRecordBatches());
+#endif

VINEYARD_CHECK_OK(RecordBatchesToTable(batches, &table));
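The fuse arrow-ipc adaptor gets the same version guard; the surrounding macro also changes from VINEYARD_CHECK_OK to CHECK_ARROW_ERROR / CHECK_ARROW_ERROR_AND_ASSIGN, presumably because ReadAll() and ToRecordBatches() produce Arrow statuses and results rather than vineyard ones. The sketch above applies here unchanged.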

18 changes: 11 additions & 7 deletions modules/graph/fragment/property_graph_utils_impl.h
@@ -155,24 +155,28 @@ boost::leaf::result<void> generate_local_id_list(
static_cast<size_t>(0), chunks.size(),
[pool, fid, &parser, &ovg2l_maps, &chunks,
&lid_list](size_t chunk_index) -> boost::leaf::result<void> {
-ArrowBuilderType<VID_T> builder(pool);
+arrow::BufferBuilder builder(pool);
auto chunk = std::dynamic_pointer_cast<ArrowArrayType<VID_T>>(
chunks[chunk_index]);
chunks[chunk_index].reset(); // release the used chunks
-ARROW_OK_OR_RAISE(builder.Resize(chunk->length()));
+ARROW_OK_OR_RAISE(builder.Resize(chunk->length() * sizeof(VID_T)));
+builder.UnsafeAdvance(chunk->length() * sizeof(VID_T));

const VID_T* vec = chunk->raw_values();
+VID_T* builder_data = reinterpret_cast<VID_T*>(builder.mutable_data());
for (int64_t i = 0; i < chunk->length(); ++i) {
VID_T gid = vec[i];
if (parser.GetFid(gid) == fid) {
-builder[i] = parser.GenerateId(0, parser.GetLabelId(gid),
-parser.GetOffset(gid));
+builder_data[i] = parser.GenerateId(0, parser.GetLabelId(gid),
+parser.GetOffset(gid));
} else {
-builder[i] = ovg2l_maps[parser.GetLabelId(gid)].at(gid);
+builder_data[i] = ovg2l_maps[parser.GetLabelId(gid)].at(gid);
}
}
-ARROW_OK_OR_RAISE(builder.Advance(chunk->length()));
-ARROW_OK_OR_RAISE(builder.Finish(&lid_list[chunk_index]));
+std::shared_ptr<arrow::Buffer> buffer;
+ARROW_OK_OR_RAISE(builder.Finish(&buffer));
+lid_list[chunk_index] =
+std::make_shared<ArrowArrayType<VID_T>>(chunk->length(), buffer);
return {};
},
concurrency);
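The builder rewrite here is independent of the version guards: the typed ArrowBuilderType<VID_T>, which was indexed directly and Advance()d, apparently no longer compiles against Arrow 14 (this is one of the files behind the build failure), so the loop now reserves a raw arrow::BufferBuilder of chunk->length() * sizeof(VID_T) bytes, writes the local ids through mutable_data(), and wraps the finished buffer into an ArrowArrayType<VID_T>. A minimal sketch of that pattern, assuming VID_T is uint64_t and using a trivial stand-in for the gid-to-lid mapping:

```cpp
#include <cstdint>
#include <memory>

#include "arrow/api.h"

// Illustrative only: fill a fixed-width array through a raw BufferBuilder.
arrow::Result<std::shared_ptr<arrow::UInt64Array>> FillIds(int64_t length) {
  arrow::BufferBuilder builder(arrow::default_memory_pool());
  ARROW_RETURN_NOT_OK(builder.Resize(length * sizeof(uint64_t)));
  builder.UnsafeAdvance(length * sizeof(uint64_t));  // mark the bytes as written

  auto* data = reinterpret_cast<uint64_t*>(builder.mutable_data());
  for (int64_t i = 0; i < length; ++i) {
    data[i] = static_cast<uint64_t>(i);  // stand-in for the gid -> lid mapping
  }

  std::shared_ptr<arrow::Buffer> buffer;
  ARROW_RETURN_NOT_OK(builder.Finish(&buffer));
  // (length, buffer) constructs a primitive array with no null bitmap.
  return std::make_shared<arrow::UInt64Array>(length, buffer);
}
```

This avoids any per-element Append() bookkeeping, since every slot is written exactly once.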
29 changes: 29 additions & 0 deletions modules/graph/utils/table_shuffler.cc
@@ -344,7 +344,11 @@ Status TableAppender::Apply(
}
if (builder->GetField(0)->length() == builder->initial_capacity()) {
std::shared_ptr<arrow::RecordBatch> tmp_batch;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(builder->Flush(&tmp_batch));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(tmp_batch, builder->Flush());
+#endif
batches_out.emplace_back(std::move(tmp_batch));
}
return Status::OK();
@@ -356,7 +360,11 @@ Status TableAppender::Flush(
// If there's no batch, we need an empty batch to make an empty table
if (builder->GetField(0)->length() != 0 || batches_out.size() == 0) {
std::shared_ptr<arrow::RecordBatch> batch;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
RETURN_ON_ARROW_ERROR(builder->Flush(&batch));
+#else
+RETURN_ON_ARROW_ERROR_AND_ASSIGN(batch, builder->Flush());
+#endif
batches_out.emplace_back(std::move(batch));
}
return Status::OK();
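All of the table_shuffler.cc hunks migrate arrow::RecordBatchBuilder calls the same way: Make() and Flush() used to fill out-parameters and return a Status, while on Arrow 9 and later they return arrow::Result values that the *_AND_ASSIGN macros unwrap. A minimal sketch of the new-style calls with plain Arrow macros (the helper name is illustrative):

```cpp
#include <memory>

#include "arrow/api.h"

// Hypothetical helper: create a batch builder, then flush it into a batch.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> BuildBatch(
    const std::shared_ptr<arrow::Schema>& schema, int64_t capacity) {
  ARROW_ASSIGN_OR_RAISE(
      std::unique_ptr<arrow::RecordBatchBuilder> builder,
      arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(),
                                      capacity));
  // Values would be appended through builder->GetField(i) before flushing.
  return builder->Flush();
}
```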
@@ -633,13 +641,23 @@ void DeserializeSelectedRows(grape::OutArchive& arc,
int64_t row_num;
arc >> row_num;
std::unique_ptr<arrow::RecordBatchBuilder> builder;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(arrow::RecordBatchBuilder::Make(
schema, arrow::default_memory_pool(), row_num, &builder));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(builder,
+arrow::RecordBatchBuilder::Make(
+schema, arrow::default_memory_pool(), row_num));
+#endif
int col_num = builder->num_fields();
for (int col_id = 0; col_id != col_num; ++col_id) {
DeserializeSelectedItems(arc, row_num, builder->GetField(col_id));
}
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(builder->Flush(&batch_out));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(batch_out, builder->Flush());
+#endif
}

void SelectItems(std::shared_ptr<arrow::Array> array,
@@ -687,15 +705,26 @@ void SelectRows(std::shared_ptr<arrow::RecordBatch> record_batch_in,
return;
}
std::unique_ptr<arrow::RecordBatchBuilder> builder;
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(arrow::RecordBatchBuilder::Make(record_batch_in->schema(),
arrow::default_memory_pool(),
row_num, &builder));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(
+builder,
+arrow::RecordBatchBuilder::Make(record_batch_in->schema(),
+arrow::default_memory_pool(), row_num));
+#endif
int col_num = builder->num_fields();
for (int col_id = 0; col_id != col_num; ++col_id) {
SelectItems(record_batch_in->column(col_id), offset,
builder->GetField(col_id));
}
+#if defined(ARROW_VERSION) && ARROW_VERSION < 9000000
ARROW_CHECK_OK(builder->Flush(&record_batch_out));
+#else
+ARROW_CHECK_OK_AND_ASSIGN(record_batch_out, builder->Flush());
+#endif
}

boost::leaf::result<void> ShuffleTableByOffsetLists(