Skip to content

Commit

Permalink
Add objects to pending list when delete objects failed.
Browse files Browse the repository at this point in the history
Retry delete when migration done.

Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm committed Jul 25, 2024
1 parent d547c1f commit 9f44b45
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
10 changes: 9 additions & 1 deletion src/server/server/vineyard_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,12 @@ Status VineyardServer::DelData(
"Fastpath deletion can only be applied to blobs");
}
context_.post([this, memory_trim, ids, callback] {
for (auto const id : ids) {
std::vector<ObjectID> migratings, non_migratings;
std::unique_lock<std::mutex> lock =
FindMigratingObjects(ids, migratings, non_migratings);
RemoveFromMigrationList(non_migratings);
AddPendingObjects(migratings);
for (auto const id : non_migratings) {
VINEYARD_DISCARD(bulk_store_->OnDelete(id, memory_trim));
}
VINEYARD_DISCARD(callback(Status::OK(), ids));
Expand All @@ -791,7 +796,10 @@ Status VineyardServer::DelData(
std::vector<ObjectID> migratings, non_migratings;
std::unique_lock<std::mutex> lock = self->FindMigratingObjects(
ids_to_delete, migratings, non_migratings);
VLOG(100) << "migrating object num:" << migratings.size()
<< ", non-migrating object num:" << non_migratings.size();
self->RemoveFromMigrationList(non_migratings);
self->AddPendingObjects(migratings);

VCATCH_JSON_ERROR(
meta, s,
Expand Down
41 changes: 36 additions & 5 deletions src/server/server/vineyard_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -249,14 +250,17 @@ class VineyardServer : public std::enable_shared_from_this<VineyardServer> {
}

void UnlockMigratingObjects(std::vector<ObjectID> const& ids) {
std::lock_guard<std::mutex> lock(migrating_objects_mutex_);
for (auto const& id : ids) {
if (migrating_objects_.find(id) != migrating_objects_.end()) {
if (--migrating_objects_[id] == 0) {
migrating_objects_.erase(id);
{
std::lock_guard<std::mutex> lock(migrating_objects_mutex_);
for (auto const& id : ids) {
if (migrating_objects_.find(id) != migrating_objects_.end()) {
if (--migrating_objects_[id] == 0) {
migrating_objects_.erase(id);
}
}
}
}
DeletePendingObjects();
}

std::unique_lock<std::mutex> FindMigratingObjects(
Expand Down Expand Up @@ -299,6 +303,30 @@ class VineyardServer : public std::enable_shared_from_this<VineyardServer> {
}
}

void DeletePendingObjects() {
std::vector<ObjectID> ids;
{
std::lock_guard<std::mutex> lock(pendding_to_delete_objects_mutex_);
if (pendding_to_delete_objects_.empty()) {
return;
}
for (auto const& id : pendding_to_delete_objects_) {
ids.push_back(id);
}
pendding_to_delete_objects_.clear();
}
this->DelData(ids, false, false, false, false,
[](const Status& status) { return Status::OK(); });
}

void AddPendingObjects(std::vector<ObjectID> const& ids) {
VLOG(100) << "Add object to pending delete list, size:" << ids.size();
std::lock_guard<std::mutex> lock(pendding_to_delete_objects_mutex_);
for (auto const& id : ids) {
pendding_to_delete_objects_.insert(id);
}
}

~VineyardServer();

private:
Expand Down Expand Up @@ -349,6 +377,9 @@ class VineyardServer : public std::enable_shared_from_this<VineyardServer> {

std::unordered_map<ObjectID, int> migrating_objects_;
std::mutex migrating_objects_mutex_;
// It must be blob.
std::unordered_set<ObjectID> pendding_to_delete_objects_;
std::mutex pendding_to_delete_objects_mutex_;
};

} // namespace vineyard
Expand Down

0 comments on commit 9f44b45

Please sign in to comment.