Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal change #1001

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions centipede/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,19 @@ cc_library(
":util",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/strings:string_view",
"@com_google_absl//absl/time",
"@com_google_riegeli//riegeli/chunk_encoding:chunk",
"@com_google_riegeli//riegeli/records:chunk_writer",
] + select({
":disable_riegeli": [],
"//conditions:default": [
"@com_google_riegeli//riegeli/base:object",
"@com_google_riegeli//riegeli/base:types",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:writer",
"@com_google_riegeli//riegeli/records:record_reader",
Expand Down
107 changes: 91 additions & 16 deletions centipede/blob_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,29 @@

#include "./centipede/blob_file.h"

#include <cmath>
#include <cstddef>
#include <cstdint>
#include <filesystem> // NOLINT: C++17
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "./centipede/defs.h"
#include "./centipede/logging.h"
#include "./centipede/remote_file.h"
#include "./centipede/util.h"
#include "riegeli/chunk_encoding/chunk.h"
#include "riegeli/records/chunk_writer.h"
#ifndef CENTIPEDE_DISABLE_RIEGELI
#include "riegeli/base/object.h"
#include "riegeli/bytes/reader.h"
Expand All @@ -36,6 +46,9 @@
#endif // CENTIPEDE_DISABLE_RIEGELI

namespace centipede {
namespace {

namespace fs = std::filesystem;

// TODO(ussuri): Return more informative statuses, at least with the file path
// included. That will require adjustments in the test: use
Expand Down Expand Up @@ -125,7 +138,6 @@ class SimpleBlobFileWriter : public BlobFileWriter {
ByteArray bytes(blob.begin(), blob.end());
ByteArray packed = PackBytesForAppendFile(bytes);
RemoteFileAppend(file_, packed);

return absl::OkStatus();
}

Expand Down Expand Up @@ -242,6 +254,7 @@ class DefaultBlobFileReader : public BlobFileReader {
#ifndef CENTIPEDE_DISABLE_RIEGELI
// Implementation of `BlobFileWriter` using Riegeli
// (https://github.com/google/riegeli).

class RiegeliWriter : public BlobFileWriter {
public:
~RiegeliWriter() override {
Expand All @@ -252,25 +265,24 @@ class RiegeliWriter : public BlobFileWriter {
absl::Status Open(std::string_view path, std::string_view mode) override {
CHECK(mode == "w" || mode == "a") << VV(mode);
if (absl::Status s = Close(); !s.ok()) return s;
writer_.Reset(CreateRiegeliFileWriter(path, mode == "a"));
const auto kWriterOpts = //
riegeli::RecordWriterBase::Options{}
.set_chunk_size(kMaxBufferedBytes)
.set_parallelism(kRiegeliParallelism);
auto chunk_writer = std::make_unique<FlushingChunkWriter>(
CreateRiegeliFileWriter(path, mode == "a"));
if (chunk_writer == nullptr) return absl::UnknownError("can't open file");
writer_.Reset(std::move(chunk_writer), kWriterOpts);
if (!writer_.ok()) return writer_.status();
ResetStats(path);
return absl::OkStatus();
}

absl::Status Write(ByteSpan blob) override {
if (!writer_.WriteRecord(AsStringView(blob))) return writer_.status();
// Riegeli's automatic flushing happens in chunks, not on record boundaries.
// Flushing explicitly after every write makes it visible to readers earlier
// especially if writes are infrequent and/or the size of records is small
// relative to chunk size; however, compression performance suffers with
// more frequent flushing.
// Writes of large chunks are not atomic. Therefore, frequent flushing can
// still leave the file in an invalid state from a partial write which is
// accounted for in `DefaultBlobFileReader::Close()` - however, the
// likelihood of that is reduced since writes may be smaller.
// TODO(b/313706444): Profile tradeoff of read freshness vs compression and
// tune parameters accordingly.
if (!writer_.Flush()) return writer_.status();
if (!writer_.WriteRecord(AsStringView(blob))) [[unlikely]]
return writer_.status();
UpdateStats(blob);
VLOG_EVERY_N_SEC(10, 30) << "Current stats: " << StatsString();
return absl::OkStatus();
}

Expand All @@ -282,15 +294,78 @@ class RiegeliWriter : public BlobFileWriter {
return absl::OkStatus();
}
if (!writer_.Close()) return writer_.status();
VLOG(1) << "Final stats: " << StatsString();
return absl::OkStatus();
}

private:
riegeli::RecordWriter<std::unique_ptr<riegeli::Writer>> writer_{
// A custom `riegeli::ChunkWriter` that ensures that the byte
// `riegeli::Writer`, underlying the whole `RiegeliWriter` class, is flushed
// after writing each chunk. This all but guarantees that the file on disk
// remains valid at all times, that is, contains only fully written blobs and
// is therefore readable by external readers (e.g. using
// `DefaultBlobFileReader`).
class FlushingChunkWriter
: public riegeli::DefaultChunkWriter<std::unique_ptr<riegeli::Writer>> {
using Base = DefaultChunkWriter;

public:
using Base::Base;

bool WriteChunk(const riegeli::Chunk &chunk) override {
if (!Base::WriteChunk(chunk)) return false;
if (!dest()->Flush()) return FailWithoutAnnotation(dest()->status());
return true;
}
};

// TODO(ussuri): Expose as `Options` once Riegeli is the sole blob writer.
static constexpr uint64_t kMaxBufferedBytes = 100L * 1024 * 1024;
static constexpr int kRiegeliParallelism = 50;

void ResetStats(std::string_view path) {
path_ = path;
opened_ = absl::Now();
blobs_ = 0;
bytes_ = 0;
}

void UpdateStats(ByteSpan blob) {
blobs_ += 1;
bytes_ += blob.size();
}

// Returns a debug string with the effective writing rate for the current file
// path. The effective rate is measured as a ratio of the total bytes passed
// to `Write()` and the elapsed time from the file opening till now.
std::string StatsString() const {
const auto basename = fs::path{path_}.filename();
const auto secs = absl::ToDoubleSeconds(absl::Now() - opened_);
const auto thruput = static_cast<uint64_t>(std::ceil(bytes_ / secs));
const auto file_size = writer_.EstimatedSize();
const auto compression = file_size > 0 ? (1.0 * bytes_ / file_size) : 0;
return absl::StrFormat(
"basename: %s "
"blobs/bytes: %llu/%llu time: %.1f sec thruput: %llu B/sec "
"compression: %.2f"
"\nfull path: %s",
basename, blobs_, bytes_, secs, thruput, compression, path_);
}

std::unique_ptr<riegeli::Writer> byte_writer_;
riegeli::RecordWriter<std::unique_ptr<FlushingChunkWriter>> writer_{
riegeli::kClosed};

// Telemetry.
std::string path_;
absl::Time opened_ = absl::InfiniteFuture();
uint64_t blobs_ = 0;
uint64_t bytes_ = 0;
};
#endif // CENTIPEDE_DISABLE_RIEGELI

} // namespace

std::unique_ptr<BlobFileReader> DefaultBlobFileReaderFactory() {
return std::make_unique<DefaultBlobFileReader>();
}
Expand Down
Loading