diff --git a/WORKSPACE b/WORKSPACE index 7cfe32b..2faaa49 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -6,18 +6,21 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") # Abseil LTS 20230125.0 http_archive( name = "com_google_absl", - sha256 = "3ea49a7d97421b88a8c48a0de16c16048e17725c7ec0f1d3ea2683a2a75adc21", # SHARED_ABSL_SHA - strip_prefix = "abseil-cpp-20230125.0", + sha256 = "987ce98f02eefbaf930d6e38ab16aa05737234d7afbab2d5c4ea7adbe50c28ed", + strip_prefix = "abseil-cpp-20230802.1", urls = [ - "https://github.com/abseil/abseil-cpp/archive/refs/tags/20230125.0.tar.gz", + "https://github.com/abseil/abseil-cpp/archive/refs/tags/20230802.1.tar.gz", ], ) + # Version: pypi-v0.11.0, 2020/10/27 git_repository( name = "com_google_absl_py", - remote = "https://github.com/abseil/abseil-py", commit = "127c98870edf5f03395ce9cf886266fa5f24455e", + remote = "https://github.com/abseil/abseil-py", + shallow_since = "1673401277 -0800", ) + # Needed by com_google_riegeli http_archive( name = "org_brotli", @@ -25,23 +28,20 @@ http_archive( strip_prefix = "brotli-3914999fcc1fda92e750ef9190aa6db9bf7bdb07", urls = ["https://github.com/google/brotli/archive/3914999fcc1fda92e750ef9190aa6db9bf7bdb07.zip"], # 2022-11-17 ) + # GoogleTest/GoogleMock framework. Used by most unit-tests. 
http_archive( - name = "com_google_googletest", - urls = ["https://github.com/google/googletest/archive/main.zip"], - strip_prefix = "googletest-main", + name = "com_google_googletest", + sha256 = "24e06e79a78ca5794ec6ad2bf0a1f05515cd1d05a9e10d9a6dc853078b2f3914", + strip_prefix = "googletest-main", + urls = ["https://github.com/google/googletest/archive/main.zip"], ) # V3.4.0, 20210818 http_archive( - name = "eigen3", - sha256 = "b4c198460eba6f28d34894e3a5710998818515104d6e74e5cc331ce31e46e626", - strip_prefix = "eigen-3.4.0", - urls = [ - "https://gitlab.com/libeigen/eigen/-/archive/3.4.0/eigen-3.4.0.tar.bz2", - ], - build_file_content = -""" + name = "eigen3", + build_file_content = + """ cc_library( name = 'eigen3', srcs = [], @@ -49,49 +49,58 @@ cc_library( hdrs = glob(['Eigen/**', 'unsupported/Eigen/**']), visibility = ['//visibility:public'], ) -""" +""", + sha256 = "b4c198460eba6f28d34894e3a5710998818515104d6e74e5cc331ce31e46e626", + strip_prefix = "eigen-3.4.0", + urls = [ + "https://gitlab.com/libeigen/eigen/-/archive/3.4.0/eigen-3.4.0.tar.bz2", + ], ) # `pybind11_bazel` (https://github.com/pybind/pybind11_bazel): 20230130 http_archive( - name = "pybind11_bazel", - strip_prefix = "pybind11_bazel-5f458fa53870223a0de7eeb60480dd278b442698", - sha256 = "b35f3abc3d52ee5c753fdeeb2b5129b99e796558754ca5d245e28e51c1072a21", - urls = ["https://github.com/pybind/pybind11_bazel/archive/5f458fa53870223a0de7eeb60480dd278b442698.tar.gz"], + name = "pybind11_bazel", + sha256 = "b35f3abc3d52ee5c753fdeeb2b5129b99e796558754ca5d245e28e51c1072a21", + strip_prefix = "pybind11_bazel-5f458fa53870223a0de7eeb60480dd278b442698", + urls = ["https://github.com/pybind/pybind11_bazel/archive/5f458fa53870223a0de7eeb60480dd278b442698.tar.gz"], ) + # V2.10.3, 20230130 http_archive( - name = "pybind11", - build_file = "@pybind11_bazel//:pybind11.BUILD", - strip_prefix = "pybind11-2.10.3", - sha256 = "201966a61dc826f1b1879a24a3317a1ec9214a918c8eb035be2f30c3e9cfbdcb", - urls = 
["https://github.com/pybind/pybind11/archive/refs/tags/v2.10.3.zip"], + name = "pybind11", + build_file = "@pybind11_bazel//:pybind11.BUILD", + sha256 = "201966a61dc826f1b1879a24a3317a1ec9214a918c8eb035be2f30c3e9cfbdcb", + strip_prefix = "pybind11-2.10.3", + urls = ["https://github.com/pybind/pybind11/archive/refs/tags/v2.10.3.zip"], ) + load("@pybind11_bazel//:python_configure.bzl", "python_configure") + python_configure(name = "local_config_python") -# V21.12, 20230130 # proto_library, cc_proto_library, and java_proto_library rules implicitly # depend on @com_google_protobuf for protoc and proto runtimes. # This statement defines the @com_google_protobuf repo. http_archive( name = "com_google_protobuf", - sha256 = "22fdaf641b31655d4b2297f9981fa5203b2866f8332d3c6333f6b0107bb320de", - strip_prefix = "protobuf-21.12", - urls = ["https://github.com/protocolbuffers/protobuf/archive/v21.12.tar.gz"], + sha256 = "dc167b7d23ec0d6e4a3d4eae1798de6c8d162e69fa136d39753aaeb7a6e1289d", + strip_prefix = "protobuf-23.1", + urls = ["https://github.com/protocolbuffers/protobuf/archive/v23.1.tar.gz"], ) load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps") + protobuf_deps() -# Riegeli does not cut releases, so we reference the head http_archive( name = "com_google_riegeli", - strip_prefix = "riegeli-master", + sha256 = "5615438b3809fdd62266030e2c6f19c457a15bfb6ef3aa8132503e8584305f8a", + strip_prefix = "riegeli-254e6d74ee0d325676739fe5075e5a1a895624cf", urls = [ - "https://github.com/google/riegeli/archive/master.zip", + "https://github.com/google/riegeli/archive/254e6d74ee0d325676739fe5075e5a1a895624cf.tar.gz", ], ) + # Riegeli's dependencies http_archive( name = "net_zstd", @@ -100,6 +109,7 @@ http_archive( strip_prefix = "zstd-1.4.5/lib", urls = ["https://github.com/facebook/zstd/archive/v1.4.5.zip"], # 2020-05-22 ) + http_archive( name = "lz4", build_file = "@com_google_riegeli//third_party:lz4.BUILD", @@ -107,6 +117,7 @@ http_archive( strip_prefix = "lz4-1.9.3/lib", 
urls = ["https://github.com/lz4/lz4/archive/refs/tags/v1.9.3.zip"], # 2020-11-16 ) + http_archive( name = "snappy", build_file = "@com_google_riegeli//third_party:snappy.BUILD", @@ -114,6 +125,7 @@ http_archive( strip_prefix = "snappy-1.2.0", urls = ["https://github.com/google/snappy/archive/1.2.0.zip"], # 2024-04-04 ) + http_archive( name = "crc32c", build_file = "@com_google_riegeli//third_party:crc32.BUILD", @@ -121,6 +133,7 @@ http_archive( strip_prefix = "crc32c-1.1.0", urls = ["https://github.com/google/crc32c/archive/1.1.0.zip"], # 2019-05-24 ) + http_archive( name = "zlib", build_file = "@com_google_riegeli//third_party:zlib.BUILD", @@ -128,6 +141,7 @@ http_archive( strip_prefix = "zlib-1.2.11", urls = ["http://zlib.net/fossils/zlib-1.2.11.tar.gz"], # 2017-01-15 ) + http_archive( name = "highwayhash", build_file = "@com_google_riegeli//third_party:highwayhash.BUILD", @@ -139,14 +153,16 @@ http_archive( # Tensorflow, 20230705 http_archive( name = "org_tensorflow", - strip_prefix = "tensorflow-2.12.1", sha256 = "63025cb60d00d9aa7a88807651305a38abb9bb144464e2419c03f13a089d19a6", + strip_prefix = "tensorflow-2.12.1", urls = ["https://github.com/tensorflow/tensorflow/archive/v2.12.1.zip"], ) +load("@org_tensorflow//tensorflow/tools/toolchains:cpus/aarch64/aarch64_compiler_configure.bzl", "aarch64_compiler_configure") # buildifier: disable=load-on-top + # This import (along with the org_tensorflow archive) is necessary to provide the devtoolset-9 toolchain load("@org_tensorflow//tensorflow/tools/toolchains/remote_config:configs.bzl", "initialize_rbe_configs") # buildifier: disable=load-on-top -load("@org_tensorflow//tensorflow/tools/toolchains:cpus/aarch64/aarch64_compiler_configure.bzl", "aarch64_compiler_configure") # buildifier: disable=load-on-top initialize_rbe_configs() + aarch64_compiler_configure() diff --git a/cpp/BUILD b/cpp/BUILD index 67f1e26..8213174 100644 --- a/cpp/BUILD +++ b/cpp/BUILD @@ -1,7 +1,7 @@ # ArrayRecord is a new file format for IO 
intensive applications. # It supports efficient random access and various compression algorithms. -load("//third_party/protobuf/bazel:proto_library.bzl", "proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") package(default_visibility = ["//visibility:public"]) @@ -17,11 +17,6 @@ cc_proto_library( deps = [":layout_proto"], ) -go_proto_library( - name = "layout_go_proto", - deps = [":layout_proto"], -) - cc_library( name = "common", hdrs = ["common.h"], @@ -120,13 +115,13 @@ cc_library( ":sequenced_chunk_writer", ":shareable_dependency", ":thread_pool", - "//third_party/protobuf:protobuf_lite", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/status", "@com_google_absl//absl/status:statusor", "@com_google_absl//absl/strings", "@com_google_absl//absl/synchronization", "@com_google_absl//absl/types:span", + "@com_google_protobuf//:protobuf_lite", "@com_google_riegeli//riegeli/base:initializer", "@com_google_riegeli//riegeli/base:object", "@com_google_riegeli//riegeli/base:options_parser", @@ -152,8 +147,10 @@ cc_library( deps = [ ":common", "@com_google_absl//absl/memory", + "@com_google_absl//absl/status", + "@com_google_absl//absl/time", + "@com_google_absl//absl/types:optional", "@com_google_riegeli//riegeli/base:object", - "@com_google_riegeli//riegeli/base:shared_buffer", "@com_google_riegeli//riegeli/base:status", "@com_google_riegeli//riegeli/base:types", "@com_google_riegeli//riegeli/bytes:reader", @@ -171,7 +168,6 @@ cc_library( ":parallel_for", ":shareable_dependency", ":thread_pool", - "//third_party/protobuf:protobuf_lite", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/functional:any_invocable", "@com_google_absl//absl/functional:function_ref", @@ -180,6 +176,7 @@ cc_library( "@com_google_absl//absl/strings", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/types:span", + "@com_google_protobuf//:protobuf_lite", "@com_google_riegeli//riegeli/base:initializer", 
"@com_google_riegeli//riegeli/base:object", "@com_google_riegeli//riegeli/base:options_parser", diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc index a8c51b0..2794ec9 100644 --- a/cpp/array_record_reader.cc +++ b/cpp/array_record_reader.cc @@ -40,11 +40,9 @@ limitations under the License. #include "cpp/masked_reader.h" #include "cpp/parallel_for.h" #include "cpp/thread_pool.h" -#include "third_party/protobuf/message_lite.h" -#include "riegeli/base/maker.h" +#include "google/protobuf/message_lite.h" #include "riegeli/base/object.h" #include "riegeli/base/options_parser.h" -#include "riegeli/base/shared_ptr.h" #include "riegeli/base/status.h" #include "riegeli/bytes/reader.h" #include "riegeli/chunk_encoding/chunk.h" @@ -170,12 +168,12 @@ ChunkDecoder ReadChunk(Reader& reader, size_t pos, size_t len) { decoder.Fail(reader.status()); return decoder; } - MaskedReader masked_reader(*reader.NewReader(pos), len); + MaskedReader masked_reader(reader.NewReader(pos), len); if (!masked_reader.ok()) { decoder.Fail(masked_reader.status()); return decoder; } - riegeli::DefaultChunkReader chunk_reader(&masked_reader); + auto chunk_reader = riegeli::DefaultChunkReader<>(&masked_reader); Chunk chunk; if (!chunk_reader.ReadChunk(chunk)) { decoder.Fail(chunk_reader.status()); @@ -345,15 +343,15 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords( MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader.Reset( - *reader->NewReader(state_->chunk_offsets[chunk_idx_start]), + masked_reader = MaskedReader( + reader->NewReader(state_->chunk_offsets[chunk_idx_start]), buf_len); } for (uint64_t chunk_idx = chunk_idx_start; chunk_idx <= last_chunk_idx; ++chunk_idx) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); masked_reader.Seek(state_->chunk_offsets[chunk_idx]); - riegeli::DefaultChunkReader chunk_reader(&masked_reader); + riegeli::DefaultChunkReader<> chunk_reader(&masked_reader); Chunk chunk; if 
(ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) { return chunk_reader.status(); @@ -422,15 +420,15 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange( MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader.Reset( - *reader->NewReader(state_->chunk_offsets[chunk_idx_start]), + masked_reader = MaskedReader( + reader->NewReader(state_->chunk_offsets[chunk_idx_start]), buf_len); } for (uint64_t chunk_idx = chunk_idx_start; chunk_idx <= last_chunk_idx; ++chunk_idx) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); masked_reader.Seek(state_->chunk_offsets[chunk_idx]); - riegeli::DefaultChunkReader chunk_reader(&masked_reader); + riegeli::DefaultChunkReader<> chunk_reader(&masked_reader); Chunk chunk; if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) { return chunk_reader.status(); @@ -541,14 +539,14 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices( MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader.Reset( - *reader->NewReader(state_->chunk_offsets[buffer_chunks[0]]), + masked_reader = MaskedReader( + reader->NewReader(state_->chunk_offsets[buffer_chunks[0]]), buf_len); } for (auto chunk_idx : buffer_chunks) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); masked_reader.Seek(state_->chunk_offsets[chunk_idx]); - riegeli::DefaultChunkReader chunk_reader(&masked_reader); + riegeli::DefaultChunkReader<> chunk_reader(&masked_reader); Chunk chunk; if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) { return chunk_reader.status(); @@ -688,8 +686,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { // movable, OSS ThreadPool only takes std::function which requires all the // captures to be copyable. Therefore we must wrap the promise in a // shared_ptr to copy it over to the scheduled task. 
- riegeli::SharedPtr decoder_promise( - riegeli::Maker>>()); + auto decoder_promise = + std::make_shared>>(); state_->future_decoders.push( {buffer_to_add, decoder_promise->get_future()}); const auto reader = get_backing_reader(); @@ -721,8 +719,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader.Reset(*reader->NewReader(chunk_offsets.front()), - buffer_len); + masked_reader = + MaskedReader(reader->NewReader(chunk_offsets.front()), buffer_len); } if (!masked_reader.ok()) { for (auto& decoder : decoders) { @@ -735,7 +733,7 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); for (auto local_chunk_idx : IndicesOf(chunk_offsets)) { masked_reader.Seek(chunk_offsets[local_chunk_idx]); - riegeli::DefaultChunkReader chunk_reader(&masked_reader); + auto chunk_reader = riegeli::DefaultChunkReader<>(&masked_reader); Chunk chunk; if (!chunk_reader.ReadChunk(chunk)) { decoders[local_chunk_idx].Fail(chunk_reader.status()); diff --git a/cpp/array_record_reader.h b/cpp/array_record_reader.h index 10afa61..05717e6 100644 --- a/cpp/array_record_reader.h +++ b/cpp/array_record_reader.h @@ -48,7 +48,7 @@ limitations under the License. 
#include "cpp/common.h" #include "cpp/shareable_dependency.h" #include "cpp/thread_pool.h" -#include "third_party/protobuf/message_lite.h" +#include "google/protobuf/message_lite.h" #include "riegeli/base/initializer.h" #include "riegeli/base/object.h" #include "riegeli/bytes/reader.h" @@ -378,7 +378,8 @@ template explicit ArrayRecordReader( Src&& src, ArrayRecordReaderBase::Options options = ArrayRecordReaderBase::Options(), - ARThreadPool* pool = nullptr) -> ArrayRecordReader>; + ARThreadPool* pool = nullptr) -> +ArrayRecordReader>; } // namespace array_record diff --git a/cpp/array_record_writer.cc b/cpp/array_record_writer.cc index 62c3c51..4e28c4c 100644 --- a/cpp/array_record_writer.cc +++ b/cpp/array_record_writer.cc @@ -39,11 +39,9 @@ limitations under the License. #include "cpp/layout.pb.h" #include "cpp/sequenced_chunk_writer.h" #include "cpp/thread_pool.h" -#include "third_party/protobuf/message_lite.h" -#include "riegeli/base/maker.h" +#include "google/protobuf/message_lite.h" #include "riegeli/base/object.h" #include "riegeli/base/options_parser.h" -#include "riegeli/base/shared_ptr.h" #include "riegeli/base/status.h" #include "riegeli/bytes/chain_writer.h" #include "riegeli/chunk_encoding/chunk.h" @@ -68,7 +66,7 @@ constexpr uint32_t kZstdDefaultWindowLog = 20; // Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16` constexpr uint64_t kMagic = 0x71930e704fdae05eULL; -// zstd:3 gives a good trade-off for both the compression and decompression +// zstd:3 gives a good trade-off for both the compression and decompression // speed. constexpr char kArrayRecordDefaultCompression[] = "zstd:3"; @@ -171,7 +169,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) { return options_parser.status(); } // From our benchmarks we figured zstd:3 gives the best trade-off for both the - // compression and decompression speed. + // compression and decompression speed.
if (text == "default" || (!absl::StrContains(compressor_text, "uncompressed") && !absl::StrContains(compressor_text, "brotli") && @@ -390,28 +388,23 @@ void ArrayRecordWriterBase::Done() { submit_chunk_callback_->WriteFooterAndPostscript(writer.get()); } -riegeli::SharedPtr -ArrayRecordWriterBase::CreateEncoder() { - auto wrap_encoder = - [this](auto encoder) -> riegeli::SharedPtr { - if (pool_) { - return riegeli::SharedPtr( - riegeli::Maker(std::move(encoder))); - } else { - return riegeli::SharedPtr(std::move(encoder)); - } - }; +std::unique_ptr ArrayRecordWriterBase::CreateEncoder() { + std::unique_ptr encoder; if (options_.transpose()) { - return wrap_encoder(riegeli::Maker( + encoder = std::make_unique( options_.compressor_options(), riegeli::TransposeEncoder::TuningOptions().set_bucket_size( - options_.transpose_bucket_size()))); + options_.transpose_bucket_size())); } else { - return wrap_encoder(riegeli::Maker( + encoder = std::make_unique( options_.compressor_options(), riegeli::SimpleEncoder::TuningOptions().set_size_hint( - submit_chunk_callback_->get_last_decoded_data_size()))); + submit_chunk_callback_->get_last_decoded_data_size())); + } + if (pool_) { + return std::make_unique(std::move(encoder)); } + return encoder; } bool ArrayRecordWriterBase::WriteRecord(const google::protobuf::MessageLite& record) { @@ -439,18 +432,20 @@ bool ArrayRecordWriterBase::WriteRecordImpl(Record&& record) { if (chunk_encoder_->num_records() >= options_.group_size()) { auto writer = get_writer(); auto encoder = std::move(chunk_encoder_); - riegeli::SharedPtr chunk_promise( - riegeli::Maker>>()); + auto chunk_promise = + std::make_shared>>(); if (!writer->CommitFutureChunk(chunk_promise->get_future())) { Fail(writer->status()); return false; } chunk_encoder_ = CreateEncoder(); if (pool_ && options_.max_parallelism().value() > 1) { + std::shared_ptr shared_encoder = + std::move(encoder); submit_chunk_callback_->TrackConcurrentChunkWriters(); - pool_->Schedule([writer, 
encoder, chunk_promise]() mutable { + pool_->Schedule([writer, shared_encoder, chunk_promise]() mutable { AR_ENDO_TASK("Encode riegeli chunk"); - chunk_promise->set_value(EncodeChunk(encoder.get())); + chunk_promise->set_value(EncodeChunk(shared_encoder.get())); writer->SubmitFutureChunks(false); }); return true; diff --git a/cpp/array_record_writer.h b/cpp/array_record_writer.h index 2054262..4ad0d05 100644 --- a/cpp/array_record_writer.h +++ b/cpp/array_record_writer.h @@ -74,7 +74,6 @@ limitations under the License. #include "cpp/thread_pool.h" #include "riegeli/base/initializer.h" #include "riegeli/base/object.h" -#include "riegeli/base/shared_ptr.h" #include "riegeli/bytes/writer.h" #include "riegeli/chunk_encoding/chunk_encoder.h" #include "riegeli/chunk_encoding/compressor_options.h" @@ -327,7 +326,7 @@ class ArrayRecordWriterBase : public riegeli::Object { void Done() override; private: - riegeli::SharedPtr CreateEncoder(); + std::unique_ptr CreateEncoder(); template bool WriteRecordImpl(Record&& record); @@ -337,7 +336,7 @@ class ArrayRecordWriterBase : public riegeli::Object { Options options_; ARThreadPool* pool_; - riegeli::SharedPtr chunk_encoder_; + std::unique_ptr chunk_encoder_; std::unique_ptr submit_chunk_callback_; }; @@ -418,7 +417,8 @@ template explicit ArrayRecordWriter( Dest&& dest, ArrayRecordWriterBase::Options options = ArrayRecordWriterBase::Options(), - ARThreadPool* pool = nullptr) -> ArrayRecordWriter>; + ARThreadPool* pool = nullptr) -> +ArrayRecordWriter>; } // namespace array_record diff --git a/cpp/masked_reader.cc b/cpp/masked_reader.cc index 4fa9913..5315972 100644 --- a/cpp/masked_reader.cc +++ b/cpp/masked_reader.cc @@ -15,18 +15,18 @@ limitations under the License. 
#include "cpp/masked_reader.h" -#include #include -#include +#include #include #include "absl/memory/memory.h" +#include "absl/status/status.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" #include "cpp/common.h" #include "riegeli/base/object.h" -#include "riegeli/base/shared_buffer.h" #include "riegeli/base/status.h" #include "riegeli/base/types.h" -#include "riegeli/bytes/reader.h" namespace array_record { @@ -34,44 +34,53 @@ using riegeli::Annotate; using riegeli::Position; using riegeli::Reader; -MaskedReader::MaskedReader(Reader& src_reader, size_t length) { - Initialize(src_reader, length); +MaskedReader::MaskedReader(std::unique_ptr src_reader, + size_t length) + : buffer_(std::make_shared()) { + auto pos = src_reader->pos(); + buffer_->resize(length); + if (!src_reader->Read(length, buffer_->data())) { + Fail(Annotate(src_reader->status(), + "Could not read from the underlying reader")); + return; + } + /* + * limit_pos + * |---------------------------| + * buffer_start buffer_limit + * |................|----------| + */ + set_buffer(buffer_->data(), buffer_->size()); + set_limit_pos(pos + buffer_->size()); } -MaskedReader::MaskedReader(riegeli::SharedBuffer buffer, size_t length, +MaskedReader::MaskedReader(std::shared_ptr buffer, Position limit_pos) - : buffer_(std::move(buffer)) { - // limit_pos - // |---------------------------| - // buffer_start buffer_limit - // |................|----------| - set_buffer(buffer_.data(), length); + : buffer_(buffer) { + /* + * limit_pos + * |---------------------------| + * buffer_start buffer_limit + * |................|----------| + */ + set_buffer(buffer_->data(), buffer_->size()); set_limit_pos(limit_pos); } -void MaskedReader::Reset(riegeli::Closed) { - Reader::Reset(riegeli::kClosed); - buffer_.Reset(); +MaskedReader::MaskedReader(MaskedReader &&other) noexcept + : Reader(std::move(other)) { + buffer_ = other.buffer_; // NOLINT(bugprone-use-after-move) + other.Reset(riegeli::kClosed); // 
NOLINT(bugprone-use-after-move) } -void MaskedReader::Reset(Reader& src_reader, size_t length) { - Reader::Reset(); - Initialize(src_reader, length); -} - -void MaskedReader::Initialize(Reader& src_reader, size_t length) { - buffer_.Reset(length); - if (!src_reader.Read(length, buffer_.mutable_data())) { - Fail(Annotate(src_reader.status(), - "Could not read from the underlying reader")); - return; - } - // limit_pos - // |---------------------------| - // buffer_start buffer_limit - // |................|----------| - set_buffer(buffer_.data(), length); - set_limit_pos(src_reader.pos()); +MaskedReader &MaskedReader::operator=(MaskedReader &&other) noexcept { + // Move other + Reader::operator=(static_cast(other)); + // Copy the shared buffer. + buffer_ = other.buffer_; + // Close `other` + other.Reset(riegeli::kClosed); + return *this; } bool MaskedReader::PullSlow(size_t min_length, size_t recommended_length) { @@ -79,19 +88,21 @@ bool MaskedReader::PullSlow(size_t min_length, size_t recommended_length) { return false; } -bool MaskedReader::SeekSlow(Position new_pos) { +bool MaskedReader::SeekSlow(riegeli::Position new_pos) { Fail(FailedPreconditionError("Should not seek beyond buffer")); return false; } -std::optional MaskedReader::SizeImpl() { return limit_pos(); } +absl::optional MaskedReader::SizeImpl() { + return limit_pos(); +} std::unique_ptr MaskedReader::NewReaderImpl(Position initial_pos) { if (!ok()) { return nullptr; } - std::unique_ptr reader = absl::WrapUnique( - new MaskedReader(buffer_, start_to_limit(), limit_pos())); + std::unique_ptr reader = + absl::WrapUnique(new MaskedReader(buffer_, limit_pos())); if (!reader->Seek(initial_pos)) { return nullptr; } diff --git a/cpp/masked_reader.h b/cpp/masked_reader.h index 28d8663..6d9d412 100644 --- a/cpp/masked_reader.h +++ b/cpp/masked_reader.h @@ -16,12 +16,11 @@ limitations under the License. 
#ifndef ARRAY_RECORD_CPP_MASKED_READER_H_ #define ARRAY_RECORD_CPP_MASKED_READER_H_ -#include #include -#include +#include +#include "absl/types/optional.h" #include "riegeli/base/object.h" -#include "riegeli/base/shared_buffer.h" #include "riegeli/base/types.h" #include "riegeli/bytes/reader.h" @@ -48,13 +47,10 @@ class MaskedReader : public riegeli::Reader { public: explicit MaskedReader(riegeli::Closed) : riegeli::Reader(riegeli::kClosed) {} - MaskedReader(riegeli::Reader& src_reader, size_t length); + MaskedReader(std::unique_ptr src_reader, size_t length); - MaskedReader(MaskedReader&& other) = default; - MaskedReader& operator=(MaskedReader&& other) = default; - - void Reset(riegeli::Closed); - void Reset(riegeli::Reader& src_reader, size_t length); + MaskedReader(MaskedReader &&other) noexcept; + MaskedReader &operator=(MaskedReader &&other) noexcept; bool SupportsRandomAccess() override { return true; } bool SupportsNewReader() override { return true; } @@ -63,18 +59,16 @@ class MaskedReader : public riegeli::Reader { bool PullSlow(size_t min_length, size_t recommended_length) override; bool SeekSlow(riegeli::Position new_pos) override; - std::optional SizeImpl() override; + absl::optional SizeImpl() override; std::unique_ptr NewReaderImpl( riegeli::Position initial_pos) override; private: // Private constructor that copies itself. 
- MaskedReader(riegeli::SharedBuffer buffer, size_t length, + MaskedReader(std::shared_ptr buffer, riegeli::Position limit_pos); - void Initialize(riegeli::Reader& src_reader, size_t length); - - riegeli::SharedBuffer buffer_; + std::shared_ptr buffer_; }; } // namespace array_record diff --git a/cpp/masked_reader_test.cc b/cpp/masked_reader_test.cc index 2d8ac0d..adf1a4e 100644 --- a/cpp/masked_reader_test.cc +++ b/cpp/masked_reader_test.cc @@ -29,7 +29,7 @@ TEST(MaskedReaderTest, SanityTest) { auto data = std::string("0123456789abcdef"); auto base_reader = StringReader(data); // 56789abc - MaskedReader masked_reader1(*base_reader.NewReader(5), 8); + auto masked_reader1 = MaskedReader(base_reader.NewReader(5), 8); // Matches where we offset the reader. EXPECT_EQ(masked_reader1.pos(), 5); // Matches offset + mask length diff --git a/cpp/parallel_for.h b/cpp/parallel_for.h index 5ad3484..48629dd 100644 --- a/cpp/parallel_for.h +++ b/cpp/parallel_for.h @@ -282,6 +282,19 @@ inline void ParallelFor(SeqT seq, ARThreadPool* pool, Function func, opts.max_parallelism, DivRoundUp(*seq.end() - *seq.begin(), SeqT::Stride() * kMinItersPerBatch)); + // Unfortunately TF ThreadPool has no interface to monitor queue fullness + // Serialized vanilla for-loop for handling any of: + // + // * No ThreadPool provided. + // + // * ThreadPool has fallen very far behind. + // + // * Small arrays w/ only 1 batch of work to do. + // + // Note that the compiler will inline this logic, making ParallelFor + // equivalent to a traditional C++ for-loop for the cases above. 
+ if (pool == nullptr || + desired_threads <= 1 ) { for (size_t idx : seq) { func(idx); diff --git a/cpp/sequenced_chunk_writer.h b/cpp/sequenced_chunk_writer.h index 6c1a043..18660ed 100644 --- a/cpp/sequenced_chunk_writer.h +++ b/cpp/sequenced_chunk_writer.h @@ -279,7 +279,7 @@ class SequencedChunkWriter : public SequencedChunkWriterBase { template explicit SequencedChunkWriter(Dest&& dest) - -> SequencedChunkWriter>; + -> SequencedChunkWriter>; } // namespace array_record diff --git a/cpp/sequenced_chunk_writer_test.cc b/cpp/sequenced_chunk_writer_test.cc index e6b45ab..c9ac904 100644 --- a/cpp/sequenced_chunk_writer_test.cc +++ b/cpp/sequenced_chunk_writer_test.cc @@ -17,6 +17,7 @@ limitations under the License. #include #include // NOLINT(build/c++11) +#include #include #include #include @@ -30,7 +31,6 @@ limitations under the License. #include "cpp/common.h" #include "cpp/thread_pool.h" #include "riegeli/base/maker.h" -#include "riegeli/base/shared_ptr.h" #include "riegeli/bytes/chain_writer.h" #include "riegeli/bytes/cord_writer.h" #include "riegeli/bytes/string_reader.h" @@ -49,25 +49,27 @@ TEST(SequencedChunkWriterTest, RvalCtorTest) { // riegeli writer. 
{ std::string dest; - riegeli::StringWriter str_writer(&dest); - SequencedChunkWriter to_string(std::move(str_writer)); + auto str_writer = riegeli::StringWriter(&dest); + auto to_string = SequencedChunkWriter(std::move(str_writer)); } { absl::Cord cord; - riegeli::CordWriter cord_writer(&cord); - SequencedChunkWriter to_cord(std::move(cord_writer)); + auto cord_writer = riegeli::CordWriter(&cord); + auto to_cord = SequencedChunkWriter(std::move(cord_writer)); } { std::string dest; - riegeli::StringWriter str_writer(&dest); + auto str_writer = riegeli::StringWriter(&dest); auto to_string = - riegeli::Maker(std::move(str_writer)).UniquePtr(); + std::make_unique>>( + std::move(str_writer)); } { absl::Cord cord; - riegeli::CordWriter cord_writer(&cord); - auto to_cord = riegeli::Maker(std::move(cord_writer)) - .UniquePtr(); + auto cord_writer = riegeli::CordWriter(&cord); + auto to_cord = + std::make_unique>>( + std::move(cord_writer)); } } @@ -76,25 +78,26 @@ TEST(SequencedChunkWriterTest, DestArgsCtorTest) { // templated riegeli writer. 
{ std::string dest; - SequencedChunkWriter to_string( - riegeli::Maker(&dest)); + auto to_string = + SequencedChunkWriter(riegeli::Maker(&dest)); } { absl::Cord cord; - SequencedChunkWriter to_cord(riegeli::Maker(&cord)); + auto to_cord = + SequencedChunkWriter(riegeli::Maker(&cord)); } { std::string dest; - auto to_string = riegeli::Maker( - riegeli::Maker(&dest)) - .UniquePtr(); + auto to_string = + std::make_unique>>( + riegeli::Maker(&dest)); } { absl::Cord cord; - auto to_cord = riegeli::Maker( - riegeli::Maker(&cord)) - .UniquePtr(); + auto to_cord = + std::make_unique>>( + riegeli::Maker(&cord)); } } @@ -117,8 +120,8 @@ TEST(SequencedChunkWriterTest, SanityTestCodeSnippet) { std::string encoded; auto callback = TestCommitChunkCallback(); - riegeli::SharedPtr writer(riegeli::Maker( - riegeli::Maker(&encoded))); + auto writer = std::make_shared>>( + riegeli::Maker(&encoded)); writer->set_submit_chunk_callback(&callback); ASSERT_TRUE(writer->ok()) << writer->status(); @@ -184,8 +187,8 @@ TEST(SequencedChunkWriterTest, SanityTestBadChunk) { std::string encoded; auto callback = TestCommitChunkCallback(); - riegeli::SharedPtr writer(riegeli::Maker( - riegeli::Maker(&encoded))); + auto writer = std::make_shared>>( + riegeli::Maker(&encoded)); writer->set_submit_chunk_callback(&callback); ASSERT_TRUE(writer->ok()) << writer->status(); std::packaged_task()> encoding_task( diff --git a/oss/build_whl.sh b/oss/build_whl.sh index 275c868..8267364 100755 --- a/oss/build_whl.sh +++ b/oss/build_whl.sh @@ -38,9 +38,7 @@ function main() { write_to_bazelrc "test --crosstool_top=${CROSSTOOL_TOP}" fi - # Using a previous version of Blaze to avoid: - # https://github.com/bazelbuild/bazel/issues/8622 - export USE_BAZEL_VERSION=5.4.0 + export USE_BAZEL_VERSION="${BAZEL_VERSION}" bazel clean bazel build ... bazel test --verbose_failures --test_output=errors ... 
diff --git a/oss/runner_common.sh b/oss/runner_common.sh index 2ee2c8c..323d106 100644 --- a/oss/runner_common.sh +++ b/oss/runner_common.sh @@ -15,7 +15,7 @@ function build_and_test_array_record() { # Build wheels for multiple Python minor versions. PYTHON_MAJOR_VERSION=3 - for PYTHON_MINOR_VERSION in 9 10 11 12 + for PYTHON_MINOR_VERSION in 10 11 12 do PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION} PYTHON_BIN=/opt/python/cp${PYTHON_MAJOR_VERSION}${PYTHON_MINOR_VERSION}-cp${PYTHON_MAJOR_VERSION}${PYTHON_MINOR_VERSION}/bin diff --git a/python/BUILD b/python/BUILD index 42ceba6..58ed6ce 100644 --- a/python/BUILD +++ b/python/BUILD @@ -9,7 +9,6 @@ licenses(["notice"]) pybind_extension( name = "array_record_module", srcs = ["array_record_module.cc"], - import_test = False, deps = [ "@com_google_absl//absl/status", "@com_google_absl//absl/strings", diff --git a/setup.py b/setup.py index f5d6dda..c38a1e2 100644 --- a/setup.py +++ b/setup.py @@ -32,17 +32,15 @@ def has_ext_modules(self): packages=find_packages(), include_package_data=True, package_data={'': ['*.so']}, - python_requires='>=3.9', + python_requires='>=3.10', install_requires=REQUIRED_PACKAGES, - extras_require={ - 'beam': BEAM_EXTRAS - }, + extras_require={'beam': BEAM_EXTRAS}, url='https://github.com/google/array_record', license='Apache-2.0', classifiers=[ - 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], zip_safe=False, distclass=BinaryDistribution,