diff --git a/analytical_engine/CMakeLists.txt b/analytical_engine/CMakeLists.txt index fa5853dce7d4..ae5de573b75e 100644 --- a/analytical_engine/CMakeLists.txt +++ b/analytical_engine/CMakeLists.txt @@ -364,10 +364,16 @@ if (ENABLE_JAVA_SDK) endif() endif() +add_library(applications SHARED core/applications.cc) +target_include_directories(applications PRIVATE ${LIBGRAPELITE_INCLUDE_DIRS}/grape/analytical_apps utils apps) +target_link_libraries(applications ${LIBGRAPELITE_LIBRARIES} ${GFLAGS_LIBRARIES} ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${VINEYARD_LIBRARIES}) +target_link_libraries(applications OpenMP::OpenMP_CXX) +target_link_libraries(applications gs_util) +target_link_libraries(grape_engine PRIVATE applications) # Test targets if (BUILD_TESTS) - add_executable(run_app test/run_app.cc core/object/dynamic.cc) + add_executable(run_app test/run_app.cc core/object/dynamic.cc core/flags.cc) target_include_directories(run_app PRIVATE ${LIBGRAPELITE_INCLUDE_DIRS}/grape/analytical_apps utils apps) target_link_libraries(run_app ${LIBGRAPELITE_LIBRARIES} ${GFLAGS_LIBRARIES} ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${VINEYARD_LIBRARIES}) target_link_libraries(run_app OpenMP::OpenMP_CXX) @@ -421,7 +427,7 @@ if (BUILD_TESTS) cmake_parse_arguments(add_vineyard_app "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) add_executable(${target} ${add_vineyard_app_SRCS} core/object/dynamic.cc) target_include_directories(${target} PRIVATE ${LIBGRAPELITE_INCLUDE_DIRS}/grape/analytical_apps) - target_link_libraries(${target} gs_proto ${LIBGRAPELITE_LIBRARIES} ${VINEYARD_LIBRARIES}) + target_link_libraries(${target} gs_proto applications ${LIBGRAPELITE_LIBRARIES} ${VINEYARD_LIBRARIES}) if (${LIBUNWIND_FOUND}) target_link_libraries(${target} ${LIBUNWIND_LIBRARIES}) endif () @@ -576,6 +582,7 @@ endmacro() install_gsa_binary(grape_engine) install_gsa_binary(gs_proto) install_gsa_binary(gs_util) +install_gsa_binary(applications) if (ENABLE_JAVA_SDK) install_gsa_binary(graphx_runner) diff 
--git a/analytical_engine/apps/sssp/sssp_average_length.h b/analytical_engine/apps/sssp/sssp_average_length.h index e064bc5f576b..b1de0b5dba43 100644 --- a/analytical_engine/apps/sssp/sssp_average_length.h +++ b/analytical_engine/apps/sssp/sssp_average_length.h @@ -109,7 +109,6 @@ class SSSPAverageLength bool update_sum = false; pair_msg_t msg; - int msg_cnt = 0; while (messages.GetMessage(msg)) { bool is_vertex_msg = msg.first; if (is_vertex_msg) { @@ -129,7 +128,6 @@ class SSSPAverageLength fid_t fid = (fid_t)(std::get<0>(msg.second)); ctx.all_sums[fid] = std::get<2>(msg.second); } - msg_cnt++; } for (auto& it : updated_map) { diff --git a/analytical_engine/core/applications.cc b/analytical_engine/core/applications.cc new file mode 100644 index 000000000000..c0dc28e1aa3b --- /dev/null +++ b/analytical_engine/core/applications.cc @@ -0,0 +1,290 @@ +#include "core/applications.h" + +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "grape/config.h" +#include "grape/fragment/immutable_edgecut_fragment.h" +#include "grape/fragment/loader.h" +#include "grape/grape.h" + +#include "bfs/bfs_auto.h" +#include "bfs/bfs_opt.h" +#include "cdlp/cdlp.h" +#include "cdlp/cdlp_auto.h" +#include "cdlp/cdlp_opt.h" +#include "lcc/lcc_auto.h" +#include "lcc/lcc_opt.h" +#include "pagerank/pagerank_auto.h" +#include "pagerank/pagerank_directed.h" +#include "pagerank/pagerank_opt.h" +#include "sssp/sssp_auto.h" +#include "sssp/sssp_opt.h" +#include "voterank/voterank.h" +#include "wcc/wcc.h" +#include "wcc/wcc_auto.h" +#include "wcc/wcc_opt.h" + +#include "apps/bfs/bfs_generic.h" +#include "apps/centrality/degree/degree_centrality.h" +// #include "apps/centrality/eigenvector/eigenvector_centrality.h" +// #include "apps/centrality/katz/katz_centrality.h" +#include "apps/clustering/avg_clustering.h" +#include "apps/clustering/clustering.h" +#include "apps/clustering/transitivity.h" +#include "apps/clustering/triangles.h" +#include "apps/dfs/dfs.h" +#include "apps/hits/hits.h" +#include "apps/kcore/kcore.h" +#include "apps/kshell/kshell.h" +#include "apps/sssp/sssp_average_length.h" +#include "apps/sssp/sssp_has_path.h" +#include "apps/sssp/sssp_path.h" +#include "core/flags.h" +#include "core/fragment/dynamic_fragment.h" + +#include "apps/lpa/lpa_u2i.h" +#include "apps/property/auto_sssp_property.h" +#include "apps/property/auto_wcc_property.h" +#include "apps/property/sssp_property.h" +#include "apps/property/wcc_property.h" +#include "apps/sampling_path/sampling_path.h" + +namespace bl = boost::leaf; + +using oid_t = vineyard::property_graph_types::OID_TYPE; +using vid_t = vineyard::property_graph_types::VID_TYPE; + +using FragmentType = vineyard::ArrowFragment; + +namespace gs { + +template +std::shared_ptr LoadSimpleGraph(const std::string& efile, + const 
std::string& vfile, + const grape::CommSpec& comm_spec) { + grape::LoadGraphSpec graph_spec = grape::DefaultLoadGraphSpec(); + graph_spec.set_directed(FLAGS_directed); + std::shared_ptr fragment = + grape::LoadGraph(efile, vfile, comm_spec, graph_spec); + return fragment; +} + +template +vineyard::ObjectID LoadPropertyGraph(const grape::CommSpec& comm_spec, + vineyard::Client& client, + const std::vector& efiles, + const std::vector& vfiles, + int directed) { + vineyard::ObjectID fragment_id = vineyard::InvalidObjectID(); + { + auto loader = std::make_unique>( + client, comm_spec, efiles, vfiles, directed != 0, + /* generate_eid */ false, /* retain_oid */ false); + fragment_id = + bl::try_handle_all([&loader]() { return loader->LoadFragment(); }, + [](const vineyard::GSError& e) { + LOG(FATAL) << e.error_msg; + return 0; + }, + [](const bl::error_info& unmatched) { + LOG(FATAL) << "Unmatched error " << unmatched; + return 0; + }); + } + + LOG(INFO) << "[worker-" << comm_spec.worker_id() + << "] loaded graph to vineyard ... " << fragment_id; + LOG(INFO) << "peek memory: " << vineyard::get_peak_rss_pretty() << std::endl; + + MPI_Barrier(comm_spec.comm()); + return fragment_id; +} + +template +std::shared_ptr ProjectGraph(std::shared_ptr fragment, + int v_label = 0, int v_prop = -1, + int e_label = 0, int e_prop = -1) { + // v_prop is grape::EmptyType, e_prop is grape::EmptyType + LOG(INFO) << "start project ... memory = " << vineyard::get_rss_pretty() + << ", peak = " << vineyard::get_peak_rss_pretty(); + auto projected_fragment = + PROJECT_FRAG_T::Project(fragment, v_label, v_prop, e_label, e_prop); + LOG(INFO) << "finish project ... memory = " << vineyard::get_rss_pretty() + << ", peak = " << vineyard::get_peak_rss_pretty(); + return projected_fragment; +} + +template +void DoQuery(const grape::CommSpec& comm_spec, std::shared_ptr fragment, + const std::string& out_prefix, Args... 
args) { + auto app = std::make_shared(); + auto worker = APP_T::CreateWorker(app, fragment); + auto spec = grape::DefaultParallelEngineSpec(); + worker->Init(comm_spec, spec); + worker->Query(std::forward(args)...); + + std::string output_path = + grape::GetResultFilename(out_prefix, fragment->fid()); + + std::ofstream ostream; + ostream.open(output_path); + worker->Output(ostream); + ostream.close(); + + worker->Finalize(); + VLOG(1) << "Worker-" << comm_spec.worker_id() << " finished"; + LOG(INFO) << "finish running application ... memory = " + << vineyard::get_rss_pretty() + << ", peak = " << vineyard::get_peak_rss_pretty(); +} + +template +void RunPropertyApp(std::shared_ptr fragment, + const grape::CommSpec& comm_spec, + const std::string& out_prefix, const std::string& name) { + if (name == "wcc_property") { + using AppType = gs::WCCProperty; + DoQuery(comm_spec, fragment, out_prefix); + } else if (name == "sssp_property") { + using AppType = gs::SSSPProperty; + DoQuery(comm_spec, fragment, out_prefix, + FLAGS_sssp_source); + } else if (name == "wcc_auto_property") { + using AppType = gs::AutoWCCProperty; + DoQuery(comm_spec, fragment, out_prefix); + } else if (name == "sssp_auto_property") { + using AppType = gs::AutoSSSPProperty; + DoQuery(comm_spec, fragment, out_prefix, + FLAGS_sssp_source); + } else if (name == "lpa_u2i_property") { + using AppType = gs::LPAU2I; + DoQuery(comm_spec, fragment, out_prefix); + } +} + +template +void RunProjectedApp(std::shared_ptr fragment, + const grape::CommSpec& comm_spec, + const std::string& out_prefix, const std::string& name) { + if (name == "sssp_projected") { + using PROJECTED_FRAG_T = + gs::ArrowProjectedFragment; + auto projected = + ProjectGraph(fragment, 0, -1, 0, 2); + using AppType = grape::SSSPOpt; + DoQuery(comm_spec, projected, out_prefix, + FLAGS_sssp_source); + } else { + using PROJECTED_FRAG_T = + gs::ArrowProjectedFragment; + auto projected = + ProjectGraph(fragment, 0, -1, 0, -1); + if (name == 
"wcc_projected") { + using AppType = grape::WCCOpt; + DoQuery(comm_spec, projected, out_prefix); + } else if (name == "cdlp_projected") { + // TODO(siyuan): uncomment once latest libgrape-lite is released. + using AppType = grape::CDLPOpt; + DoQuery(comm_spec, projected, out_prefix, + FLAGS_max_round); + } else if (name == "bfs_projected") { + using AppType = grape::BFSOpt; + DoQuery(comm_spec, projected, + out_prefix, FLAGS_bfs_source); + } else if (name == "lcc_projected") { + using AppType = grape::LCCOpt; + DoQuery(comm_spec, projected, out_prefix); + } else if (name == "pagerank_projected") { + using AppType = grape::PageRankOpt; + DoQuery( + comm_spec, projected, out_prefix, FLAGS_pagerank_delta, + FLAGS_max_round); + } else if (name == "wcc_auto_projected") { + using AppType = grape::WCCAuto; + DoQuery(comm_spec, projected, out_prefix); + } + } +} + +/** + * @brief Run application in batch mode + * @example ./grape_engine -batch_mode -vineyard_socket /tmp/vineyard.sock + * -efile p2p-31.e#label=e#src_label=v#dst_label=v#delimiter=' ' -vfile + * p2p-31.v#label=v#delimiter=' ' -application wcc -out_prefix ret + */ +void RunApp() { + std::string ipc_socket = FLAGS_vineyard_socket; + + std::vector efiles; + boost::split(efiles, FLAGS_efile, boost::is_any_of(",")); + + std::vector vfiles; + boost::split(vfiles, FLAGS_vfile, boost::is_any_of(",")); + + bool directed = FLAGS_directed; + std::string app_name = FLAGS_application; + + std::vector available_apps = { + "wcc_property", "sssp_property", "wcc_auto_property", + "sssp_auto_property", "lpa_u2i_property", "wcc_projected", + "cdlp_projected", "bfs_projected", "lcc_projected", + "pagerank_projected", "wcc_auto_projected", "sssp_projected"}; + + if (std::find(available_apps.begin(), available_apps.end(), app_name) == + available_apps.end()) { + LOG(FATAL) << "Application " << app_name << " is not supported."; + } + { + grape::CommSpec comm_spec; + comm_spec.Init(MPI_COMM_WORLD); + + vineyard::Client client; + 
VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + LOG(INFO) << "Connected to IPCServer: " << ipc_socket; + + auto fragment_id = + LoadPropertyGraph(comm_spec, client, efiles, vfiles, directed); + + std::shared_ptr fragment = + std::dynamic_pointer_cast(client.GetObject(fragment_id)); + + RunPropertyApp(fragment, comm_spec, FLAGS_out_prefix, FLAGS_application); + RunProjectedApp(fragment, comm_spec, FLAGS_out_prefix, FLAGS_application); + MPI_Barrier(comm_spec.comm()); + } +} + +} // namespace gs + +template class gs::ArrowProjectedFragment; +template class gs::ArrowProjectedFragment; diff --git a/analytical_engine/core/applications.h b/analytical_engine/core/applications.h new file mode 100644 index 000000000000..27d31dc64a3d --- /dev/null +++ b/analytical_engine/core/applications.h @@ -0,0 +1,73 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef ANALYTICAL_ENGINE_CORE_APPLICATIONS_H_ +#define ANALYTICAL_ENGINE_CORE_APPLICATIONS_H_ + +#include +#include +#include +#include +#include + +#include "common/util/uuid.h" +#include "gflags/gflags.h" +#include "gflags/gflags_declare.h" +#include "glog/logging.h" + +#include "grape/grape.h" +#include "grape/util.h" +#include "vineyard/client/client.h" +#include "vineyard/graph/fragment/arrow_fragment.h" + +#include "core/fragment/arrow_projected_fragment.h" +#include "core/loader/arrow_fragment_loader.h" + +namespace gs { + +template +std::shared_ptr LoadSimpleGraph(const std::string& efile, + const std::string& vfile, + const grape::CommSpec& comm_spec); + +template +vineyard::ObjectID LoadPropertyGraph(const grape::CommSpec& comm_spec, + vineyard::Client& client, + const std::vector& efiles, + const std::vector& vfiles, + int directed); + +template +std::shared_ptr ProjectGraph(std::shared_ptr fragment); + +template +void DoQuery(const grape::CommSpec& comm_spec, std::shared_ptr fragment, + const std::string& out_prefix, Args... 
args); + +template +void RunProjectedApp(std::shared_ptr fragment, + const grape::CommSpec& comm_spec, + const std::string& out_prefix, const std::string& name); + +template +void RunPropertyApp(std::shared_ptr fragment, + const grape::CommSpec& comm_spec, + const std::string& out_prefix, const std::string& name); +std::vector prepareSamplingPathPattern(const std::string& path_pattern); + +void RunApp(); +} // namespace gs + +#endif // ANALYTICAL_ENGINE_CORE_APPLICATIONS_H_ diff --git a/analytical_engine/core/flags.cc b/analytical_engine/core/flags.cc index 598d594f9e43..1c2cc0f62b55 100644 --- a/analytical_engine/core/flags.cc +++ b/analytical_engine/core/flags.cc @@ -29,3 +29,68 @@ DEFINE_string(etcd_endpoint, "http://127.0.0.1:2379", "Etcd endpoint that will be used to launch vineyardd"); DEFINE_string(dag_file, "", "Engine reads serialized dag proto from dag_file."); + +DEFINE_bool(batch_mode, false, "Whether to run in batch mode."); + +DEFINE_string(application, "", "application name"); +DEFINE_string(efile, "", "edge file"); +DEFINE_string(vfile, "", "vertex file"); +DEFINE_string(out_prefix, "", "output directory of results"); +DEFINE_bool(directed, false, "input graph is directed or not."); + +/* flags related to specific applications. 
*/ +DEFINE_int64(bfs_source, 0, "source vertex of bfs."); +DEFINE_string(degree_centrality_type, "both", + "the type of degree centrality, available options: in/out/bot"); + +DEFINE_double(eigenvector_centrality_tolerance, 1e-6, "Error tolerance."); +DEFINE_int32(eigenvector_centrality_max_round, 100, + "Maximum number of iterations."); + +DEFINE_double(hits_tolerance, 0.001, "Error tolerance."); +DEFINE_int32(hits_max_round, 100, "Maximum number of iterations."); +DEFINE_bool(hits_normalized, true, + "Normalize results by the sum of all of the values."); + +DEFINE_int32(kcore_k, 3, "The order of the core"); + +DEFINE_int32(kshell_k, 3, "The order of the shell"); + +DEFINE_double(katz_centrality_alpha, 0.1, "Attenuation factor"); +DEFINE_double(katz_centrality_beta, 1.0, + "Weight attributed to the immediate neighborhood."); +DEFINE_double(katz_centrality_tolerance, 1e-06, "Error tolerance."); +DEFINE_int32(katz_centrality_max_round, 100, "Maximum number of iterations."); +DEFINE_bool(katz_centrality_normalized, true, + "Normalize results by the sum of all of the values."); + +DEFINE_int64(sssp_source, 0, "Source vertex of sssp."); +DEFINE_int64(sssp_target, 1, "Target vertex of sssp."); +DEFINE_bool( + sssp_weight, true, + "If true, use edge attribute as weight. 
Otherwise, all use weight 1."); + +DEFINE_int32(bfs_depth_limit, 10, "Specify the maximum search depth."); +DEFINE_string(bfs_output_format, "edges", + "Output format[edges/predecessors/successors]."); + +DEFINE_bool(segmented_partition, true, + "whether to use segmented partitioning."); +DEFINE_bool(rebalance, true, "whether to rebalance graph after loading."); +DEFINE_int32(rebalance_vertex_factor, 0, "vertex factor of rebalancing."); + +DEFINE_bool(serialize, false, "whether to serialize loaded graph."); +DEFINE_bool(deserialize, false, "whether to deserialize graph while loading."); +DEFINE_string(serialization_prefix, "", + "where to load/store the serialization files"); + +DEFINE_int64(dfs_source, 0, "source vertex of dfs."); +DEFINE_string(dfs_format, "edges", "output format of dfs."); + +DEFINE_int32(vr_num_of_nodes, 0, "nodes number of voterank."); + +DEFINE_string(sampling_path_pattern, "", "sampling path pattern"); +DEFINE_bool(run_projected, false, "run projected"); + +DEFINE_double(pagerank_delta, 0.85, "damping factor of pagerank"); +DEFINE_int32(max_round, 10, "maximum round"); diff --git a/analytical_engine/core/flags.h b/analytical_engine/core/flags.h index 3efbc3b1b3a1..5bfc905816a4 100644 --- a/analytical_engine/core/flags.h +++ b/analytical_engine/core/flags.h @@ -28,4 +28,59 @@ DECLARE_string(vineyard_socket); DECLARE_string(vineyard_shared_mem); DECLARE_string(etcd_endpoint); +DECLARE_bool(batch_mode); + +// applications +DECLARE_string(application); +DECLARE_bool(directed); +DECLARE_string(efile); +DECLARE_string(vfile); +DECLARE_string(out_prefix); + +DECLARE_int64(bfs_source); +DECLARE_string(degree_centrality_type); + +DECLARE_double(eigenvector_centrality_tolerance); +DECLARE_int32(eigenvector_centrality_max_round); + +DECLARE_double(hits_tolerance); +DECLARE_int32(hits_max_round); +DECLARE_bool(hits_normalized); + +DECLARE_int32(kcore_k); + +DECLARE_int32(kshell_k); + +DECLARE_double(katz_centrality_alpha); 
+DECLARE_double(katz_centrality_beta); +DECLARE_double(katz_centrality_tolerance); +DECLARE_int32(katz_centrality_max_round); +DECLARE_bool(katz_centrality_normalized); + +DECLARE_int64(sssp_source); +DECLARE_int64(sssp_target); +DECLARE_bool(sssp_weight); + +DECLARE_int64(bfs_source); +DECLARE_int32(bfs_depth_limit); +DECLARE_string(bfs_output_format); + +DECLARE_bool(segmented_partition); +DECLARE_bool(rebalance); +DECLARE_int32(rebalance_vertex_factor); + +DECLARE_bool(serialize); +DECLARE_bool(deserialize); +DECLARE_string(serialization_prefix); + +DECLARE_int64(dfs_source); +DECLARE_string(dfs_format); + +DECLARE_int32(vr_num_of_nodes); + +DECLARE_string(sampling_path_pattern); +DECLARE_bool(run_projected); + +DECLARE_double(pagerank_delta); +DECLARE_int32(max_round); #endif // ANALYTICAL_ENGINE_CORE_FLAGS_H_ diff --git a/analytical_engine/core/grape_engine.cc b/analytical_engine/core/grape_engine.cc index bccc59b099aa..2b3bfd8cf3bd 100644 --- a/analytical_engine/core/grape_engine.cc +++ b/analytical_engine/core/grape_engine.cc @@ -36,6 +36,7 @@ #include "grape/worker/comm_spec.h" #include "grpcpp/server.h" +#include "core/applications.h" #include "core/context/i_context.h" #include "core/error.h" #include "core/flags.h" @@ -202,14 +203,15 @@ int main(int argc, char* argv[]) { // set RDMAV_FORK_SAFE to avoid the openmpi error, see also #2812 setenv("RDMAV_FORK_SAFE", "1", 0); - int exit_code = 0; // not output any log to stderr by glog. 
FLAGS_stderrthreshold = std::numeric_limits::max(); grape::gflags::SetUsageMessage( "Usage: mpiexec [mpi_opts] ./grape_engine [grape_opts].\n" " Example: mpiexec -n 1 ./grape_engine -host 0.0.0.0 -port 50001"); - if (argc == 2 && strcmp(argv[1], "-h") == 0) { + if (argc == 2 && + (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0 || + strcmp(argv[1], "-help") == 0)) { grape::gflags::ShowUsageWithFlagsRestrict(argv[0], "core/flags"); exit(0); } @@ -224,25 +226,17 @@ int main(int argc, char* argv[]) { // Init MPI grape::InitMPIComm(); - auto host = FLAGS_host; - auto port = FLAGS_port; - auto dag_file = FLAGS_dag_file; - - if (dag_file.empty()) { - grape_engine_ptr = std::make_shared(host, port); + if (FLAGS_batch_mode) { + gs::RunApp(); } else { - grape_engine_ptr = std::make_shared(dag_file); - } - - InstallSignalHandlers(&master_signal_handler); - grape_engine_ptr->Start(); - - if (!dag_file.empty()) { - exit_code = grape_engine_ptr->RunDAGFile(); + grape_engine_ptr = + std::make_shared(FLAGS_host, FLAGS_port); + InstallSignalHandlers(&master_signal_handler); + grape_engine_ptr->Start(); } grape::FinalizeMPIComm(); ////////////////////////////////////////////// google::ShutdownGoogleLogging(); - return exit_code; + return 0; } diff --git a/analytical_engine/core/worker/default_property_worker.h b/analytical_engine/core/worker/default_property_worker.h index 28f89b9ffcc1..5fd89a601507 100644 --- a/analytical_engine/core/worker/default_property_worker.h +++ b/analytical_engine/core/worker/default_property_worker.h @@ -95,8 +95,6 @@ class DefaultPropertyWorker { context_->Init(messages_, std::forward(args)...); - int round = 0; - messages_.Start(); messages_.StartARound(); @@ -114,7 +112,6 @@ class DefaultPropertyWorker { while (!messages_.ToTerminate()) { t = grape::GetCurrentTime(); - round++; messages_.StartARound(); app_->IncEval(graph, *context_, messages_); diff --git a/analytical_engine/core/worker/default_worker.h 
b/analytical_engine/core/worker/default_worker.h index 4f70d763a347..fc506c3178ca 100644 --- a/analytical_engine/core/worker/default_worker.h +++ b/analytical_engine/core/worker/default_worker.h @@ -95,8 +95,6 @@ class DefaultWorker { context_->Init(messages_, std::forward(args)...); - int round = 0; - messages_.Start(); messages_.StartARound(); @@ -114,7 +112,6 @@ class DefaultWorker { while (!messages_.ToTerminate()) { t = grape::GetCurrentTime(); - round++; messages_.StartARound(); app_->IncEval(graph, *context_, messages_); diff --git a/analytical_engine/core/worker/parallel_property_worker.h b/analytical_engine/core/worker/parallel_property_worker.h index 3455732a24ea..54f3b81ca822 100644 --- a/analytical_engine/core/worker/parallel_property_worker.h +++ b/analytical_engine/core/worker/parallel_property_worker.h @@ -95,8 +95,6 @@ class ParallelPropertyWorker { context_->Init(messages_, std::forward(args)...); - int round = 0; - messages_.Start(); messages_.StartARound(); @@ -114,7 +112,6 @@ class ParallelPropertyWorker { while (!messages_.ToTerminate()) { t = grape::GetCurrentTime(); - round++; messages_.StartARound(); app_->IncEval(*graph_, *context_, messages_); diff --git a/analytical_engine/core/worker/property_auto_worker.h b/analytical_engine/core/worker/property_auto_worker.h index 11e71c948da0..07b5ce0b19d4 100644 --- a/analytical_engine/core/worker/property_auto_worker.h +++ b/analytical_engine/core/worker/property_auto_worker.h @@ -90,8 +90,6 @@ class PropertyAutoWorker { context_->Init(messages_, std::forward(args)...); - int round = 0; - messages_.Start(); messages_.StartARound(); @@ -109,7 +107,6 @@ class PropertyAutoWorker { while (!messages_.ToTerminate()) { t = grape::GetCurrentTime(); - round++; messages_.StartARound(); app_->IncEval(graph, *context_); diff --git a/analytical_engine/test/run_app.cc b/analytical_engine/test/run_app.cc index 7cdf4bdbe732..92ba536d2b6e 100644 --- a/analytical_engine/test/run_app.cc +++ 
b/analytical_engine/test/run_app.cc @@ -19,67 +19,37 @@ limitations under the License. #include #include -DEFINE_string(application, "", "application name"); -DEFINE_string(efile, "", "edge file"); -DEFINE_string(vfile, "", "vertex file"); -DEFINE_string(out_prefix, "", "output directory of results"); DEFINE_string(datasource, "local", "datasource type, available options: local, odps, oss"); DEFINE_string(jobid, "", "jobid, only used in LDBC graphanalytics."); -DEFINE_bool(directed, false, "input graph is directed or not."); +DEFINE_int32(app_concurrency, -1, "concurrency of the application."); -/* flags related to specific applications. */ -DEFINE_int64(bfs_source, 0, "source vertex of bfs."); -DEFINE_string(degree_centrality_type, "both", - "the type of degree centrality, available options: in/out/bot"); - -DEFINE_double(eigenvector_centrality_tolerance, 1e-6, "Error tolerance."); -DEFINE_int32(eigenvector_centrality_max_round, 100, - "Maximum number of iterations."); - -DEFINE_double(hits_tolerance, 0.001, "Error tolerance."); -DEFINE_int32(hits_max_round, 100, "Maximum number of iterations."); -DEFINE_bool(hits_normalized, true, - "Normalize results by the sum of all of the values."); - -DEFINE_int32(kcore_k, 3, "The order of the core"); - -DEFINE_int32(kshell_k, 3, "The order of the shell"); - -DEFINE_double(katz_centrality_alpha, 0.1, "Attenuation factor"); -DEFINE_double(katz_centrality_beta, 1.0, - "Weight attributed to the immediate neighborhood."); -DEFINE_double(katz_centrality_tolerance, 1e-06, "Error tolerance."); -DEFINE_int32(katz_centrality_max_round, 100, "Maximum number of iterations."); -DEFINE_bool(katz_centrality_normalized, true, - "Normalize results by the sum of all of the values."); - -DEFINE_int64(sssp_source, 0, "Source vertex of sssp."); -DEFINE_int64(sssp_target, 1, "Target vertex of sssp."); -DEFINE_bool( - sssp_weight, true, - "If true, use edge attribute as weight. 
Otherwise, all use weight 1."); - -DEFINE_int32(bfs_depth_limit, 10, "Specify the maximum search depth."); -DEFINE_string(bfs_output_format, "edges", - "Output format[edges/predecessors/successors]."); - -DEFINE_bool(segmented_partition, true, - "whether to use segmented partitioning."); -DEFINE_bool(rebalance, true, "whether to rebalance graph after loading."); -DEFINE_int32(rebalance_vertex_factor, 0, "vertex factor of rebalancing."); - -DEFINE_bool(serialize, false, "whether to serialize loaded graph."); -DEFINE_bool(deserialize, false, "whether to deserialize graph while loading."); -DEFINE_string(serialization_prefix, "", - "where to load/store the serialization files"); +void Init() { + if (FLAGS_out_prefix.empty()) { + LOG(FATAL) << "Please assign an output prefix."; + } + if (FLAGS_deserialize && FLAGS_serialization_prefix.empty()) { + LOG(FATAL) << "Please assign a serialization prefix."; + } else if (FLAGS_vfile.empty() || FLAGS_efile.empty()) { + LOG(FATAL) << "Please assign input vertex/edge files."; + } -DEFINE_int32(app_concurrency, -1, "concurrency of application"); + if (access(FLAGS_out_prefix.c_str(), 0) != 0) { + mkdir(FLAGS_out_prefix.c_str(), 0777); + } -DEFINE_int64(dfs_source, 0, "source vertex of dfs."); -DEFINE_string(dfs_format, "edges", "output format of dfs."); + grape::InitMPIComm(); + grape::CommSpec comm_spec; + comm_spec.Init(MPI_COMM_WORLD); + if (comm_spec.worker_id() == grape::kCoordinatorRank) { + VLOG(1) << "Workers of libgrape-lite initialized."; + } +} -DEFINE_int32(vr_num_of_nodes, 0, "nodes number of voterank."); +void Finalize() { + grape::FinalizeMPIComm(); + VLOG(1) << "Workers finalized."; +} int main(int argc, char* argv[]) { FLAGS_stderrthreshold = 0; @@ -96,7 +66,7 @@ int main(int argc, char* argv[]) { google::InitGoogleLogging("analytical_apps"); google::InstallFailureSignalHandler(); - gs::Init(); + Init(); std::string name = FLAGS_application; if (name.find("sssp") != std::string::npos || @@ -118,7 +88,7 @@ int 
main(int argc, char* argv[]) { } } - gs::Finalize(); + Finalize(); google::ShutdownGoogleLogging(); } diff --git a/analytical_engine/test/run_app.h b/analytical_engine/test/run_app.h index 07147d7fc326..277c732a6fbb 100644 --- a/analytical_engine/test/run_app.h +++ b/analytical_engine/test/run_app.h @@ -71,86 +71,12 @@ limitations under the License. #include "core/flags.h" #include "core/fragment/dynamic_fragment.h" -DECLARE_string(application); -DECLARE_bool(directed); -DECLARE_string(efile); -DECLARE_string(vfile); -DECLARE_string(out_prefix); DECLARE_string(datasource); DECLARE_string(jobid); - -DECLARE_int64(bfs_source); -DECLARE_string(degree_centrality_type); - -DECLARE_double(eigenvector_centrality_tolerance); -DECLARE_int32(eigenvector_centrality_max_round); - -DECLARE_double(hits_tolerance); -DECLARE_int32(hits_max_round); -DECLARE_bool(hits_normalized); - -DECLARE_int32(kcore_k); - -DECLARE_int32(kshell_k); - -DECLARE_double(katz_centrality_alpha); -DECLARE_double(katz_centrality_beta); -DECLARE_double(katz_centrality_tolerance); -DECLARE_int32(katz_centrality_max_round); -DECLARE_bool(katz_centrality_normalized); - -DECLARE_int64(sssp_source); -DECLARE_int64(sssp_target); -DECLARE_bool(sssp_weight); - -DECLARE_int64(bfs_source); -DECLARE_int32(bfs_depth_limit); -DECLARE_string(bfs_output_format); - -DECLARE_bool(segmented_partition); -DECLARE_bool(rebalance); -DECLARE_int32(rebalance_vertex_factor); - -DECLARE_bool(serialize); -DECLARE_bool(deserialize); -DECLARE_string(serialization_prefix); - DECLARE_int32(app_concurrency); -DECLARE_int64(dfs_source); -DECLARE_string(dfs_format); - -DECLARE_int32(vr_num_of_nodes); - namespace gs { -void Init() { - if (FLAGS_out_prefix.empty()) { - LOG(FATAL) << "Please assign an output prefix."; - } - if (FLAGS_deserialize && FLAGS_serialization_prefix.empty()) { - LOG(FATAL) << "Please assign a serialization prefix."; - } else if (FLAGS_vfile.empty() || FLAGS_efile.empty()) { - LOG(FATAL) << "Please assign input 
vertex/edge files."; - } - - if (access(FLAGS_out_prefix.c_str(), 0) != 0) { - mkdir(FLAGS_out_prefix.c_str(), 0777); - } - - grape::InitMPIComm(); - grape::CommSpec comm_spec; - comm_spec.Init(MPI_COMM_WORLD); - if (comm_spec.worker_id() == grape::kCoordinatorRank) { - VLOG(1) << "Workers of libgrape-lite initialized."; - } -} - -void Finalize() { - grape::FinalizeMPIComm(); - VLOG(1) << "Workers finalized."; -} - template void CreateAndQuery(const grape::CommSpec& comm_spec, const std::string efile, const std::string& vfile, const std::string& out_prefix, diff --git a/analytical_engine/test/run_vy_app.cc b/analytical_engine/test/run_vy_app.cc index be9d8ad1fb34..ffa96cf0c745 100644 --- a/analytical_engine/test/run_vy_app.cc +++ b/analytical_engine/test/run_vy_app.cc @@ -40,6 +40,7 @@ #include "sssp/sssp_opt.h" #include "wcc/wcc_opt.h" +#include "core/applications.h" #include "core/fragment/arrow_projected_fragment.h" #include "core/loader/arrow_fragment_loader.h" @@ -50,89 +51,8 @@ using vid_t = vineyard::property_graph_types::VID_TYPE; using FragmentType = vineyard::ArrowFragment; -using ProjectedFragmentType = - gs::ArrowProjectedFragment; -using ProjectedFragmentType2 = - gs::ArrowProjectedFragment; - -void RunWCCProperty(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - using AppType = gs::WCCProperty; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunSSSPProperty(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - using AppType = gs::SSSPProperty; - auto app = std::make_shared(); - auto worker = 
AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(4); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunLPAU2I(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - using AppType = gs::LPAU2I; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunSamplingPath(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix, - const std::string& path_pattern) { - using AppType = gs::SamplingPath; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - +std::vector prepareSamplingPathPattern(const std::string& path_pattern) { std::vector label_id_seq; - std::string delimiter = "-"; auto start = 0U; auto end = path_pattern.find(delimiter); @@ -143,193 +63,22 @@ void RunSamplingPath(std::shared_ptr fragment, end = path_pattern.find(delimiter, start); } label_id_seq.push_back(std::stoi(path_pattern.substr(start, end))); - - worker->Init(comm_spec, spec); - worker->Query(label_id_seq, 10000000); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); + return label_id_seq; } -void RunAutoWCCProperty(std::shared_ptr fragment, - const 
grape::CommSpec& comm_spec, - const std::string& out_prefix) { - using AppType = gs::AutoWCCProperty; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunAutoSSSPProperty(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - using AppType = gs::AutoSSSPProperty; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(4); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunProjectedWCC(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - using AppType = grape::WCCOpt; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunProjectedSSSP(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - // using AppType = grape::SSSPProjected; - // using AppType = grape::SSSPAuto; - using AppType = grape::SSSPOpt; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = 
grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(4); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunProjectedCDLP(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - // TODO(siyuan): uncomment once latest libgrape-lite is released. - // using AppType = grape::CDLPOpt; - using AppType = grape::CDLP; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(10); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunProjectedBFS(std::shared_ptr fragment, +void RunSamplingPath(std::shared_ptr fragment, const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - // using AppType = grape::BFSAuto; - using AppType = grape::BFSOpt; + const std::string& out_prefix, + const std::string& path_pattern) { + using AppType = gs::SamplingPath; auto app = std::make_shared(); auto worker = AppType::CreateWorker(app, fragment); auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - worker->Query(4); + std::vector label_id_seq = prepareSamplingPathPattern(path_pattern); - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunProjectedLCC(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - // using AppType = grape::LCCAuto; - using AppType = grape::LCCOpt; - auto app = 
std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); worker->Init(comm_spec, spec); - - worker->Query(); - - std::ofstream ostream; - std::string output_path = - grape::GetResultFilename(out_prefix, fragment->fid()); - - ostream.open(output_path); - worker->Output(ostream); - ostream.close(); - - worker->Finalize(); -} - -void RunProjectedPR(std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& out_prefix) { - // using AppType = grape::PageRankAuto; - using AppType = grape::PageRankOpt; - // using AppType = grape::PageRankLocalParallel; - auto app = std::make_shared(); - auto worker = AppType::CreateWorker(app, fragment); - auto spec = grape::DefaultParallelEngineSpec(); - worker->Init(comm_spec, spec); - - worker->Query(0.85, 10); + worker->Query(label_id_seq, 10000000); std::ofstream ostream; std::string output_path = @@ -349,62 +98,34 @@ void Run(vineyard::Client& client, const grape::CommSpec& comm_spec, std::dynamic_pointer_cast(client.GetObject(id)); if (app_name == "lpa") { - RunLPAU2I(fragment, comm_spec, "./outputs_lpau2i/"); + gs::RunPropertyApp(fragment, comm_spec, "./outputs_lpau2i/", "lpa_u2i"); } else if (app_name == "sampling_path") { RunSamplingPath(fragment, comm_spec, "./outputs_sampling_path/", path_pattern); } else { if (!run_projected) { - RunWCCProperty(fragment, comm_spec, "./outputs_wcc/"); - RunSSSPProperty(fragment, comm_spec, "./outputs_sssp/"); - - RunAutoWCCProperty(fragment, comm_spec, "./outputs_auto_wcc/"); - RunAutoSSSPProperty(fragment, comm_spec, "./outputs_auto_sssp/"); + gs::RunPropertyApp(fragment, comm_spec, "./outputs_wcc/", "wcc_property"); + gs::RunPropertyApp(fragment, comm_spec, "./outputs_sssp/", + "sssp_property"); + + gs::RunPropertyApp(fragment, comm_spec, "./outputs_auto_wcc/", + "wcc_auto_property"); + gs::RunPropertyApp(fragment, comm_spec, "./outputs_auto_sssp/", + "sssp_auto_property"); } else { - { - // v_prop is 
grape::EmptyType, e_prop is grape::EmptyType - LOG(INFO) << "start project ... memory = " << vineyard::get_rss_pretty() - << ", peak = " << vineyard::get_peak_rss_pretty(); - std::shared_ptr projected_fragment = - ProjectedFragmentType::Project(fragment, 0, -1, 0, -1); - LOG(INFO) << "finish project ... memory = " - << vineyard::get_rss_pretty() - << ", peak = " << vineyard::get_peak_rss_pretty(); - - RunProjectedWCC(projected_fragment, comm_spec, - "./output_projected_wcc/"); - RunProjectedCDLP(projected_fragment, comm_spec, - "./output_projected_cdlp/"); - RunProjectedLCC(projected_fragment, comm_spec, - "./output_projected_lcc/"); - RunProjectedPR(projected_fragment, comm_spec, - "./output_projected_pagerank/"); - RunProjectedBFS(projected_fragment, comm_spec, - "./output_projected_bfs/"); - - LOG(INFO) << "finish running application ... memory = " - << vineyard::get_rss_pretty() - << ", peak = " << vineyard::get_peak_rss_pretty(); - } - - { - // v_prop is grape::EmptyType, e_prop is int64_t - - LOG(INFO) << "start project ... memory = " << vineyard::get_rss_pretty() - << ", peak = " << vineyard::get_peak_rss_pretty(); - std::shared_ptr projected_fragment = - ProjectedFragmentType2::Project(fragment, 0, -1, 0, 2); - LOG(INFO) << "finish project ... memory = " - << vineyard::get_rss_pretty() - << ", peak = " << vineyard::get_peak_rss_pretty(); - - RunProjectedSSSP(projected_fragment, comm_spec, - "./output_projected_sssp/"); - - LOG(INFO) << "finish running application ... 
memory = " - << vineyard::get_rss_pretty() - << ", peak = " << vineyard::get_peak_rss_pretty(); - } + gs::RunProjectedApp(fragment, comm_spec, "./output_projected_wcc/", + "wcc_projected"); + gs::RunProjectedApp(fragment, comm_spec, "./output_projected_sssp/", + "sssp_projected"); + gs::RunProjectedApp(fragment, comm_spec, "./output_projected_cdlp/", + "cdlp_projected"); + gs::RunProjectedApp(fragment, comm_spec, "./output_projected_bfs/", + "bfs_projected"); + + gs::RunProjectedApp(fragment, comm_spec, "./output_projected_lcc/", + "lcc_projected"); + gs::RunProjectedApp(fragment, comm_spec, "./output_projected_pagerank/", + "pagerank_projected"); } } } @@ -457,29 +178,8 @@ int main(int argc, char** argv) { LOG(INFO) << "Connected to IPCServer: " << ipc_socket; - vineyard::ObjectID fragment_id = vineyard::InvalidObjectID(); - { - auto loader = std::make_unique>( - client, comm_spec, efiles, vfiles, directed != 0, - /* generate_eid */ false, /* retain_oid */ false); - fragment_id = - bl::try_handle_all([&loader]() { return loader->LoadFragment(); }, - [](const vineyard::GSError& e) { - LOG(FATAL) << e.error_msg; - return 0; - }, - [](const bl::error_info& unmatched) { - LOG(FATAL) << "Unmatched error " << unmatched; - return 0; - }); - } - - LOG(INFO) << "[worker-" << comm_spec.worker_id() - << "] loaded graph to vineyard ... 
" << fragment_id; - LOG(INFO) << "peek memory: " << vineyard::get_peak_rss_pretty() - << std::endl; - - MPI_Barrier(comm_spec.comm()); + auto fragment_id = + gs::LoadPropertyGraph(comm_spec, client, efiles, vfiles, directed != 0); Run(client, comm_spec, fragment_id, run_projected, app_name, path_pattern); LOG(INFO) << "memory: " << vineyard::get_rss_pretty() @@ -491,8 +191,3 @@ int main(int argc, char** argv) { grape::FinalizeMPIComm(); return 0; } - -template class gs::ArrowProjectedFragment; -template class gs::ArrowProjectedFragment; diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 826a05f777b7..dbbbb2f448f5 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -75,7 +75,6 @@ data: # Pegasus config pegasus.hosts=PEGASUS_HOSTS # this is used by ir server to compute the server size pegasus.worker.num={{ .Values.pegasus.worker.num }} - pegasus.timeout={{ .Values.pegasus.timeout }} pegasus.batch.size=1024 pegasus.output.capacity=16 diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 6c35a11f5d01..d9e7b92d8938 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -508,7 +508,6 @@ auth: pegasus: worker: num: 1 - timeout: 240000 secondary: enabled: false diff --git a/docs/deployment/gae_in_job_mode.md b/docs/deployment/gae_in_job_mode.md new file mode 100644 index 000000000000..1ba2b829f500 --- /dev/null +++ b/docs/deployment/gae_in_job_mode.md @@ -0,0 +1,13 @@ +# Run GAE in Job Mode + +Instead of being started as a server, GAE can also be executed in batch style. We leverage [mpi-operator](https://github.com/kubeflow/mpi-operator?tab=readme-ov-file) to launch a GAE job. + +Usage: + +Make sure mpi-operator is installed, then create a Job using this [YAML file](https://github.com/alibaba/GraphScope/blob/main/examples/analytical/mpi-operator/wcc.yaml).
+ +```bash +kubectl create -f https://raw.githubusercontent.com/alibaba/GraphScope/main/examples/analytical/mpi-operator/wcc.yaml +``` + +You can see that a master pod and several worker pods have been created, and GAE exits automatically after the job has completed. diff --git a/examples/analytical/mpi-operator/wcc.yaml b/examples/analytical/mpi-operator/wcc.yaml index e7653e6cebcd..f6ff45baafa2 100644 --- a/examples/analytical/mpi-operator/wcc.yaml +++ b/examples/analytical/mpi-operator/wcc.yaml @@ -14,14 +14,14 @@ spec: template: spec: containers: - - image: registry.cn-hongkong.aliyuncs.com/graphscope/analytical:latest + - image: graphscope/analytical:latest env: - name: APPLICATION - value: wcc + value: wcc_property - name: VFILE - value: /mnt/data/gstest/p2p-31.v + value: /mnt/data/gstest/p2p-31.v#label=v#delimiter=' ' - name: EFILE - value: /mnt/data/gstest/p2p-31.e + value: /mnt/data/gstest/p2p-31.e#label=e#src_label=v#dst_label=v#delimiter=' ' - name: OUTPUT value: /mnt/data/output - name: OMPI_MCA_btl_vader_single_copy_mechanism @@ -47,9 +47,8 @@ spec: - -c - | set +ex - nohup vineyardd & mpirun -x PATH -x LD_LIBRARY_PATH -x VINEYARD_IPC_SOCKET -n 2 \ - /opt/graphscope/bin/run_app --vfile ${VFILE} --efile ${EFILE} --application ${APPLICATION} --out_prefix ${OUTPUT} + /opt/graphscope/bin/grape_engine -batch_mode -vineyard_socket ${VINEYARD_IPC_SOCKET} -vfile "${VFILE}" -efile "${EFILE}" -application ${APPLICATION} --out_prefix ${OUTPUT} resources: limits: cpu: 1 @@ -59,15 +58,24 @@ spec: template: spec: containers: - - image: registry.cn-hongkong.aliyuncs.com/graphscope/analytical:latest + - image: graphscope/analytical:latest name: worker imagePullPolicy: IfNotPresent securityContext: runAsUser: 1001 - command: - - /usr/sbin/sshd + env: + - name: VINEYARD_IPC_SOCKET + value: "/tmp/vineyard.sock" + # command: + # - /usr/sbin/sshd args: - - -De + # - -De + - /bin/bash + - -c + - | + set +ex + nohup vineyardd -socket ${VINEYARD_IPC_SOCKET} & + /usr/sbin/sshd -De resources:
limits: cpu: 1 diff --git a/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties b/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties index 70def2ce3acf..4371c712e868 100644 --- a/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties +++ b/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties @@ -1,7 +1,6 @@ ## pegasus service config # a.k.a. thread num pegasus.worker.num = THREADS_PER_WORKER -pegasus.timeout = 240000 pegasus.batch.size = 1024 pegasus.output.capacity = 16 diff --git a/k8s/dockerfiles/analytical.Dockerfile b/k8s/dockerfiles/analytical.Dockerfile index 9cc0614861aa..3d7a8376f2f8 100644 --- a/k8s/dockerfiles/analytical.Dockerfile +++ b/k8s/dockerfiles/analytical.Dockerfile @@ -38,7 +38,7 @@ COPY --from=builder /home/graphscope/install /opt/graphscope/ RUN mkdir -p /tmp/gs && (mv /opt/graphscope/builtin /tmp/gs/builtin || true) && chown -R graphscope:graphscope /tmp/gs RUN chmod +x /opt/graphscope/bin/* -# RUN apt-get update -y && apt-get install openssh-server dnsutils -y && rm -rf /var/lib/apt/lists/* +RUN apt-get update -y && apt-get install openssh-server dnsutils -y && rm -rf /var/lib/apt/lists/* RUN mkdir -p /var/run/sshd