Skip to content

Commit

Permalink
Generate the unique id for vineyard objects/blobs. (#1988)
Browse files Browse the repository at this point in the history
Fixes #1987

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored Aug 21, 2024
1 parent ee11896 commit 59732c0
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 58 deletions.
54 changes: 46 additions & 8 deletions src/common/util/uuid.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ limitations under the License.
#include <mach/mach.h>
#endif

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <ctime>
Expand Down Expand Up @@ -141,31 +143,67 @@ using SessionID = int64_t;
*/
using PlasmaID = std::string;

class IDGenerator {
public:
static IDGenerator& getInstance() {
static IDGenerator instance;
return instance;
}

ObjectID GenerateID(InstanceID id = 0) {
auto timestamp = GetCurrentTimestamp();
auto instance_id = id & 0x3FFUL;
uint64_t sequence = sequence_.fetch_add(1) & sequence_mask;

return ((timestamp << timestamp_shift) |
(instance_id << instance_id_shift) | sequence);
}

private:
const uint64_t timestamp_shift = 22; // 41 bits for timestamp
const uint64_t instance_id_shift = 12; // 10 bits for instance id
const uint64_t sequence_mask = 0xFFFUL; // 12 bits for sequence number

std::atomic<uint64_t> sequence_{0};

IDGenerator() = default;

uint64_t GetCurrentTimestamp() {
auto now = std::chrono::high_resolution_clock::now();
auto ts = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count();
return (ts & 0x1FFFFFFFFFF);
}
};

/*
* @brief Make empty blob and preallocate blob always mapping to the same place
* Others will be mapped randomly between
* (0x8000000000000000UL,0xFFFFFFFFFFFFFFFFUL) exclusively.
*/
inline ObjectID GenerateBlobID(const uintptr_t ptr) {
static IDGenerator& idGenerator = IDGenerator::getInstance();
if (ptr == 0x8000000000000000UL ||
ptr == std::numeric_limits<uintptr_t>::max()) {
return static_cast<uint64_t>(ptr) | 0x8000000000000000UL;
}
auto ts = detail::cycleclock::now() % (0x7FFFFFFFFFFFFFFFUL - 2) + 1;
return (0x7FFFFFFFFFFFFFFFUL & static_cast<uint64_t>(ts)) |
0x8000000000000000UL;
return (idGenerator.GenerateID() | 0x8000000000000000UL);
}

inline SessionID GenerateSessionID() {
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
static IDGenerator& idGenerator = IDGenerator::getInstance();
return 0x7FFFFFFFFFFFFFFFUL & idGenerator.GenerateID();
}

inline ObjectID GenerateObjectID() {
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
inline ObjectID GenerateObjectID(InstanceID instance_id = 0) {
static IDGenerator& idGenerator = IDGenerator::getInstance();
return 0x7FFFFFFFFFFFFFFFUL & idGenerator.GenerateID(instance_id);
}

inline ObjectID GenerateSignature() {
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
inline ObjectID GenerateSignature(InstanceID instance_id = 0) {
static IDGenerator& idGenerator = IDGenerator::getInstance();
return 0x7FFFFFFFFFFFFFFFUL & idGenerator.GenerateID(instance_id);
}

const std::string ObjectIDToString(const ObjectID id);
Expand Down
34 changes: 18 additions & 16 deletions src/server/server/vineyard_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ Status VineyardServer::ListName(

namespace detail {

Status validate_metadata(const json& tree, json& result, Signature& signature) {
Status validate_metadata(const json& tree, json& result, Signature& signature,
InstanceID instance_id) {
// validate typename
auto type_name_node = tree.value("typename", json(nullptr));
if (type_name_node.is_null() || !type_name_node.is_string()) {
Expand All @@ -468,7 +469,7 @@ Status validate_metadata(const json& tree, json& result, Signature& signature) {
RETURN_ON_ASSERT(tree.contains("instance_id"),
"The instance_id filed must be presented");
result = tree;
signature = GenerateSignature();
signature = GenerateSignature(instance_id);
if (result.find("signature") != result.end()) {
signature = result["signature"].get<Signature>();
} else {
Expand All @@ -485,20 +486,21 @@ Status validate_metadata(const json& tree, json& result, Signature& signature) {

Status put_members_recursively(
std::shared_ptr<IMetaService> metadata_service_ptr, const json& meta,
json& tree, std::string const& instance_name) {
json& tree, std::string const& instance_name, InstanceID instance_id) {
for (auto& item : tree.items()) {
if (item.value().is_object()) {
auto& sub_tree = item.value();
if (!sub_tree.contains("id")) {
Signature signature;
RETURN_ON_ERROR(validate_metadata(sub_tree, sub_tree, signature));
RETURN_ON_ERROR(
validate_metadata(sub_tree, sub_tree, signature, instance_id));

// recursively create members
RETURN_ON_ERROR(put_members_recursively(metadata_service_ptr, meta,
sub_tree, instance_name));
RETURN_ON_ERROR(put_members_recursively(
metadata_service_ptr, meta, sub_tree, instance_name, instance_id));

Status s;
ObjectID id = GenerateObjectID();
ObjectID id = GenerateObjectID(instance_id);
InstanceID computed_instance_id = 0;
std::vector<meta_tree::op_t> ops;
VCATCH_JSON_ERROR(
Expand Down Expand Up @@ -545,18 +547,18 @@ Status VineyardServer::CreateData(
InstanceID& computed_instance_id) {
if (status.ok()) {
auto decorated_tree = json::object();
RETURN_ON_ERROR(
detail::validate_metadata(tree, decorated_tree, signature));
RETURN_ON_ERROR(detail::validate_metadata(
tree, decorated_tree, signature, self->instance_id()));

// expand trees: for putting many metadatas in a single call
if (recursive) {
RETURN_ON_ERROR(detail::put_members_recursively(
self->meta_service_ptr_, meta, decorated_tree,
self->instance_name_));
self->instance_name_, self->instance_id()));
}

Status s;
id = GenerateObjectID();
id = GenerateObjectID(self->instance_id());
VCATCH_JSON_ERROR(
meta, s,
meta_tree::PutDataOps(meta, self->instance_name(), id,
Expand Down Expand Up @@ -590,8 +592,8 @@ Status VineyardServer::CreateData(
for (auto const& tree : trees) {
Signature signature;
auto decorated_tree = json::object();
RETURN_ON_ERROR(
detail::validate_metadata(tree, decorated_tree, signature));
RETURN_ON_ERROR(detail::validate_metadata(
tree, decorated_tree, signature, self->instance_id()));
signatures.emplace_back(signature);
decorated_trees.emplace_back(decorated_tree);
}
Expand All @@ -601,12 +603,12 @@ Status VineyardServer::CreateData(
for (auto& decorated_tree : decorated_trees) {
RETURN_ON_ERROR(detail::put_members_recursively(
self->meta_service_ptr_, meta, decorated_tree,
self->instance_name_));
self->instance_name_, self->instance_id()));
}
}

for (auto& decorated_tree : decorated_trees) {
ObjectID id = GenerateObjectID();
ObjectID id = GenerateObjectID(self->instance_id());
InstanceID computed_instance_id = UnspecifiedInstanceID();
Status s;
VCATCH_JSON_ERROR(meta, s,
Expand Down Expand Up @@ -726,7 +728,7 @@ Status VineyardServer::ShallowCopy(const ObjectID id,
ENSURE_VINEYARDD_READY();
auto self(shared_from_this());
RETURN_ON_ASSERT(!IsBlob(id), "The blobs cannot be shallow copied");
ObjectID target_id = GenerateObjectID();
ObjectID target_id = GenerateObjectID(self->instance_id());
meta_service_ptr_->RequestToShallowCopy(
[id, extra_metadata, target_id](const Status& status, const json& meta,
std::vector<meta_tree::op_t>& ops,
Expand Down
106 changes: 106 additions & 0 deletions test/concurrent_id_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/** Copyright 2020-2023 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <vector>

#include "common/util/logging.h"
#include "common/util/status.h"

#include "client/client.h"
#include "client/ds/blob.h"

using namespace vineyard; // NOLINT(build/namespaces)

const int num_threads = 16;
const int ids_per_thread = 10000;

void testGenerateBlobID(std::string ipc_socket) {
std::unordered_set<uint64_t> blob_ids;
std::mutex mtx;

auto generate_blob_id = [&]() {
auto client = std::make_shared<Client>();
VINEYARD_CHECK_OK(client->Connect(ipc_socket));
for (int i = 0; i < ids_per_thread; ++i) {
std::unique_ptr<BlobWriter> blob_writer;

VINEYARD_CHECK_OK(client->CreateBlob(1, blob_writer));
auto blob_id = blob_writer->id();
std::lock_guard<std::mutex> lock(mtx);
auto result = blob_ids.insert(blob_id);
if (!result.second) {
LOG(ERROR) << "Duplicated blob id: " << blob_id;
}
CHECK(result.second == true);
}
VINEYARD_CHECK_OK(client->Clear());
};

std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(generate_blob_id);
}
for (auto& thread : threads) {
thread.join();
}
}

void testGenerateObjectID(std::string ipc_socket) {
std::unordered_set<uint64_t> object_ids;
std::mutex mtx;

auto generate_object_id = [&]() {
auto client = std::make_shared<Client>();
VINEYARD_CHECK_OK(client->Connect(ipc_socket));
for (int i = 0; i < ids_per_thread; ++i) {
std::unique_ptr<BlobWriter> blob_writer;
VINEYARD_CHECK_OK(client->CreateBlob(1, blob_writer));
std::shared_ptr<Object> object = blob_writer->Seal(*client.get());
auto object_id = object->id();
std::lock_guard<std::mutex> lock(mtx);
auto result = object_ids.insert(object_id);
if (!result.second) {
LOG(ERROR) << "Duplicated object id: " << object_id;
}
CHECK(result.second == true);
}
VINEYARD_CHECK_OK(client->Clear());
};

std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(generate_object_id);
}
for (auto& thread : threads) {
thread.join();
}
}

int main(int argc, char** argv) {
if (argc < 2) {
printf("usage ./concurrent_id_test <ipc_socket>");
return 1;
}
std::string ipc_socket = std::string(argv[1]);

testGenerateBlobID(ipc_socket);
testGenerateObjectID(ipc_socket);
return 0;
}
33 changes: 0 additions & 33 deletions test/id_test.cc

This file was deleted.

2 changes: 1 addition & 1 deletion test/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def run_vineyard_cpp_tests(meta, allocator, endpoints, tests):
run_test(tests, 'hashmap_test')
run_test(tests, 'hashmap_mvcc_test')
# run_test(tests, 'hosseinmoein_dataframe_test')
run_test(tests, 'id_test')
run_test(tests, 'concurrent_id_test')
run_test(tests, 'invalid_connect_test', '127.0.0.1:%d' % rpc_socket_port)
run_test(tests, 'large_meta_test')
run_test(tests, 'list_object_test')
Expand Down

0 comments on commit 59732c0

Please sign in to comment.