# Centipede RiegeliWriter: Increase effective throughput by 10-15x
`RiegeliWriter`:

* Switched from flushing after each written blob to fully leveraging Riegeli's internal buffering, flushing only after N blobs / M bytes / X time (whichever comes first, as sketched below), while maintaining the critical property that only complete records are ever committed to the output file.

The buffered flushing parameters are currently fixed. It might be nice to make them configurable, but right now that would require exposing Riegeli-specific knobs in the generic `BlobFileWriter` API. Once Riegeli becomes the only blob writer in Centipede (possibly soon), we'll be able to parameterize them.
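
For illustration, here is a minimal sketch (not part of this commit) of the "whichever comes first" flush policy described above. The helper and its signature are hypothetical; the threshold names and values mirror the constants introduced in `blob_file.cc` below, and the real decision, including the exact boundary conditions and bookkeeping, lives in `RiegeliWriter::PreWriteFlush()`:

```cpp
#include <cstdint>

#include "absl/time/time.h"

// Hypothetical stand-ins for the thresholds introduced in RiegeliWriter below.
inline constexpr uint64_t kMaxBufferedBlobs = 10000;
inline constexpr uint64_t kMaxBufferedBytes = 100ULL * 1024 * 1024;  // 100 MB
inline constexpr absl::Duration kMaxFlushInterval = absl::Minutes(1);

// Flush after N blobs / M bytes / X time, whichever comes first.
inline bool ShouldFlush(uint64_t buffered_blobs, uint64_t buffered_bytes,
                        absl::Duration since_last_flush) {
  return buffered_blobs >= kMaxBufferedBlobs ||
         buffered_bytes >= kMaxBufferedBytes ||
         since_last_flush >= kMaxFlushInterval;
}
```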

Measured effect:

* A real-life distillation run concurrently reading 50 large input shards and writing them, under mutual exclusion, to a single output shard:
  - wall clock time: 5500 sec -> 360 sec
  - effective CPU core utilization: 0.3 -> 2.8
  - no measurable change in RSS usage profile
* Features file in the run above (large blobs):
  - write throughput: ~340 KB/sec -> ~5300 KB/sec
  - file compression: ~6x -> ~7x
* Corpus file in the run above (small blobs):
  - write throughput: ~11 KB/sec -> ~9000 KB/sec
  - file compression: ~2x -> ~12x
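
For context, the run above drives the writer through the generic `BlobFileWriter` interface. Below is a hedged usage sketch; the helper function and shard path are made up, while `Open()`, `Write()`, and `Close()` are the real interface methods exercised here:

```cpp
#include <string_view>
#include <vector>

#include "absl/log/check.h"
#include "./centipede/blob_file.h"
#include "./centipede/defs.h"

// Appends a batch of blobs to one output shard. With this change, Write()
// usually only buffers the record; it becomes visible to readers once one of
// the flush thresholds is crossed, or at the latest on Close().
void AppendToShard(centipede::BlobFileWriter& writer,
                   std::string_view shard_path,
                   const std::vector<centipede::ByteArray>& blobs) {
  CHECK_OK(writer.Open(shard_path, "a"));
  for (const centipede::ByteArray& blob : blobs) {
    CHECK_OK(writer.Write(blob));
  }
  CHECK_OK(writer.Close());
}
```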

PiperOrigin-RevId: 607818600
ussuri authored and copybara-github committed Feb 16, 2024
1 parent d1f5dbb commit cc15acf
Showing 2 changed files with 132 additions and 14 deletions.
4 changes: 4 additions & 0 deletions centipede/BUILD
@@ -449,11 +449,15 @@ cc_library(
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/strings:string_view",
"@com_google_absl//absl/time",
] + select({
":disable_riegeli": [],
"//conditions:default": [
"@com_google_riegeli//riegeli/base:object",
"@com_google_riegeli//riegeli/base:types",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:writer",
"@com_google_riegeli//riegeli/records:record_reader",
142 changes: 128 additions & 14 deletions centipede/blob_file.cc
@@ -15,27 +15,34 @@
#include "./centipede/blob_file.h"

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "./centipede/defs.h"
#include "./centipede/logging.h"
#include "./centipede/remote_file.h"
#include "./centipede/util.h"
#ifndef CENTIPEDE_DISABLE_RIEGELI
#include "riegeli/base/object.h"
#include "riegeli/base/types.h"
#include "riegeli/bytes/reader.h"
#include "riegeli/bytes/writer.h"
#include "riegeli/records/record_reader.h"
#include "riegeli/records/record_writer.h"
#endif // CENTIPEDE_DISABLE_RIEGELI

namespace centipede {
namespace {

// TODO(ussuri): Return more informative statuses, at least with the file path
// included. That will require adjustments in the test: use
@@ -125,7 +132,6 @@ class SimpleBlobFileWriter : public BlobFileWriter {
ByteArray bytes(blob.begin(), blob.end());
ByteArray packed = PackBytesForAppendFile(bytes);
RemoteFileAppend(file_, packed);

return absl::OkStatus();
}

@@ -252,25 +258,28 @@ class RiegeliWriter : public BlobFileWriter {
absl::Status Open(std::string_view path, std::string_view mode) override {
CHECK(mode == "w" || mode == "a") << VV(mode);
if (absl::Status s = Close(); !s.ok()) return s;
writer_.Reset(CreateRiegeliFileWriter(path, mode == "a"));
const auto kWriterOpts =
riegeli::RecordWriterBase::Options{}.set_chunk_size(kMaxBufferedBytes);
writer_.Reset(CreateRiegeliFileWriter(path, mode == "a"), kWriterOpts);
if (!writer_.ok()) return writer_.status();
path_ = path;
opened_at_ = absl::Now();
flushed_at_ = absl::Now();
written_blobs_ = 0;
written_bytes_ = 0;
buffered_blobs_ = 0;
buffered_bytes_ = 0;
return absl::OkStatus();
}

absl::Status Write(ByteSpan blob) override {
const auto now = absl::Now();
if (!PreWriteFlush(blob.size())) return writer_.status();
if (!writer_.WriteRecord(AsStringView(blob))) return writer_.status();
// Riegeli's automatic flushing happens in chunks, not on record boundaries.
// Flushing explicitly after every write makes it visible to readers earlier,
// especially if writes are infrequent and/or the size of records is small
// relative to chunk size; however, compression performance suffers with
// more frequent flushing.
// Writes of large chunks are not atomic. Therefore, frequent flushing can
// still leave the file in an invalid state from a partial write, which is
// accounted for in `DefaultBlobFileReader::Close()`; however, the
// likelihood of that is reduced since writes may be smaller.
// TODO(b/313706444): Profile tradeoff of read freshness vs compression and
// tune parameters accordingly.
if (!writer_.Flush()) return writer_.status();
if (!PostWriteFlush(blob.size())) return writer_.status();
write_duration_ += absl::Now() - now;
if ((written_blobs_ + buffered_blobs_) % 10000 == 0)
VLOG(10) << "Current stats: " << StatsString();
return absl::OkStatus();
}

@@ -282,15 +291,120 @@
return absl::OkStatus();
}
if (!writer_.Close()) return writer_.status();
flushed_at_ = absl::Now();
written_blobs_ += buffered_blobs_;
written_bytes_ += buffered_bytes_;
buffered_blobs_ = 0;
buffered_bytes_ = 0;
VLOG(1) << "Final stats: " << StatsString();
return absl::OkStatus();
}

private:
static constexpr uint64_t kMB = 1024UL * 1024UL;
// Buffering/flushing control settings. The defaults were chosen based on
// experimental runs and intuition, aiming to balance good buffering
// performance against a steady stream of blobs being committed to the file,
// so that external readers see updates frequently enough.
// TODO(ussuri): Once Riegeli is the sole blob writer, maybe expose these
// as Centipede flags and plumb them through `DefaultBlobFileWriterFactory()`.
static constexpr uint64_t kMaxBufferedBlobs = 10000;
// Riegeli's default is 1 MB.
static constexpr uint64_t kMaxBufferedBytes = 100 * kMB;
static constexpr absl::Duration kMaxFlushInterval = absl::Minutes(1);
// For each record, Riegeli also writes its offset in the stream to the file.
static constexpr size_t kRiegeliPerRecordMetadataSize = sizeof(uint64_t);

// Riegeli's automatic flushing occurs when it accumulates over
// `Options::chunk_size()` of data, not on record boundaries. Our outputs
// are continuously consumed by external readers, so we can't tolerate
// partially written records at the end of a file. Therefore, we explicitly
// flush when we're just about to cross the chunk size boundary, or if the
// client writes infrequently, or if the size of records is small relative
// to the chunk size. The latter two cases are to make the data visible to
// readers earlier; however, note that the compression performance may
// suffer.
bool PreWriteFlush(size_t blob_size) {
const auto record_size = blob_size + kRiegeliPerRecordMetadataSize;
const std::string_view flush_reason =
(buffered_blobs_ > kMaxBufferedBlobs) ? "blobs"
: (buffered_bytes_ + record_size > kMaxBufferedBytes) ? "bytes"
: (absl::Now() - flushed_at_ > kMaxFlushInterval) ? "time"
: "";
if (!flush_reason.empty()) {
VLOG(20) << "Flushing b/c " << flush_reason << ": " << StatsString();
if (!writer_.Flush(riegeli::FlushType::kFromMachine)) return false;
flushed_at_ = absl::Now();
written_blobs_ += buffered_blobs_;
written_bytes_ += buffered_bytes_;
buffered_blobs_ = 0;
buffered_bytes_ = 0;
}
return true;
}

// In the rare case where the current blob itself exceeds the chunk size,
// `Write()` will auto-flush a portion of it to the file, but the remainder
// will remain in the buffer, so we need to force-flush it to maintain file
// completeness.
bool PostWriteFlush(size_t blob_size) {
const auto record_size = blob_size + kRiegeliPerRecordMetadataSize;
if (record_size >= kMaxBufferedBytes) {
VLOG(20) << "Post-write flushing b/c blob size: " << StatsString();
if (!writer_.Flush(riegeli::FlushType::kFromMachine)) return false;
flushed_at_ = absl::Now();
written_blobs_ += 1;
written_bytes_ += record_size;
buffered_blobs_ = 0;
buffered_bytes_ = 0;
} else {
buffered_blobs_ += 1;
buffered_bytes_ += record_size;
}
return true;
}

// Returns a debug string with the writing stats for the current file path.
// The effective throughput is measured as the ratio of the total bytes passed
// to `Write()` to the cumulative time spent inside `Write()` calls.
std::string StatsString() const {
const auto opened_secs = absl::ToDoubleSeconds(absl::Now() - opened_at_);
const auto write_secs = absl::ToDoubleSeconds(write_duration_);
const auto total_bytes = written_bytes_ + buffered_bytes_;
const auto throughput =
write_secs > 0.0 ? (1.0 * total_bytes / write_secs) : 0;
const auto file_size = writer_.EstimatedSize();
const auto compression =
file_size > 0 ? (1.0 * written_bytes_ / file_size) : 0;
std::string stats = absl::StrFormat(
"written/buffered blobs: %llu/%llu, written/buffered bytes: %llu/%llu, "
"opened: %f sec, writing: %f sec, throughput: %.0f B/sec, "
"file size: %llu, compression: %.1f, path: %s",
written_blobs_, buffered_blobs_, written_bytes_, buffered_bytes_,
opened_secs, write_secs, throughput, file_size, compression, path_);
return stats;
}

// The underlying Riegeli writer.
riegeli::RecordWriter<std::unique_ptr<riegeli::Writer>> writer_{
riegeli::kClosed};

// Buffering/flushing control.
absl::Time flushed_at_ = absl::InfiniteFuture();
uint64_t buffered_blobs_ = 0;
uint64_t buffered_bytes_ = 0;

// Telemetry.
std::string path_;
absl::Time opened_at_ = absl::InfiniteFuture();
absl::Duration write_duration_ = absl::ZeroDuration();
uint64_t written_blobs_ = 0;
uint64_t written_bytes_ = 0;
};
#endif // CENTIPEDE_DISABLE_RIEGELI

} // namespace

std::unique_ptr<BlobFileReader> DefaultBlobFileReaderFactory() {
return std::make_unique<DefaultBlobFileReader>();
}