Skip to content

Commit

Permalink
Merge remote-tracking branch 'github/add-parallelism' into impl-proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 committed Jul 3, 2024
2 parents 9856fe0 + f86cb69 commit 060b8ee
Show file tree
Hide file tree
Showing 17 changed files with 197 additions and 116 deletions.
7 changes: 7 additions & 0 deletions docs/flex/interactive/data_import.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ To illustrate, let's examine the `examples/modern_import_full.yaml` file. This c

``` yaml
loading_config:
x_csr_params:
parallelism: 1
build_csr_in_mem: true
use_mmap_vector: true
data_source:
scheme: file
location: /home/modern_graph/
Expand Down Expand Up @@ -227,6 +231,9 @@ The table below offers a detailed breakdown of each configuration item. In this
| loading_config.format.metadata.escaping | false | Whether escaping is used | No |
| loading_config.format.metadata.escape_char | '\\' | Escaping character (if `escaping` is true) | No |
| loading_config.format.metadata.batch_size | 4MB | The size of batch for reading from files | No |
| loading_config.x_csr_params.parallelism | 1 | Number of threads used for bulk loading | No |
| loading_config.x_csr_params.build_csr_in_mem | false | Whether to build csr fully in memory | No |
| loading_config.x_csr_params.use_mmap_vector | false | Whether to use mmap_vector rather than mmap_array for building | No |
| | | | |
| **vertex_mappings** | N/A | Define how to map the raw data into a graph vertex in the schema | Yes |
| vertex_mappings.type_name | N/A | Name of the vertex type | Yes |
Expand Down
31 changes: 15 additions & 16 deletions flex/bin/bulk_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ int main(int argc, char** argv) {
*
*/
desc.add_options()("help", "Display help message")(
"version,v", "Display version")("parallelism,p",
bpo::value<uint32_t>()->default_value(1),
"version,v", "Display version")("parallelism,p", bpo::value<uint32_t>(),
"parallelism of bulk loader")(
"data-path,d", bpo::value<std::string>(), "data directory path")(
"graph-config,g", bpo::value<std::string>(), "graph schema config file")(
Expand All @@ -90,7 +89,6 @@ int main(int argc, char** argv) {
return 0;
}

uint32_t parallelism = vm["parallelism"].as<uint32_t>();
std::string data_path = "";
std::string bulk_load_config_path = "";
std::string graph_schema_path = "";
Expand All @@ -110,17 +108,6 @@ int main(int argc, char** argv) {
return -1;
}
bulk_load_config_path = vm["bulk-load"].as<std::string>();
bool build_csr_in_mem = false;
if (vm.count("build-csr-in-mem")) {
build_csr_in_mem = vm["build-csr-in-mem"].as<bool>();
LOG(INFO) << "batch init in memory: " << static_cast<int>(build_csr_in_mem);
}

bool use_mmap_vector = false;
if (vm.count("use-mmap-vector")) {
use_mmap_vector = vm["use-mmap-vector"].as<bool>();
LOG(INFO) << "use mmap vector: " << static_cast<int>(use_mmap_vector);
}

setenv("TZ", "Asia/Shanghai", 1);
tzset();
Expand All @@ -141,6 +128,19 @@ int main(int argc, char** argv) {
return -1;
}

// check whether parallelism, build_csr_in_mem, use_mmap_vector are overriden
if (vm.count("parallelism")) {
loading_config_res.value().SetParallelism(vm["parallelism"].as<uint32_t>());
}
if (vm.count("build-csr-in-mem")) {
loading_config_res.value().SetBuildCsrInMem(
vm["build-csr-in-mem"].as<bool>());
}
if (vm.count("use-mmap-vector")) {
loading_config_res.value().SetUseMmapVector(
vm["use-mmap-vector"].as<bool>());
}

std::filesystem::path data_dir_path(data_path);
if (!std::filesystem::exists(data_dir_path)) {
std::filesystem::create_directory(data_dir_path);
Expand All @@ -163,8 +163,7 @@ int main(int argc, char** argv) {
std::signal(SIGABRT, signal_handler);

auto loader = gs::LoaderFactory::CreateFragmentLoader(
data_dir_path.string(), schema_res.value(), loading_config_res.value(),
parallelism, build_csr_in_mem, use_mmap_vector);
data_dir_path.string(), schema_res.value(), loading_config_res.value());
loader->LoadFragment();

t += grape::GetCurrentTime();
Expand Down
25 changes: 16 additions & 9 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

namespace server {

std::string to_message_json(const std::string& message) {
return "{\"message\":\"" + message + "\"}";
}

gs::GraphStatistics get_graph_statistics(const gs::GraphDBSession& sess) {
gs::GraphStatistics stat;
const auto& graph = sess.graph();
Expand Down Expand Up @@ -569,8 +573,8 @@ seastar::future<admin_query_result> admin_actor::run_delete_graph(
}
WorkDirManipulator::DeleteGraph(query_param.content);
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully delete graph: " +
query_param.content));
gs::Result<seastar::sstring>(to_message_json(
"Successfully delete graph: " + query_param.content)));
} else {
LOG(ERROR) << "Fail to delete graph: "
<< delete_res.status().error_message();
Expand Down Expand Up @@ -809,8 +813,8 @@ seastar::future<admin_query_result> admin_actor::delete_procedure(

VLOG(10) << "Successfully delete procedure: " << procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully delete procedure: " +
procedure_id));
gs::Result<seastar::sstring>(
to_message_json("Successfully delete procedure: " + procedure_id)));
}

// update a procedure by graph name and procedure name
Expand Down Expand Up @@ -867,8 +871,8 @@ seastar::future<admin_query_result> admin_actor::update_procedure(
if (update_res.ok()) {
VLOG(10) << "Successfully update procedure: " << procedure_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully update procedure: " +
procedure_id));
gs::Result<seastar::sstring>(
to_message_json("Successfully update procedure: " + procedure_id)));
} else {
LOG(ERROR) << "Fail to create procedure: "
<< update_res.status().error_message();
Expand Down Expand Up @@ -1073,7 +1077,8 @@ seastar::future<admin_query_result> admin_actor::start_service(
LOG(INFO) << "Successfully started service with graph: " << graph_name;
hqps_service.reset_start_time();
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully start service"));
gs::Result<seastar::sstring>(
to_message_json("Successfully start service")));
});
}

Expand Down Expand Up @@ -1111,7 +1116,8 @@ seastar::future<admin_query_result> admin_actor::stop_service(
if (hqps_service.stop_compiler_subprocess()) {
LOG(INFO) << "Successfully stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully stop service"));
gs::Result<seastar::sstring>(
to_message_json("Successfully stop service")));
} else {
LOG(ERROR) << "Fail to stop compiler";
return seastar::make_ready_future<admin_query_result>(
Expand Down Expand Up @@ -1301,7 +1307,8 @@ seastar::future<admin_query_result> admin_actor::cancel_job(
if (cancel_meta_res.ok()) {
VLOG(10) << "Successfully cancel job: " << job_id;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully cancel job: " + job_id));
gs::Result<seastar::sstring>(
to_message_json("Successfully cancel job: " + job_id)));
} else {
LOG(ERROR) << "Fail to cancel job: " << job_id << ", error message: "
<< cancel_meta_res.status().error_message();
Expand Down
44 changes: 7 additions & 37 deletions flex/interactive/sdk/python/test/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,8 @@
import pytest

from gs_interactive.client.driver import Driver
from gs_interactive.models.base_edge_type_vertex_type_pair_relations_inner import (
BaseEdgeTypeVertexTypePairRelationsInner,
)
from gs_interactive.models.create_edge_type import CreateEdgeType
from gs_interactive.models.create_graph_request import CreateGraphRequest
from gs_interactive.models.create_graph_schema_request import (
CreateGraphSchemaRequest,
)
from gs_interactive.models.create_procedure_request import (
CreateProcedureRequest,
)
from gs_interactive.models.create_property_meta import CreatePropertyMeta
from gs_interactive.models.create_vertex_type import CreateVertexType
from gs_interactive.models.edge_mapping import EdgeMapping
from gs_interactive.models.edge_mapping_type_triplet import (
EdgeMappingTypeTriplet,
)
from gs_interactive.models.gs_data_type import GSDataType
from gs_interactive.models.typed_value import TypedValue
from gs_interactive.models.job_status import JobStatus
from gs_interactive.models.long_text import LongText
from gs_interactive.models.primitive_type import PrimitiveType
from gs_interactive.models.schema_mapping import SchemaMapping
from gs_interactive.models.schema_mapping_loading_config import (
SchemaMappingLoadingConfig,
)
from gs_interactive.models.schema_mapping_loading_config_format import (
SchemaMappingLoadingConfigFormat,
)
from gs_interactive.models.schema_mapping_loading_config_data_source import (
SchemaMappingLoadingConfigDataSource,
)
from gs_interactive.models.start_service_request import StartServiceRequest
from gs_interactive.models.string_type import StringType
from gs_interactive.models.string_type_string import StringTypeString
from gs_interactive.models.vertex_mapping import VertexMapping
from gs_interactive.models.query_request import QueryRequest
from gs_interactive.models import *


class TestDriver(unittest.TestCase):
"""Test usage of driver"""
Expand Down Expand Up @@ -169,6 +134,11 @@ def bulkLoading(self):
data_source=SchemaMappingLoadingConfigDataSource(scheme="file", location=location),
import_option="init",
format=SchemaMappingLoadingConfigFormat(type="csv"),
x_csr_params=SchemaMappingLoadingConfigXCsrParams(
parallelism=1,
build_csr_in_mem=True,
use_mmap_vector=True
),
),
vertex_mappings=[
VertexMapping(type_name="person", inputs=[person_csv_path])
Expand Down
10 changes: 10 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,16 @@ components:
loading_config:
type: object
properties:
x_csr_params:
type: object
description: mutable_csr specific parameters
properties:
parallelism: # how many thread used for bulk loading
type: integer
build_csr_in_mem: # whether to build csr in memory
type: boolean
use_mmap_vector: # whether to use mmap vector
type: boolean
data_source:
type: object
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,12 @@ static void append_edges(std::shared_ptr<arrow::Array> src_col,
class AbstractArrowFragmentLoader : public IFragmentLoader {
public:
AbstractArrowFragmentLoader(const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config,
int32_t thread_num, bool build_csr_in_mem,
bool use_mmap_vector)
const LoadingConfig& loading_config)
: loading_config_(loading_config),
schema_(schema),
thread_num_(thread_num),
build_csr_in_mem_(build_csr_in_mem),
use_mmap_vector_(use_mmap_vector),
thread_num_(loading_config_.GetParallelism()),
build_csr_in_mem_(loading_config_.GetBuildCsrInMem()),
use_mmap_vector_(loading_config_.GetUseMmapVector()),
basic_fragment_loader_(schema_, work_dir) {
vertex_label_num_ = schema_.vertex_label_num();
edge_label_num_ = schema_.edge_label_num();
Expand Down
10 changes: 4 additions & 6 deletions flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,9 @@ static void put_column_names_option(const LoadingConfig& loading_config,

std::shared_ptr<IFragmentLoader> CSVFragmentLoader::Make(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int32_t thread_num,
bool build_csr_in_mem, bool use_mmap_vector) {
const LoadingConfig& loading_config) {
return std::shared_ptr<IFragmentLoader>(
new CSVFragmentLoader(work_dir, schema, loading_config, thread_num,
build_csr_in_mem, use_mmap_vector));
new CSVFragmentLoader(work_dir, schema, loading_config));
}

void CSVFragmentLoader::addVertices(label_t v_label_id,
Expand Down Expand Up @@ -317,8 +315,8 @@ void CSVFragmentLoader::loadVertices() {
++iter) {
vertex_files.emplace_back(iter->first, iter->second);
}
LOG(INFO) << "Parallel loading with " << thread_num_ << " threads, " << " "
<< vertex_files.size() << " vertex files, ";
LOG(INFO) << "Parallel loading with " << thread_num_ << " threads, "
<< " " << vertex_files.size() << " vertex files, ";
std::atomic<size_t> v_ind(0);
std::vector<std::thread> threads(thread_num_);
for (int i = 0; i < thread_num_; ++i) {
Expand Down
10 changes: 3 additions & 7 deletions flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,12 @@ class CSVTableRecordBatchSupplier : public IRecordBatchSupplier {
class CSVFragmentLoader : public AbstractArrowFragmentLoader {
public:
CSVFragmentLoader(const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int32_t thread_num,
bool build_csr_in_mem, bool use_mmap_vector)
: AbstractArrowFragmentLoader(work_dir, schema, loading_config,
thread_num, build_csr_in_mem,
use_mmap_vector) {}
const LoadingConfig& loading_config)
: AbstractArrowFragmentLoader(work_dir, schema, loading_config) {}

static std::shared_ptr<IFragmentLoader> Make(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int32_t thread_num,
bool build_csr_in_mem, bool use_mmap_vector);
const LoadingConfig& loading_config);

~CSVFragmentLoader() {}

Expand Down
6 changes: 2 additions & 4 deletions flex/storages/rt_mutable_graph/loader/loader_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,14 @@ void LoaderFactory::Finalize() {}

std::shared_ptr<IFragmentLoader> LoaderFactory::CreateFragmentLoader(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int thread_num, bool build_csr_in_mem,
bool use_mmap_vector) {
const LoadingConfig& loading_config) {
auto scheme = loading_config.GetScheme();
auto format = loading_config.GetFormat();
auto key = scheme + format;
auto& known_loaders_ = getKnownLoaders();
auto iter = known_loaders_.find(key);
if (iter != known_loaders_.end()) {
return iter->second(work_dir, schema, loading_config, thread_num,
build_csr_in_mem, use_mmap_vector);
return iter->second(work_dir, schema, loading_config);
} else {
LOG(FATAL) << "Unsupported format: " << format;
}
Expand Down
6 changes: 2 additions & 4 deletions flex/storages/rt_mutable_graph/loader/loader_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ class LoaderFactory {
public:
using loader_initializer_t = std::shared_ptr<IFragmentLoader> (*)(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int thread_num,
bool build_csr_in_mem, bool use_mmap_vector);
const LoadingConfig& loading_config);

static void Init();

static void Finalize();

static std::shared_ptr<IFragmentLoader> CreateFragmentLoader(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int thread_num,
bool build_csr_in_mem, bool use_mmap_vector);
const LoadingConfig& loading_config);

static bool Register(const std::string& scheme_type,
const std::string& format_type,
Expand Down
6 changes: 2 additions & 4 deletions flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,9 @@ ODPSTableRecordBatchSupplier::GetNextBatch() {

std::shared_ptr<IFragmentLoader> ODPSFragmentLoader::Make(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int32_t thread_num,
bool build_csr_in_mem, bool use_mmap_vector) {
const LoadingConfig& loading_config) {
return std::shared_ptr<IFragmentLoader>(
new ODPSFragmentLoader(work_dir, schema, loading_config, thread_num,
build_csr_in_mem, use_mmap_vector));
new ODPSFragmentLoader(work_dir, schema, loading_config));
}
void ODPSFragmentLoader::init() { odps_read_client_.init(); }

Expand Down
10 changes: 3 additions & 7 deletions flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,12 @@ class ODPSTableRecordBatchSupplier : public IRecordBatchSupplier {
class ODPSFragmentLoader : public AbstractArrowFragmentLoader {
public:
ODPSFragmentLoader(const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int32_t thread_num,
bool build_csr_in_mem, bool use_mmap_vector)
: AbstractArrowFragmentLoader(work_dir, schema, loading_config,
thread_num, build_csr_in_mem,
use_mmap_vector) {}
const LoadingConfig& loading_config)
: AbstractArrowFragmentLoader(work_dir, schema, loading_config) {}

static std::shared_ptr<IFragmentLoader> Make(
const std::string& work_dir, const Schema& schema,
const LoadingConfig& loading_config, int32_t thread_num,
bool build_csr_in_mem, bool);
const LoadingConfig& loading_config);

~ODPSFragmentLoader() {}

Expand Down
Loading

0 comments on commit 060b8ee

Please sign in to comment.