Support multi-threading for the writer
Signed-off-by: Lei Wang <[email protected]>
doudoubobo committed Aug 13, 2024
1 parent fe6ba6c commit 32f46c4
Showing 19 changed files with 400 additions and 81 deletions.
2 changes: 1 addition & 1 deletion apps/analytical_engine/core/utils/gart_vertex_array.h
@@ -102,7 +102,7 @@ class GartVertexArray {

if (unlikely(offset >= data_.size())) {
LOG(ERROR) << "offset: " << offset << " data size: " << data_.size()
<< "loc: " << loc.GetValue();
<< " loc: " << loc.GetValue();
}
return data_[offset];
}
29 changes: 10 additions & 19 deletions interfaces/fragment/gart_fragment.h
@@ -289,8 +289,18 @@ class GartFragment {

inner_offsets_[vlabel] =
blob_info[i]["vertex_table"]["max_inner_location"].get<int64_t>() - 1;
max_inner_offsets_[vlabel] =
blob_info[i]["vertex_table"]["max_inner"].get<int64_t>() - 1;

outer_offsets_[vlabel] =
blob_info[i]["vertex_table"]["min_outer_location"].get<int64_t>();
min_outer_offsets_[vlabel] =
blob_info[i]["vertex_table"]["min_outer"].get<int64_t>();
min_outer_offsets_[vlabel] =
max_outer_id_offset_ + 1 -
(blob_info[i]["vertex_table"]["max"].get<int64_t>() -
min_outer_offsets_[vlabel]);

vertex_table_lens_[vlabel] =
vertex_table_blob->allocated_size() / sizeof(vid_t);

@@ -310,25 +320,6 @@
hashmap_t::View(client_, vertex_map_blob, vertex_map_hmapview));
vertex_maps_[vlabel] = vertex_map_hmapview;

for (int64_t j = inner_offsets_[vlabel]; j >= 0; j--) {
vid_t v = vertex_tables_[vlabel][j];
auto delete_flag = v >> (sizeof(vid_t) * 8 - 1);
if (delete_flag == 0) {
max_inner_offsets_[vlabel] = vid_parser.GetOffset(v);
break;
}
}

for (int64_t j = outer_offsets_[vlabel]; j < vertex_table_lens_[vlabel];
j++) {
vid_t v = vertex_tables_[vlabel][j];
auto delete_flag = v >> (sizeof(vid_t) * 8 - 1);
if (delete_flag == 0) {
min_outer_offsets_[vlabel] = vid_parser.GetOffset(v);
break;
}
}

// init ovl2g
uint64_t ovl2g_obj_id =
blob_info[i]["ovl2g"]["object_id"].get<uint64_t>();
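The rewrite above trades a runtime scan for metadata lookups: the removed loops walked the vertex table inward from both ends, skipping tombstoned entries, to discover the last live inner offset and the first live outer offset, while the new code reads max_inner and min_outer directly from the blob's vertex_table JSON (the second min_outer_offsets_ assignment then appears to convert that global bound into a local offset at the top of the id space). A minimal sketch of the tombstone test the removed loops depended on, assuming vid_t is an unsigned 64-bit id (the real typedef is project-defined and not shown in this diff):

#include <cstdint>

using vid_t = uint64_t;  // assumption for illustration; GART defines its own vid_t

// The removed loops treated the most significant bit of each vertex-table
// entry as a delete flag and scanned until the first live vertex.
inline bool is_deleted(vid_t v) {
  return (v >> (sizeof(vid_t) * 8 - 1)) != 0;
}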
5 changes: 5 additions & 0 deletions vegito/CMakeLists.txt
@@ -11,11 +11,16 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

option (WITH_TEST "build for test" OFF)
option (WITH_GLOBAL_VERTEX_MAP "use global vertex map?" OFF)
option (USE_MULTI_THREADS "use multiple threads to process logs?" OFF)

if(WITH_GLOBAL_VERTEX_MAP)
add_definitions(-DUSE_GLOBAL_VERTEX_MAP)
endif()

if (USE_MULTI_THREADS)
add_definitions(-DUSE_MULTI_THREADS)
endif()

### Forbid in-source builds ###
if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_BINARY_DIR}")
message(FATAL_ERROR "In-source builds are not allowed.")
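With the option wired through add_definitions, configuring the tree with cmake -DUSE_MULTI_THREADS=ON defines USE_MULTI_THREADS for every target, which is what gates the #ifdef USE_MULTI_THREADS blocks in the headers and in bench_runner.cc below; the default remains the single-threaded path.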
1 change: 1 addition & 0 deletions vegito/include/seggraph/epoch_graph_writer.hpp
@@ -40,6 +40,7 @@ class EpochGraphWriter {
dir_t dir = EOUT);
uintptr_t locate_segment_ptr(segid_t seg_id, label_t label, dir_t dir = EOUT);
vertex_t new_vertex(bool use_recycled_vertex = false);
vertex_t new_vertex(vertex_t real_vertex_id);
void put_vertex(vertex_t vertex_id, std::string_view data);
void put_edge(vertex_t src, label_t label, vertex_t dst,
std::string_view edge_data = "") {
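The new new_vertex(vertex_t real_vertex_id) overload lets the caller supply the vertex id rather than having the writer allocate one. Presumably this is what the multi-threaded log writer needs: threads materializing vertices whose ids were already fixed upstream can pin those ids directly instead of contending on the allocator behind the zero-argument overload.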
7 changes: 6 additions & 1 deletion vegito/include/seggraph/segment_graph.hpp
@@ -69,7 +69,9 @@ class SegGraph {

rg_map(rg_map) {
vertex_futexes = array_allocator.allocate<Futex>(max_vertex_id + 1);

#ifdef USE_MULTI_THREADS
seg_init_flag.resize(max_vertex_id + 1, 0);
#endif
seg_mutexes =
array_allocator.allocate<std::shared_timed_mutex*>(max_seg_id + 1);

@@ -224,6 +226,9 @@

Futex* vertex_futexes;
std::shared_timed_mutex** seg_mutexes;
#ifdef USE_MULTI_THREADS
std::vector<int> seg_init_flag;
#endif
uintptr_t* vertex_ptrs;
uintptr_t* edge_label_ptrs;

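Under USE_MULTI_THREADS, SegGraph also carries seg_init_flag, one int per possible vertex id, sized alongside vertex_futexes in the constructor. Its use sites fall outside the hunks shown here; from the declaration it reads as a table recording which segments have already been initialized, so concurrent writers can test initialization cheaply before falling back to the seg_mutexes locks.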
178 changes: 173 additions & 5 deletions vegito/src/framework/bench_runner.cc
Expand Up @@ -18,6 +18,7 @@
#include <cstdint>
#include <fstream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

@@ -146,6 +147,9 @@ Status init_graph_schema(string etcd_endpoint, string etcd_prefix,

graph_store->set_vertex_label_num(vlabel_num);
graph_store->init_id_parser(vlabel_num);
#ifdef USE_MULTI_THREADS
graph_store->init_mutexes(vlabel_num);
#endif
graph_schema.elabel_offset = vlabel_num;
int prop_offset = 0;

@@ -513,12 +517,100 @@ Status init_graph_schema(string etcd_endpoint, string etcd_prefix,
return Status::OK();
}

#ifdef USE_MULTI_THREADS
void Runner::process_log_thread(int pid, int thread_id) {
std::string log_str;
while (true) {
while (!logs_.try_pop(log_str)) {
working_state_mutex_[thread_id]->lock_shared();
bool is_working = is_working_[thread_id];
working_state_mutex_[thread_id]->unlock_shared();
if (is_working) {
working_state_mutex_[thread_id]->lock();
is_working_[thread_id] = false;
working_state_mutex_[thread_id]->unlock();
}
std::this_thread::yield();
}
working_state_mutex_[thread_id]->lock_shared();
bool is_working = is_working_[thread_id];
working_state_mutex_[thread_id]->unlock_shared();
if (!is_working) {
working_state_mutex_[thread_id]->lock();
is_working_[thread_id] = true;
working_state_mutex_[thread_id]->unlock();
}
std::string_view log(log_str);
auto sv_vec = splitString(log, '|');
string_view op(sv_vec[0]);
int cur_epoch = stoi(string(sv_vec[1]));
sv_vec.erase(sv_vec.begin(), sv_vec.begin() + 1);
sv_vec.pop_back();
if (op == "add_vertex") {
process_add_vertex(sv_vec, graph_stores_[pid]);
} else if (op == "add_edge") {
process_add_edge(sv_vec, graph_stores_[pid]);
} else if (op == "delete_vertex") {
process_del_vertex(sv_vec, graph_stores_[pid]);
} else if (op == "delete_edge") {
process_del_edge(sv_vec, graph_stores_[pid]);
} else if (op == "update_vertex") {
process_update_vertex(sv_vec, graph_stores_[pid]);
} else {
LOG(ERROR) << "Unsupported operator " << op;
}
}
}
#endif
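process_log_thread is the consumer half of a producer/consumer design: each worker spins on logs_.try_pop, marks itself idle (is_working_[thread_id] = false) while the queue stays empty, flips back to busy once it obtains a log, and then dispatches on the op field. The coordinator in apply_log_to_store_ uses those flags to build a quiescence barrier: on seeing a log with a newer epoch it drains the queue, waits for every worker to park, and only then seals the epoch via update_blob, insert_blob_schema, and put_blob_json_etcd. A self-contained sketch of that barrier, substituting std::atomic<bool> for the shared_timed_mutex-guarded flags purely to keep it short:

#include <atomic>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

std::mutex queue_mu;
std::deque<std::string> logs;            // stands in for logs_
std::vector<std::atomic<bool>> working;  // stands in for is_working_

bool queue_empty() {
  std::lock_guard<std::mutex> g(queue_mu);
  return logs.empty();
}

// Coordinator side: only after the queue drains and every worker has parked
// is it safe to freeze the blob for the epoch that just ended.
void wait_for_quiescence() {
  while (!queue_empty()) std::this_thread::yield();
  for (auto& w : working) {
    while (w.load(std::memory_order_acquire)) std::this_thread::yield();
  }
}

int main() {
  working = std::vector<std::atomic<bool>>(4);  // four already-idle workers
  wait_for_quiescence();                        // returns immediately here
}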

void Runner::apply_log_to_store_(const string_view& log, int p_id) {
#ifndef USE_MULTI_THREADS
auto sv_vec = splitString(log, '|');
string_view op(sv_vec[0]);
int cur_epoch = stoi(string(sv_vec[1]));
#else
size_t firstDelim = log.find('|');
size_t secondDelim = log.find('|', firstDelim + 1);
std::string_view op = log.substr(0, firstDelim);
int cur_epoch = stoi(
std::string(log.substr(firstDelim + 1, secondDelim - firstDelim - 1)));
#endif

#ifdef USE_MULTI_THREADS
if (cur_epoch > latest_epoch_) {
while (!logs_.empty()) {
std::this_thread::yield();
}

for (auto idx = 0; idx < FLAGS_num_threads; idx++) {
while (true) {
working_state_mutex_[idx]->lock_shared();
bool is_working = is_working_[idx];
working_state_mutex_[idx]->unlock_shared();
if (is_working) {
std::this_thread::yield();
} else {
break;
}
}
}

static auto startTime = std::chrono::high_resolution_clock::now();
graph_stores_[p_id]->update_blob(latest_epoch_);
graph_stores_[p_id]->insert_blob_schema(latest_epoch_);
// put schema to etcd
graph_stores_[p_id]->put_blob_json_etcd(latest_epoch_);
auto endTime = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
endTime - start_time_)
.count();
start_time_ = endTime;

cout << "update epoch " << latest_epoch_ << " frag = " << p_id
<< " time = " << duration << " ms using " << fLI::FLAGS_num_threads
<< " threads" << endl;
latest_epoch_ = cur_epoch;
}
#else

if (cur_epoch > latest_epoch_) {
graph_stores_[p_id]->update_blob(latest_epoch_);
@@ -528,15 +620,16 @@ void Runner::apply_log_to_store_(const string_view& log, int p_id) {

auto endTime = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-                        endTime - startTime)
+                        endTime - start_time_)
.count();

-  startTime = endTime;
+  start_time_ = endTime;

cout << "update epoch " << latest_epoch_ << " frag = " << p_id
cout << "single thread update epoch " << latest_epoch_ << " frag = " << p_id
<< " time = " << duration << " ms" << endl;
latest_epoch_ = cur_epoch;
}
#endif

if (unlikely(op == "bulkload_end")) {
cout << "Completion of bulkload and transition to epoch " << cur_epoch
@@ -573,12 +666,66 @@
}
return;
}

#ifndef USE_MULTI_THREADS
sv_vec.erase(sv_vec.begin(), sv_vec.begin() + 1);
// remove the last element of sv_vec, since it is the binlog offset
sv_vec.pop_back();
#endif

#ifdef USE_MULTI_THREADS
if (op == "add_vertex" || op == "add_edge") {
if (parallel_state_) {
logs_.push(std::string(log));
} else {
while (!logs_.empty()) {
std::this_thread::yield();
}

for (auto idx = 0; idx < FLAGS_num_threads; idx++) {
while (true) {
working_state_mutex_[idx]->lock_shared();
bool is_working = is_working_[idx];
working_state_mutex_[idx]->unlock_shared();
if (is_working) {
std::this_thread::yield();
} else {
break;
}
}
}

logs_.push(std::string(log));
parallel_state_ = true;
}
} else if (op == "delete_vertex" || op == "delete_edge" ||
op == "update_vertex") {
parallel_state_ = false;
while (!logs_.empty()) {
std::this_thread::yield();
}

for (auto idx = 0; idx < FLAGS_num_threads; idx++) {
while (true) {
working_state_mutex_[idx]->lock_shared();
bool is_working = is_working_[idx];
working_state_mutex_[idx]->unlock_shared();
if (is_working) {
std::this_thread::yield();
} else {
break;
}
}
}

logs_.push(std::string(log));
} else {
LOG(ERROR) << "Unsupported operator " << op;
}
return;
#endif

#ifndef USE_MULTI_THREADS
if (op == "add_vertex") {
process_add_vertex(sv_vec, graph_stores_[p_id]);
} else if (op == "add_edge") {
@@ -592,6 +739,7 @@ void Runner::apply_log_to_store_(const string_view& log, int p_id) {
} else {
LOG(ERROR) << "Unsupported operator " << op;
}
#endif
}
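Two details in apply_log_to_store_ deserve a note. First, the ordering invariant the multi-threaded branch encodes: add_vertex/add_edge logs are enqueued freely while parallel_state_ holds, but a delete_vertex/delete_edge/update_vertex first forces a full drain-and-park barrier and clears parallel_state_, so a mutation that must observe all earlier inserts is never processed concurrently with them. Second, the multi-threaded fast path hand-parses only the first two '|'-separated fields (op and epoch) and leaves the full split to the workers. splitString itself is not part of this diff; a string_view splitter with the shape its call sites expect might look like this (an assumption, not the project's implementation):

#include <cstddef>
#include <string_view>
#include <vector>

std::vector<std::string_view> splitString(std::string_view s, char delim) {
  std::vector<std::string_view> parts;
  std::size_t start = 0;
  while (true) {
    std::size_t pos = s.find(delim, start);
    parts.push_back(s.substr(start, pos - start));  // npos clamps to the tail
    if (pos == std::string_view::npos) break;
    start = pos + 1;
  }
  return parts;
}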

Status Runner::start_kafka_to_process_(int p_id) {
@@ -635,6 +783,7 @@ Status Runner::start_kafka_to_process_(int p_id) {
}

printf("Start main loop for subgraph %d ...\n", p_id);
start_time_ = std::chrono::high_resolution_clock::now();
while (1) {
RdKafka::Message* msg = consumer->consume(topic, partition, 1000);
const char* log_base = static_cast<const char*>(msg->payload());
@@ -675,17 +824,36 @@ void Runner::load_graph_partitions_from_logs_(int mac_id,
int v_label_num = graph_stores_[p_id]->get_total_vertex_label_num();
graph_stores_[p_id]->init_ovg2ls(v_label_num);
graph_stores_[p_id]->init_vertex_maps(v_label_num);
#ifdef USE_MULTI_THREADS
std::vector<std::thread> threads;
for (auto idx = 0; idx < FLAGS_num_threads; idx++) {
threads.emplace_back(&Runner::process_log_thread, this, p_id, idx);
}
#endif
#ifndef WITH_TEST
GART_CHECK_OK(start_kafka_to_process_(p_id));
#else
start_file_stream_to_process_(p_id);
#endif

#ifdef USE_MULTI_THREADS
for (auto& thread : threads) {
thread.join();
}
#endif
}
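Lifecycle note: the worker pool is spawned before the Kafka consumer (or the test file stream) starts feeding apply_log_to_store_, and the joins after it are largely formal, since both the consume loop in start_kafka_to_process_ (while (1)) and the worker loop in process_log_thread (while (true)) have no termination condition in the code shown; in practice the writer runs until the process is stopped.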

void Runner::run() {
/*************** Load Data ****************/
int mac_id = gart::framework::config.getServerID();
int total_partitions = gart::framework::config.getNumServers();
#ifdef USE_MULTI_THREADS
working_state_mutex_.resize(FLAGS_num_threads);
for (auto idx = 0; idx < FLAGS_num_threads; idx++) {
is_working_.push_back(true);
working_state_mutex_[idx] = std::make_shared<std::shared_timed_mutex>();
}
#endif

load_graph_partitions_from_logs_(mac_id, total_partitions);

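run() prepares the coordination state before any worker thread exists: working_state_mutex_ is sized to FLAGS_num_threads and every is_working_ flag starts out true. Since a worker only clears its own flag after observing an empty queue, the coordinator's first barrier cannot complete until each thread has started and parked at least once, which also guarantees the mutex array is fully constructed before load_graph_partitions_from_logs_ spawns the pool.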