diff --git a/centipede/BUILD b/centipede/BUILD index cc80a394..415a5329 100644 --- a/centipede/BUILD +++ b/centipede/BUILD @@ -448,12 +448,19 @@ cc_library( ":util", "@com_google_absl//absl/log", "@com_google_absl//absl/log:check", + "@com_google_absl//absl/memory", "@com_google_absl//absl/status", + "@com_google_absl//absl/strings", + "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/strings:string_view", + "@com_google_absl//absl/time", + "@com_google_riegeli//riegeli/chunk_encoding:chunk", + "@com_google_riegeli//riegeli/records:chunk_writer", ] + select({ ":disable_riegeli": [], "//conditions:default": [ "@com_google_riegeli//riegeli/base:object", + "@com_google_riegeli//riegeli/base:types", "@com_google_riegeli//riegeli/bytes:reader", "@com_google_riegeli//riegeli/bytes:writer", "@com_google_riegeli//riegeli/records:record_reader", diff --git a/centipede/blob_file.cc b/centipede/blob_file.cc index 14a43f3b..20d9a536 100644 --- a/centipede/blob_file.cc +++ b/centipede/blob_file.cc @@ -14,19 +14,29 @@ #include "./centipede/blob_file.h" +#include #include +#include +#include // NOLINT: C++17 #include +#include #include +#include #include #include "absl/log/check.h" #include "absl/log/log.h" #include "absl/status/status.h" +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" #include "./centipede/defs.h" #include "./centipede/logging.h" #include "./centipede/remote_file.h" #include "./centipede/util.h" +#include "riegeli/chunk_encoding/chunk.h" +#include "riegeli/records/chunk_writer.h" #ifndef CENTIPEDE_DISABLE_RIEGELI #include "riegeli/base/object.h" #include "riegeli/bytes/reader.h" @@ -36,6 +46,9 @@ #endif // CENTIPEDE_DISABLE_RIEGELI namespace centipede { +namespace { + +namespace fs = std::filesystem; // TODO(ussuri): Return more informative statuses, at least with the file path // included. That will require adjustments in the test: use @@ -125,7 +138,6 @@ class SimpleBlobFileWriter : public BlobFileWriter { ByteArray bytes(blob.begin(), blob.end()); ByteArray packed = PackBytesForAppendFile(bytes); RemoteFileAppend(file_, packed); - return absl::OkStatus(); } @@ -242,6 +254,7 @@ class DefaultBlobFileReader : public BlobFileReader { #ifndef CENTIPEDE_DISABLE_RIEGELI // Implementation of `BlobFileWriter` using Riegeli // (https://github.com/google/riegeli). + class RiegeliWriter : public BlobFileWriter { public: ~RiegeliWriter() override { @@ -252,25 +265,24 @@ class RiegeliWriter : public BlobFileWriter { absl::Status Open(std::string_view path, std::string_view mode) override { CHECK(mode == "w" || mode == "a") << VV(mode); if (absl::Status s = Close(); !s.ok()) return s; - writer_.Reset(CreateRiegeliFileWriter(path, mode == "a")); + const auto kWriterOpts = // + riegeli::RecordWriterBase::Options{} + .set_chunk_size(kMaxBufferedBytes) + .set_parallelism(kRiegeliParallelism); + auto chunk_writer = std::make_unique( + CreateRiegeliFileWriter(path, mode == "a")); + if (chunk_writer == nullptr) return absl::UnknownError("can't open file"); + writer_.Reset(std::move(chunk_writer), kWriterOpts); if (!writer_.ok()) return writer_.status(); + ResetStats(path); return absl::OkStatus(); } absl::Status Write(ByteSpan blob) override { - if (!writer_.WriteRecord(AsStringView(blob))) return writer_.status(); - // Riegeli's automatic flushing happens in chunks, not on record boundaries. - // Flushing explicitly after every write makes it visible to readers earlier - // especially if writes are infrequent and/or the size of records is small - // relative to chunk size; however, compression performance suffers with - // more frequent flushing. - // Writes of large chunks are not atomic. Therefore, frequent flushing can - // still leave the file in an invalid state from a partial write which is - // accounted for in `DefaultBlobFileReader::Close()` - however, the - // likelihood of that is reduced since writes may be smaller. - // TODO(b/313706444): Profile tradeoff of read freshness vs compression and - // tune parameters accordingly. - if (!writer_.Flush()) return writer_.status(); + if (!writer_.WriteRecord(AsStringView(blob))) [[unlikely]] + return writer_.status(); + UpdateStats(blob); + VLOG_EVERY_N_SEC(10, 30) << "Current stats: " << StatsString(); return absl::OkStatus(); } @@ -282,15 +294,78 @@ class RiegeliWriter : public BlobFileWriter { return absl::OkStatus(); } if (!writer_.Close()) return writer_.status(); + VLOG(1) << "Final stats: " << StatsString(); return absl::OkStatus(); } private: - riegeli::RecordWriter> writer_{ + // A custom `riegeli::ChunkWriter` that ensures that the byte + // `riegeli::Writer`, underlying the whole `RiegeliWriter` class, is flushed + // after writing each chunk. This all but guarantees that the file on disk + // remains valid at all times, that is, contains only fully written blobs and + // is therefore readable by external readers (e.g. using + // `DefaultBlobFileReader`). + class FlushingChunkWriter + : public riegeli::DefaultChunkWriter> { + using Base = DefaultChunkWriter; + + public: + using Base::Base; + + bool WriteChunk(const riegeli::Chunk &chunk) override { + if (!Base::WriteChunk(chunk)) return false; + if (!dest()->Flush()) return FailWithoutAnnotation(dest()->status()); + return true; + } + }; + + // TODO(ussuri): Expose as `Options` once Riegeli is the sole blob writer. + static constexpr uint64_t kMaxBufferedBytes = 100L * 1024 * 1024; + static constexpr int kRiegeliParallelism = 50; + + void ResetStats(std::string_view path) { + path_ = path; + opened_ = absl::Now(); + blobs_ = 0; + bytes_ = 0; + } + + void UpdateStats(ByteSpan blob) { + blobs_ += 1; + bytes_ += blob.size(); + } + + // Returns a debug string with the effective writing rate for the current file + // path. The effective rate is measured as a ratio of the total bytes passed + // to `Write()` and the elapsed time from the file opening till now. + std::string StatsString() const { + const auto basename = fs::path{path_}.filename(); + const auto secs = absl::ToDoubleSeconds(absl::Now() - opened_); + const auto thruput = static_cast(std::ceil(bytes_ / secs)); + const auto file_size = writer_.EstimatedSize(); + const auto compression = file_size > 0 ? (1.0 * bytes_ / file_size) : 0; + return absl::StrFormat( + "basename: %s " + "blobs/bytes: %llu/%llu time: %.1f sec thruput: %llu B/sec " + "compression: %.2f" + "\nfull path: %s", + basename, blobs_, bytes_, secs, thruput, compression, path_); + } + + std::unique_ptr byte_writer_; + riegeli::RecordWriter> writer_{ riegeli::kClosed}; + + // Telemetry. + std::string path_; + absl::Time opened_ = absl::InfiniteFuture(); + uint64_t blobs_ = 0; + uint64_t bytes_ = 0; }; #endif // CENTIPEDE_DISABLE_RIEGELI +} // namespace + std::unique_ptr DefaultBlobFileReaderFactory() { return std::make_unique(); }