Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in-progress 304 rock updates to mgr:openfd_objects #221

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/MemObject.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ MemObject::~MemObject()
assert(xitTable.index < 0);
assert(memCache.index < 0);
assert(swapout.sio == nullptr);
assert(!update.reader && !update.writer);

data_hdr.freeContent();
}
Expand Down Expand Up @@ -198,6 +199,12 @@ MemObject::stat(MemBuf * mb) const
if (swapout.sio.getRaw())
mb->appendf("\tswapout: %" PRId64 " bytes written\n", (int64_t) swapout.sio->offset());

if (update.reader)
mb->appendf("\tupdate: %" PRId64 " bytes read\n", update.reader->offset());

if (update.writer)
mb->appendf("\tupdate: %" PRId64 " bytes written\n", update.reader->offset());

if (xitTable.index >= 0)
mb->appendf("\ttransient index: %d state: %d\n", xitTable.index, xitTable.io);
if (memCache.index >= 0)
Expand Down Expand Up @@ -264,6 +271,7 @@ void
MemObject::reset()
{
assert(swapout.sio == nullptr);
assert(!update.reader && !update.writer);
data_hdr.freeContent();
inmem_lo = 0;
/* Should we check for clients? */
Expand Down
10 changes: 10 additions & 0 deletions src/MemObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ class MemObject

SwapOut swapout;

// IO for Rock::HeaderUpdater
class Update
{
public:
StoreIOState::Pointer reader; ///< reads old headers and old data
StoreIOState::Pointer writer; ///< writes new headers and old data
};
rousskov marked this conversation as resolved.
Show resolved Hide resolved

Update update;

/* TODO: Remove this change-minimizing hack */
using Io = Store::IoStatus;
static constexpr Io ioUndecided = Store::ioUndecided;
Expand Down
99 changes: 58 additions & 41 deletions src/fs/rock/RockHeaderUpdater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const I
AsyncJob("Rock::HeaderUpdater"),
store(aStore),
update(anUpdate),
reader(),
writer(),
bytesRead(0),
staleSwapHeaderSize(0),
staleSplicingPointNext(-1)
Expand All @@ -33,7 +31,7 @@ Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const I
bool
Rock::HeaderUpdater::doneAll() const
{
return !reader && !writer && AsyncJob::doneAll();
return !reader() && !writer() && AsyncJob::doneAll();
}

void
Expand All @@ -42,20 +40,23 @@ Rock::HeaderUpdater::swanSong()
if (update.stale || update.fresh)
store->map->abortUpdating(update);

if (reader) {
reader->close(StoreIOState::readerDone);
reader = nullptr;
auto &mem = update.entry->mem();

if (reader()) {
reader()->close(StoreIOState::readerDone);
mem.update.reader = nullptr;
}

if (writer) {
writer->close(StoreIOState::writerGone);
if (writer()) {
writer()->close(StoreIOState::writerGone);
// Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
// Also required to avoid IoState destructor assertions.
// We can do this because we closed update earlier or aborted it above.
dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
writer = nullptr;
dynamic_cast<IoState&>(*writer()).writeableAnchor_ = nullptr;
mem.update.writer = nullptr;
}


AsyncJob::swanSong();
}

Expand All @@ -71,10 +72,12 @@ Rock::HeaderUpdater::start()
void
Rock::HeaderUpdater::startReading()
{
reader = store->openStoreIO(
*update.entry,
&NoteDoneReading,
this);
auto &mem = update.entry->mem();
mem.update.reader = store->openStoreIO(
*update.entry,
&NoteDoneReading,
this);

readMore("need swap entry metadata");
}

Expand All @@ -83,15 +86,15 @@ Rock::HeaderUpdater::stopReading(const char *why)
{
debugs(47, 7, why);

Must(reader);
const IoState &rockReader = dynamic_cast<IoState&>(*reader);
Must(reader());
const IoState &rockReader = dynamic_cast<IoState&>(*reader());
update.stale.splicingPoint = rockReader.splicingPoint;
staleSplicingPointNext = rockReader.staleSplicingPointNext;
debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
" body continues at " << staleSplicingPointNext);

reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
reader = nullptr; // so that swanSong() does not try to close again
reader()->close(StoreIOState::readerDone); // calls noteDoneReading(0)
update.entry->mem().update.reader = nullptr; // so that swanSong() does not try to close again
}

void
Expand Down Expand Up @@ -127,9 +130,9 @@ void
Rock::HeaderUpdater::readMore(const char *why)
{
debugs(47, 7, "from " << bytesRead << " because " << why);
Must(reader);
Must(reader());
readerBuffer.clear();
storeRead(reader,
storeRead(reader(),
readerBuffer.rawAppendStart(store->slotSize),
store->slotSize,
bytesRead,
Expand All @@ -151,12 +154,12 @@ Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Poin
void
Rock::HeaderUpdater::noteDoneReading(int errflag)
{
debugs(47, 5, errflag << " writer=" << writer);
if (!reader) {
debugs(47, 5, errflag << " writer=" << writer());
if (!reader()) {
Must(!errflag); // we only initiate successful closures
Must(writer); // otherwise we would be done() and would not be called
Must(writer()); // otherwise we would be done() and would not be called
} else {
reader = nullptr; // we are done reading
update.entry->mem().update.reader = nullptr; // we are done reading
Must(errflag); // any external closures ought to be errors
mustStop("read error");
}
Expand All @@ -165,13 +168,15 @@ Rock::HeaderUpdater::noteDoneReading(int errflag)
void
Rock::HeaderUpdater::startWriting()
{
writer = store->createUpdateIO(
update,
&NoteDoneWriting,
this);
Must(writer);
auto &mem = update.entry->mem();

IoState &rockWriter = dynamic_cast<IoState&>(*writer);
mem.update.writer = store->createUpdateIO(
update,
&NoteDoneWriting,
this);
Must(writer());

IoState &rockWriter = dynamic_cast<IoState&>(*writer());
rockWriter.staleSplicingPointNext = staleSplicingPointNext;

// here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
Expand All @@ -180,8 +185,6 @@ Rock::HeaderUpdater::startWriting()

off_t offset = 0; // current writing offset (for debugging)

const auto &mem = update.entry->mem();

{
debugs(20, 7, "fresh store meta for " << *update.entry);
size_t freshSwapHeaderSize = 0; // set by getSerialisedMetaData() below
Expand All @@ -197,7 +200,7 @@ Rock::HeaderUpdater::startWriting()
update.entry->swap_file_sz = savedEntrySwapFileSize;

Must(freshSwapHeader);
writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
writer()->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
stalePrefixSz += mem.swap_hdr_sz;
freshPrefixSz += freshSwapHeaderSize;
offset += freshSwapHeaderSize;
Expand All @@ -207,7 +210,7 @@ Rock::HeaderUpdater::startWriting()
{
debugs(20, 7, "fresh HTTP header @ " << offset);
const auto httpHeader = mem.freshestReply().pack();
writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
writer()->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
const auto &staleReply = mem.baseReply();
Must(staleReply.hdr_sz >= 0); // for int-to-uint64_t conversion below
Must(staleReply.hdr_sz > 0); // already initialized
Expand All @@ -219,7 +222,7 @@ Rock::HeaderUpdater::startWriting()

{
debugs(20, 7, "moved HTTP body prefix @ " << offset);
writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
writer()->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
offset += exchangeBuffer.length();
exchangeBuffer.clear();
}
Expand All @@ -233,7 +236,7 @@ Rock::HeaderUpdater::startWriting()
swap_file_sz -= stalePrefixSz;
swap_file_sz += freshPrefixSz;

writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
writer()->close(StoreIOState::wroteAll); // should call noteDoneWriting()
}

void
Expand All @@ -249,17 +252,19 @@ Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Poin
void
Rock::HeaderUpdater::noteDoneWriting(int errflag)
{
debugs(47, 5, errflag << " reader=" << reader);
debugs(47, 5, errflag << " reader=" << reader());
Must(!errflag);
Must(!reader); // if we wrote everything, then we must have read everything
Must(!reader()); // if we wrote everything, then we must have read everything

Must(writer);
IoState &rockWriter = dynamic_cast<IoState&>(*writer);
Must(writer());
IoState &rockWriter = dynamic_cast<IoState&>(*writer());
update.fresh.splicingPoint = rockWriter.splicingPoint;
debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
store->map->closeForUpdating(update);
rockWriter.writeableAnchor_ = nullptr;
writer = nullptr; // we are done writing
update.entry->mem().update.writer = nullptr; // we are done writing
auto &mem = update.entry->mem();
mem.swapout.sio = nullptr;

Must(doneAll());
}
Expand Down Expand Up @@ -291,3 +296,15 @@ Rock::HeaderUpdater::parseReadBytes()
startWriting();
}

StoreIOState::Pointer
Rock::HeaderUpdater::reader() const
{
return update.entry->mem().update.reader;
}

StoreIOState::Pointer
Rock::HeaderUpdater::writer() const
{
return update.entry->mem().update.writer;
}

4 changes: 2 additions & 2 deletions src/fs/rock/RockHeaderUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ class HeaderUpdater: public AsyncJob
Rock::SwapDir::Pointer store; ///< cache_dir where the entry is stored
Ipc::StoreMapUpdate update; ///< Ipc::StoreMap update reservation

StoreIOState::Pointer reader; ///< reads old headers and old data
StoreIOState::Pointer writer; ///< writes new headers and old data
StoreIOState::Pointer reader() const;
StoreIOState::Pointer writer() const;

SBuf readerBuffer; ///< I/O buffer for a single read operation
SBuf exchangeBuffer; ///< bytes read but not yet discarded or written
Expand Down
9 changes: 6 additions & 3 deletions src/stat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,13 @@ statObjectsOpenfdFilter(const StoreEntry * e)
if (e->mem_obj == nullptr)
return 0;

if (e->mem_obj->swapout.sio == nullptr)
return 0;
if (e->mem_obj->swapout.sio)
return 1;

if (e->mem_obj->update.reader || e->mem_obj->update.writer)
return 1;

return 1;
return 0;
rousskov marked this conversation as resolved.
Show resolved Hide resolved
}

static void
Expand Down