From 8a113d1e6c2932b2078cb76a801314c65b276868 Mon Sep 17 00:00:00 2001 From: Marcin Kowalczyk Date: Fri, 8 Nov 2024 12:53:46 -0800 Subject: [PATCH] Riegeli usage cleanup: Use `riegeli::SharedPtr` instead of `std::shared_ptr`. Do not convert it from `std::unique_ptr`: `riegeli::SharedPtr` always allocates the control block together with the object, hence such a conversion is not supported, and constructing the object directly is more efficient anyway. Utilize CTAD more. Let the `MaskedReader` constructor take `riegeli::Reader&` instead of `std::unique_ptr<riegeli::Reader>`. There is no ownership transfer; the `riegeli::Reader` is accessed only during construction. Use `riegeli::SharedBuffer` instead of `std::shared_ptr<std::string>` in `MaskedReader`. It has faster refcounting and a smaller overhead. It does not track the exact size, but this is not needed here. Use the defaulted move constructor and move assignment of `MaskedReader`. They do the right thing. Add `MaskedReader::Reset()` to reset the instance in place to a state equivalent to a newly constructed one. Use `std::optional` instead of `absl::optional`. Use `std::optional`, or store the value directly, instead of `std::unique_ptr` when movability is not needed. 
PiperOrigin-RevId: 694608145 --- cpp/BUILD | 8 +-- cpp/array_record_reader.cc | 34 ++++++------ cpp/array_record_writer.cc | 41 +++++++------- cpp/array_record_writer.h | 5 +- cpp/masked_reader.cc | 87 +++++++++++++----------------- cpp/masked_reader.h | 26 +++++---- cpp/masked_reader_test.cc | 2 +- cpp/parallel_for.h | 15 ++---- cpp/sequenced_chunk_writer.h | 12 ++--- cpp/sequenced_chunk_writer_test.cc | 49 ++++++++--------- python/array_record_module.cc | 1 + 11 files changed, 138 insertions(+), 142 deletions(-) diff --git a/cpp/BUILD b/cpp/BUILD index a18c427..67f1e26 100644 --- a/cpp/BUILD +++ b/cpp/BUILD @@ -71,7 +71,6 @@ cc_library( ":common", ":thread_pool", "@com_google_absl//absl/base:core_headers", - "@com_google_absl//absl/functional:function_ref", "@com_google_absl//absl/status", "@com_google_absl//absl/synchronization", ], @@ -131,6 +130,7 @@ cc_library( "@com_google_riegeli//riegeli/base:initializer", "@com_google_riegeli//riegeli/base:object", "@com_google_riegeli//riegeli/base:options_parser", + "@com_google_riegeli//riegeli/base:shared_ptr", "@com_google_riegeli//riegeli/base:status", "@com_google_riegeli//riegeli/bytes:chain_writer", "@com_google_riegeli//riegeli/bytes:writer", @@ -152,10 +152,8 @@ cc_library( deps = [ ":common", "@com_google_absl//absl/memory", - "@com_google_absl//absl/status", - "@com_google_absl//absl/time", - "@com_google_absl//absl/types:optional", "@com_google_riegeli//riegeli/base:object", + "@com_google_riegeli//riegeli/base:shared_buffer", "@com_google_riegeli//riegeli/base:status", "@com_google_riegeli//riegeli/base:types", "@com_google_riegeli//riegeli/bytes:reader", @@ -185,6 +183,7 @@ cc_library( "@com_google_riegeli//riegeli/base:initializer", "@com_google_riegeli//riegeli/base:object", "@com_google_riegeli//riegeli/base:options_parser", + "@com_google_riegeli//riegeli/base:shared_ptr", "@com_google_riegeli//riegeli/base:status", "@com_google_riegeli//riegeli/bytes:reader", 
"@com_google_riegeli//riegeli/chunk_encoding:chunk", @@ -207,6 +206,7 @@ cc_test( "@com_google_absl//absl/types:span", "@com_google_googletest//:gtest_main", "@com_google_riegeli//riegeli/base:initializer", + "@com_google_riegeli//riegeli/base:shared_ptr", "@com_google_riegeli//riegeli/bytes:chain_writer", "@com_google_riegeli//riegeli/bytes:cord_writer", "@com_google_riegeli//riegeli/bytes:string_reader", diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc index b5731a0..a8c51b0 100644 --- a/cpp/array_record_reader.cc +++ b/cpp/array_record_reader.cc @@ -41,8 +41,10 @@ limitations under the License. #include "cpp/parallel_for.h" #include "cpp/thread_pool.h" #include "third_party/protobuf/message_lite.h" +#include "riegeli/base/maker.h" #include "riegeli/base/object.h" #include "riegeli/base/options_parser.h" +#include "riegeli/base/shared_ptr.h" #include "riegeli/base/status.h" #include "riegeli/bytes/reader.h" #include "riegeli/chunk_encoding/chunk.h" @@ -168,12 +170,12 @@ ChunkDecoder ReadChunk(Reader& reader, size_t pos, size_t len) { decoder.Fail(reader.status()); return decoder; } - MaskedReader masked_reader(reader.NewReader(pos), len); + MaskedReader masked_reader(*reader.NewReader(pos), len); if (!masked_reader.ok()) { decoder.Fail(masked_reader.status()); return decoder; } - auto chunk_reader = riegeli::DefaultChunkReader<>(&masked_reader); + riegeli::DefaultChunkReader chunk_reader(&masked_reader); Chunk chunk; if (!chunk_reader.ReadChunk(chunk)) { decoder.Fail(chunk_reader.status()); @@ -343,15 +345,15 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords( MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader = MaskedReader( - reader->NewReader(state_->chunk_offsets[chunk_idx_start]), + masked_reader.Reset( + *reader->NewReader(state_->chunk_offsets[chunk_idx_start]), buf_len); } for (uint64_t chunk_idx = chunk_idx_start; chunk_idx <= last_chunk_idx; ++chunk_idx) { 
AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); masked_reader.Seek(state_->chunk_offsets[chunk_idx]); - riegeli::DefaultChunkReader<> chunk_reader(&masked_reader); + riegeli::DefaultChunkReader chunk_reader(&masked_reader); Chunk chunk; if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) { return chunk_reader.status(); @@ -420,15 +422,15 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange( MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader = MaskedReader( - reader->NewReader(state_->chunk_offsets[chunk_idx_start]), + masked_reader.Reset( + *reader->NewReader(state_->chunk_offsets[chunk_idx_start]), buf_len); } for (uint64_t chunk_idx = chunk_idx_start; chunk_idx <= last_chunk_idx; ++chunk_idx) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); masked_reader.Seek(state_->chunk_offsets[chunk_idx]); - riegeli::DefaultChunkReader<> chunk_reader(&masked_reader); + riegeli::DefaultChunkReader chunk_reader(&masked_reader); Chunk chunk; if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) { return chunk_reader.status(); @@ -539,14 +541,14 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices( MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader = MaskedReader( - reader->NewReader(state_->chunk_offsets[buffer_chunks[0]]), + masked_reader.Reset( + *reader->NewReader(state_->chunk_offsets[buffer_chunks[0]]), buf_len); } for (auto chunk_idx : buffer_chunks) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); masked_reader.Seek(state_->chunk_offsets[chunk_idx]); - riegeli::DefaultChunkReader<> chunk_reader(&masked_reader); + riegeli::DefaultChunkReader chunk_reader(&masked_reader); Chunk chunk; if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) { return chunk_reader.status(); @@ -686,8 +688,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { // movable, OSS ThreadPool only takes std::function which requires all the // captures to be copyable. 
Therefore we must wrap the promise in a // shared_ptr to copy it over to the scheduled task. - auto decoder_promise = - std::make_shared>>(); + riegeli::SharedPtr decoder_promise( + riegeli::Maker>>()); state_->future_decoders.push( {buffer_to_add, decoder_promise->get_future()}); const auto reader = get_backing_reader(); @@ -719,8 +721,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { MaskedReader masked_reader(riegeli::kClosed); { AR_ENDO_SCOPE("MaskedReader"); - masked_reader = - MaskedReader(reader->NewReader(chunk_offsets.front()), buffer_len); + masked_reader.Reset(*reader->NewReader(chunk_offsets.front()), + buffer_len); } if (!masked_reader.ok()) { for (auto& decoder : decoders) { @@ -733,7 +735,7 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { AR_ENDO_SCOPE("ChunkReader+ChunkDecoder"); for (auto local_chunk_idx : IndicesOf(chunk_offsets)) { masked_reader.Seek(chunk_offsets[local_chunk_idx]); - auto chunk_reader = riegeli::DefaultChunkReader<>(&masked_reader); + riegeli::DefaultChunkReader chunk_reader(&masked_reader); Chunk chunk; if (!chunk_reader.ReadChunk(chunk)) { decoders[local_chunk_idx].Fail(chunk_reader.status()); diff --git a/cpp/array_record_writer.cc b/cpp/array_record_writer.cc index e61a5e6..62c3c51 100644 --- a/cpp/array_record_writer.cc +++ b/cpp/array_record_writer.cc @@ -40,8 +40,10 @@ limitations under the License. 
#include "cpp/sequenced_chunk_writer.h" #include "cpp/thread_pool.h" #include "third_party/protobuf/message_lite.h" +#include "riegeli/base/maker.h" #include "riegeli/base/object.h" #include "riegeli/base/options_parser.h" +#include "riegeli/base/shared_ptr.h" #include "riegeli/base/status.h" #include "riegeli/bytes/chain_writer.h" #include "riegeli/chunk_encoding/chunk.h" @@ -66,7 +68,7 @@ constexpr uint32_t kZstdDefaultWindowLog = 20; // Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16` constexpr uint64_t kMagic = 0x71930e704fdae05eULL; -// zstd:3 gives a good trade-off for both the compression and decomopression +// zstd:3 gives a good trade-off for both the compression and decompression // speed. constexpr char kArrayRecordDefaultCompression[] = "zstd:3"; @@ -169,7 +171,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) { return options_parser.status(); } // From our benchmarks we figured zstd:3 gives the best trade-off for both the - // compression and decomopression speed. + // compression and decompression speed. 
if (text == "default" || (!absl::StrContains(compressor_text, "uncompressed") && !absl::StrContains(compressor_text, "brotli") && @@ -388,23 +390,28 @@ void ArrayRecordWriterBase::Done() { submit_chunk_callback_->WriteFooterAndPostscript(writer.get()); } -std::unique_ptr ArrayRecordWriterBase::CreateEncoder() { - std::unique_ptr encoder; +riegeli::SharedPtr +ArrayRecordWriterBase::CreateEncoder() { + auto wrap_encoder = + [this](auto encoder) -> riegeli::SharedPtr { + if (pool_) { + return riegeli::SharedPtr( + riegeli::Maker(std::move(encoder))); + } else { + return riegeli::SharedPtr(std::move(encoder)); + } + }; if (options_.transpose()) { - encoder = std::make_unique( + return wrap_encoder(riegeli::Maker( options_.compressor_options(), riegeli::TransposeEncoder::TuningOptions().set_bucket_size( - options_.transpose_bucket_size())); + options_.transpose_bucket_size()))); } else { - encoder = std::make_unique( + return wrap_encoder(riegeli::Maker( options_.compressor_options(), riegeli::SimpleEncoder::TuningOptions().set_size_hint( - submit_chunk_callback_->get_last_decoded_data_size())); - } - if (pool_) { - return std::make_unique(std::move(encoder)); + submit_chunk_callback_->get_last_decoded_data_size()))); } - return encoder; } bool ArrayRecordWriterBase::WriteRecord(const google::protobuf::MessageLite& record) { @@ -432,20 +439,18 @@ bool ArrayRecordWriterBase::WriteRecordImpl(Record&& record) { if (chunk_encoder_->num_records() >= options_.group_size()) { auto writer = get_writer(); auto encoder = std::move(chunk_encoder_); - auto chunk_promise = - std::make_shared>>(); + riegeli::SharedPtr chunk_promise( + riegeli::Maker>>()); if (!writer->CommitFutureChunk(chunk_promise->get_future())) { Fail(writer->status()); return false; } chunk_encoder_ = CreateEncoder(); if (pool_ && options_.max_parallelism().value() > 1) { - std::shared_ptr shared_encoder = - std::move(encoder); submit_chunk_callback_->TrackConcurrentChunkWriters(); - pool_->Schedule([writer, 
shared_encoder, chunk_promise]() mutable { + pool_->Schedule([writer, encoder, chunk_promise]() mutable { AR_ENDO_TASK("Encode riegeli chunk"); - chunk_promise->set_value(EncodeChunk(shared_encoder.get())); + chunk_promise->set_value(EncodeChunk(encoder.get())); writer->SubmitFutureChunks(false); }); return true; diff --git a/cpp/array_record_writer.h b/cpp/array_record_writer.h index a47c39f..8b4e9f5 100644 --- a/cpp/array_record_writer.h +++ b/cpp/array_record_writer.h @@ -74,6 +74,7 @@ limitations under the License. #include "cpp/thread_pool.h" #include "riegeli/base/initializer.h" #include "riegeli/base/object.h" +#include "riegeli/base/shared_ptr.h" #include "riegeli/bytes/writer.h" #include "riegeli/chunk_encoding/chunk_encoder.h" #include "riegeli/chunk_encoding/compressor_options.h" @@ -326,7 +327,7 @@ class ArrayRecordWriterBase : public riegeli::Object { void Done() override; private: - std::unique_ptr CreateEncoder(); + riegeli::SharedPtr CreateEncoder(); template bool WriteRecordImpl(Record&& record); @@ -336,7 +337,7 @@ class ArrayRecordWriterBase : public riegeli::Object { Options options_; ARThreadPool* pool_; - std::unique_ptr chunk_encoder_; + riegeli::SharedPtr chunk_encoder_; std::unique_ptr submit_chunk_callback_; }; diff --git a/cpp/masked_reader.cc b/cpp/masked_reader.cc index 5315972..4fa9913 100644 --- a/cpp/masked_reader.cc +++ b/cpp/masked_reader.cc @@ -15,18 +15,18 @@ limitations under the License. 
#include "cpp/masked_reader.h" +#include #include -#include +#include #include #include "absl/memory/memory.h" -#include "absl/status/status.h" -#include "absl/time/clock.h" -#include "absl/time/time.h" #include "cpp/common.h" #include "riegeli/base/object.h" +#include "riegeli/base/shared_buffer.h" #include "riegeli/base/status.h" #include "riegeli/base/types.h" +#include "riegeli/bytes/reader.h" namespace array_record { @@ -34,53 +34,44 @@ using riegeli::Annotate; using riegeli::Position; using riegeli::Reader; -MaskedReader::MaskedReader(std::unique_ptr src_reader, - size_t length) - : buffer_(std::make_shared()) { - auto pos = src_reader->pos(); - buffer_->resize(length); - if (!src_reader->Read(length, buffer_->data())) { - Fail(Annotate(src_reader->status(), - "Could not read from the underlying reader")); - return; - } - /* - * limit_pos - * |---------------------------| - * buffer_start buffer_limit - * |................|----------| - */ - set_buffer(buffer_->data(), buffer_->size()); - set_limit_pos(pos + buffer_->size()); +MaskedReader::MaskedReader(Reader& src_reader, size_t length) { + Initialize(src_reader, length); } -MaskedReader::MaskedReader(std::shared_ptr buffer, +MaskedReader::MaskedReader(riegeli::SharedBuffer buffer, size_t length, Position limit_pos) - : buffer_(buffer) { - /* - * limit_pos - * |---------------------------| - * buffer_start buffer_limit - * |................|----------| - */ - set_buffer(buffer_->data(), buffer_->size()); + : buffer_(std::move(buffer)) { + // limit_pos + // |---------------------------| + // buffer_start buffer_limit + // |................|----------| + set_buffer(buffer_.data(), length); set_limit_pos(limit_pos); } -MaskedReader::MaskedReader(MaskedReader &&other) noexcept - : Reader(std::move(other)) { - buffer_ = other.buffer_; // NOLINT(bugprone-use-after-move) - other.Reset(riegeli::kClosed); // NOLINT(bugprone-use-after-move) +void MaskedReader::Reset(riegeli::Closed) { + 
Reader::Reset(riegeli::kClosed); + buffer_.Reset(); } -MaskedReader &MaskedReader::operator=(MaskedReader &&other) noexcept { - // Move other - Reader::operator=(static_cast(other)); - // Copy the shared buffer. - buffer_ = other.buffer_; - // Close `other` - other.Reset(riegeli::kClosed); - return *this; +void MaskedReader::Reset(Reader& src_reader, size_t length) { + Reader::Reset(); + Initialize(src_reader, length); +} + +void MaskedReader::Initialize(Reader& src_reader, size_t length) { + buffer_.Reset(length); + if (!src_reader.Read(length, buffer_.mutable_data())) { + Fail(Annotate(src_reader.status(), + "Could not read from the underlying reader")); + return; + } + // limit_pos + // |---------------------------| + // buffer_start buffer_limit + // |................|----------| + set_buffer(buffer_.data(), length); + set_limit_pos(src_reader.pos()); } bool MaskedReader::PullSlow(size_t min_length, size_t recommended_length) { @@ -88,21 +79,19 @@ bool MaskedReader::PullSlow(size_t min_length, size_t recommended_length) { return false; } -bool MaskedReader::SeekSlow(riegeli::Position new_pos) { +bool MaskedReader::SeekSlow(Position new_pos) { Fail(FailedPreconditionError("Should not seek beyond buffer")); return false; } -absl::optional MaskedReader::SizeImpl() { - return limit_pos(); -} +std::optional MaskedReader::SizeImpl() { return limit_pos(); } std::unique_ptr MaskedReader::NewReaderImpl(Position initial_pos) { if (!ok()) { return nullptr; } - std::unique_ptr reader = - absl::WrapUnique(new MaskedReader(buffer_, limit_pos())); + std::unique_ptr reader = absl::WrapUnique( + new MaskedReader(buffer_, start_to_limit(), limit_pos())); if (!reader->Seek(initial_pos)) { return nullptr; } diff --git a/cpp/masked_reader.h b/cpp/masked_reader.h index 2a1de89..28d8663 100644 --- a/cpp/masked_reader.h +++ b/cpp/masked_reader.h @@ -16,11 +16,12 @@ limitations under the License. 
#ifndef ARRAY_RECORD_CPP_MASKED_READER_H_ #define ARRAY_RECORD_CPP_MASKED_READER_H_ +#include #include -#include +#include -#include "absl/types/optional.h" #include "riegeli/base/object.h" +#include "riegeli/base/shared_buffer.h" #include "riegeli/base/types.h" #include "riegeli/bytes/reader.h" @@ -47,10 +48,13 @@ class MaskedReader : public riegeli::Reader { public: explicit MaskedReader(riegeli::Closed) : riegeli::Reader(riegeli::kClosed) {} - MaskedReader(std::unique_ptr src_reader, size_t length); + MaskedReader(riegeli::Reader& src_reader, size_t length); - MaskedReader(MaskedReader &&other) noexcept; - MaskedReader &operator=(MaskedReader &&other) noexcept; + MaskedReader(MaskedReader&& other) = default; + MaskedReader& operator=(MaskedReader&& other) = default; + + void Reset(riegeli::Closed); + void Reset(riegeli::Reader& src_reader, size_t length); bool SupportsRandomAccess() override { return true; } bool SupportsNewReader() override { return true; } @@ -59,16 +63,18 @@ class MaskedReader : public riegeli::Reader { bool PullSlow(size_t min_length, size_t recommended_length) override; bool SeekSlow(riegeli::Position new_pos) override; - absl::optional SizeImpl() override; - std::unique_ptr - NewReaderImpl(riegeli::Position initial_pos) override; + std::optional SizeImpl() override; + std::unique_ptr NewReaderImpl( + riegeli::Position initial_pos) override; private: // Private constructor that copies itself. 
- MaskedReader(std::shared_ptr buffer, + MaskedReader(riegeli::SharedBuffer buffer, size_t length, riegeli::Position limit_pos); - std::shared_ptr buffer_; + void Initialize(riegeli::Reader& src_reader, size_t length); + + riegeli::SharedBuffer buffer_; }; } // namespace array_record diff --git a/cpp/masked_reader_test.cc b/cpp/masked_reader_test.cc index adf1a4e..2d8ac0d 100644 --- a/cpp/masked_reader_test.cc +++ b/cpp/masked_reader_test.cc @@ -29,7 +29,7 @@ TEST(MaskedReaderTest, SanityTest) { auto data = std::string("0123456789abcdef"); auto base_reader = StringReader(data); // 56789abc - auto masked_reader1 = MaskedReader(base_reader.NewReader(5), 8); + MaskedReader masked_reader1(*base_reader.NewReader(5), 8); // Matches where we offset the reader. EXPECT_EQ(masked_reader1.pos(), 5); // Matches offset + mask length diff --git a/cpp/parallel_for.h b/cpp/parallel_for.h index 4a13967..5ad3484 100644 --- a/cpp/parallel_for.h +++ b/cpp/parallel_for.h @@ -20,19 +20,14 @@ limitations under the License. #include #include #include -#include #include -#include -#include -#include "absl/base/thread_annotations.h" -#include "absl/functional/function_ref.h" +#include "absl/base/optimization.h" #include "absl/status/status.h" #include "absl/synchronization/mutex.h" #include "cpp/common.h" #include "cpp/thread_pool.h" - namespace array_record { // kDynamicBatchSize - when a batch size isn't specified, ParallelFor defaults @@ -162,7 +157,7 @@ namespace parallel_for_internal { // ParallelForClosure - a single heap-allocated object that holds the loop's // state. The object will delete itself when the final task completes. template - class ParallelForClosure { +class ParallelForClosure { public: static constexpr bool kIsDynamicBatch = (kItersPerBatch == kDynamicBatchSize); ParallelForClosure(SeqT seq, Function func) @@ -175,8 +170,8 @@ template // Don't push more tasks to the pool than we have work for. 
Also, if // parallelism is limited by desired_threads not thread pool size, subtract // 1 from the number of threads to push to account for the main thread. - size_t n_threads = std::min(desired_threads - 1, - pool->NumThreads()); + size_t n_threads = + std::min(desired_threads - 1, pool->NumThreads()); // Handle dynamic batch size. if (kIsDynamicBatch) { @@ -287,7 +282,7 @@ inline void ParallelFor(SeqT seq, ARThreadPool* pool, Function func, opts.max_parallelism, DivRoundUp(*seq.end() - *seq.begin(), SeqT::Stride() * kMinItersPerBatch)); - if (!pool || desired_threads <= 1) { + ) { for (size_t idx : seq) { func(idx); } diff --git a/cpp/sequenced_chunk_writer.h b/cpp/sequenced_chunk_writer.h index b255ca2..18660ed 100644 --- a/cpp/sequenced_chunk_writer.h +++ b/cpp/sequenced_chunk_writer.h @@ -122,7 +122,7 @@ class SequencedChunkWriterBase : public riegeli::Object { // // Example 2: concurrent access // - // auto writer = std::make_shared(...) + // riegeli::SharedPtr writer(riegeli::MakerSchedule([writer, // chunk_promise = std::move(chunk_promise)]() mutable { @@ -201,8 +201,8 @@ class SequencedChunkWriterBase : public riegeli::Object { // // // Step 1: open the writer with file backend. // File* file = file::OpenOrDie(...); -// auto writer = std::make_shared>( -// riegeli::Maker(filename_or_file)); +// riegeli::SharedPtr writer(riegeli::Maker( +// riegeli::Maker(filename_or_file))); // // // Step 2: create a chunk encoding task. // std::packaged_task()> encoding_task( @@ -217,8 +217,8 @@ class SequencedChunkWriterBase : public riegeli::Object { // // Step 4: Computes the encoding task in a thread pool. // pool->Schedule([=,encoding_task=std::move(encoding_task)]() mutable { // encoding_task(); // std::promise fulfilled. -// // shared_ptr pevents the writer to go out of scope, so it is safe to -// // invoke the method here. +// // riegeli::SharedPtr pevents the writer to go out of scope, so it is +// // safe to invoke the method here. 
// writer->SubmitFutureChunks(false); // }); // @@ -257,7 +257,7 @@ class SequencedChunkWriterBase : public riegeli::Object { // SequencedChunkWriter writes_to_file( // riegeli::Maker(filename_or_file)); // -// User may also use std::make_shared<...> or std::make_unique to construct the +// User may also use riegeli::SharedPtr or std::make_unique to construct the // instance, as shown in the previous example. template class SequencedChunkWriter : public SequencedChunkWriterBase { diff --git a/cpp/sequenced_chunk_writer_test.cc b/cpp/sequenced_chunk_writer_test.cc index c9ac904..e6b45ab 100644 --- a/cpp/sequenced_chunk_writer_test.cc +++ b/cpp/sequenced_chunk_writer_test.cc @@ -17,7 +17,6 @@ limitations under the License. #include #include // NOLINT(build/c++11) -#include #include #include #include @@ -31,6 +30,7 @@ limitations under the License. #include "cpp/common.h" #include "cpp/thread_pool.h" #include "riegeli/base/maker.h" +#include "riegeli/base/shared_ptr.h" #include "riegeli/bytes/chain_writer.h" #include "riegeli/bytes/cord_writer.h" #include "riegeli/bytes/string_reader.h" @@ -49,27 +49,25 @@ TEST(SequencedChunkWriterTest, RvalCtorTest) { // riegeli writer. 
{ std::string dest; - auto str_writer = riegeli::StringWriter(&dest); - auto to_string = SequencedChunkWriter(std::move(str_writer)); + riegeli::StringWriter str_writer(&dest); + SequencedChunkWriter to_string(std::move(str_writer)); } { absl::Cord cord; - auto cord_writer = riegeli::CordWriter(&cord); - auto to_cord = SequencedChunkWriter(std::move(cord_writer)); + riegeli::CordWriter cord_writer(&cord); + SequencedChunkWriter to_cord(std::move(cord_writer)); } { std::string dest; - auto str_writer = riegeli::StringWriter(&dest); + riegeli::StringWriter str_writer(&dest); auto to_string = - std::make_unique>>( - std::move(str_writer)); + riegeli::Maker(std::move(str_writer)).UniquePtr(); } { absl::Cord cord; - auto cord_writer = riegeli::CordWriter(&cord); - auto to_cord = - std::make_unique>>( - std::move(cord_writer)); + riegeli::CordWriter cord_writer(&cord); + auto to_cord = riegeli::Maker(std::move(cord_writer)) + .UniquePtr(); } } @@ -78,26 +76,25 @@ TEST(SequencedChunkWriterTest, DestArgsCtorTest) { // templated riegeli writer. 
{ std::string dest; - auto to_string = - SequencedChunkWriter(riegeli::Maker(&dest)); + SequencedChunkWriter to_string( + riegeli::Maker(&dest)); } { absl::Cord cord; - auto to_cord = - SequencedChunkWriter(riegeli::Maker(&cord)); + SequencedChunkWriter to_cord(riegeli::Maker(&cord)); } { std::string dest; - auto to_string = - std::make_unique>>( - riegeli::Maker(&dest)); + auto to_string = riegeli::Maker( + riegeli::Maker(&dest)) + .UniquePtr(); } { absl::Cord cord; - auto to_cord = - std::make_unique>>( - riegeli::Maker(&cord)); + auto to_cord = riegeli::Maker( + riegeli::Maker(&cord)) + .UniquePtr(); } } @@ -120,8 +117,8 @@ TEST(SequencedChunkWriterTest, SanityTestCodeSnippet) { std::string encoded; auto callback = TestCommitChunkCallback(); - auto writer = std::make_shared>>( - riegeli::Maker(&encoded)); + riegeli::SharedPtr writer(riegeli::Maker( + riegeli::Maker(&encoded))); writer->set_submit_chunk_callback(&callback); ASSERT_TRUE(writer->ok()) << writer->status(); @@ -187,8 +184,8 @@ TEST(SequencedChunkWriterTest, SanityTestBadChunk) { std::string encoded; auto callback = TestCommitChunkCallback(); - auto writer = std::make_shared>>( - riegeli::Maker(&encoded)); + riegeli::SharedPtr writer(riegeli::Maker( + riegeli::Maker(&encoded))); writer->set_submit_chunk_callback(&callback); ASSERT_TRUE(writer->ok()) << writer->status(); std::packaged_task()> encoding_task( diff --git a/python/array_record_module.cc b/python/array_record_module.cc index 35e725f..09f2fa5 100644 --- a/python/array_record_module.cc +++ b/python/array_record_module.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ +#include #include #include