Skip to content
This repository has been archived by the owner on May 28, 2019. It is now read-only.

Commit

Permalink
Merge pull request #380 from May2016/galaxy3
Browse files Browse the repository at this point in the history
add generate format json
  • Loading branch information
fxsjy committed May 27, 2016
2 parents 3b9094e + 560bcda commit 9aa127e
Show file tree
Hide file tree
Showing 16 changed files with 516 additions and 133 deletions.
1 change: 1 addition & 0 deletions galaxy.flag
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
--nexus_addr=
--nexus_root=/galaxy3
--resman_path=/resman
--appmaster=/appmaster
91 changes: 66 additions & 25 deletions src/client/galaxy_job_action.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <thread.h>
#include <tprinter.h>
#include "galaxy_job_action.h"

Expand Down Expand Up @@ -183,7 +184,9 @@ bool JobAction::ListJobs() {
::baidu::galaxy::sdk::ListContainerGroupsResponse resman_response;
resman_request.user = user_;
std::map<std::string, ::baidu::galaxy::sdk::ContainerGroupStatistics> containers;

bool ret = resman_->ListContainerGroups(resman_request, &resman_response);

if (ret) {
for (uint32_t i = 0; i < resman_response.containers.size(); ++i) {
containers[resman_response.containers[i].id] = resman_response.containers[i];
Expand All @@ -198,7 +201,6 @@ bool JobAction::ListJobs() {
::baidu::galaxy::sdk::ListJobsRequest request;
::baidu::galaxy::sdk::ListJobsResponse response;
request.user = user_;

ret = app_master_->ListJobs(request, &response);
if (ret) {
baidu::common::TPrinter jobs(12);
Expand All @@ -213,6 +215,7 @@ bool JobAction::ListJobs() {
std::string scpu;
std::string smem;
std::string svolums;

std::map<std::string, ::baidu::galaxy::sdk::ContainerGroupStatistics>::iterator it
= containers.find(response.jobs[i].jobid);
if (it != containers.end()) {
Expand Down Expand Up @@ -295,6 +298,7 @@ bool JobAction::ListJobs() {
printf("List job failed for reason %s:%s\n",
StringStatus(response.error_code.status).c_str(), response.error_code.reason.c_str());
}

return ret;
}

Expand Down Expand Up @@ -382,10 +386,11 @@ bool JobAction::ShowJob(const std::string& jobid) {
printf("%s\n", desc_data_volums.ToString().c_str());

printf("job description pod task infomation\n");
::baidu::common::TPrinter desc_tasks(7);
//desc_tasks.AddRow(10, "", "id", "cpu(cores/excess)", "memory(cores)", "tcp_throt", "blkio", "ports(name/port)", "exe_package", "data_package", "services");
desc_tasks.AddRow(7, "", "id", "cpu(cores/excess)", "memory(size/excess)", "tcp_throt(r/re/s/se)", "blkio", "ports(name/port)");
for (uint32_t i = 0; i < response.job.desc.pod.tasks.size(); ++i) {
printf("=========================================================\n");
printf("job description pod task [%u] base infomation\n", i);
::baidu::common::TPrinter desc_task(7);
desc_task.AddRow(7, "", "id", "cpu(cores/excess)", "memory(size/excess)", "tcp_throt(r/re/s/se)", "blkio", "ports(name/port)");
std::string scpu = ::baidu::common::NumToString(response.job.desc.pod.tasks[i].cpu.milli_core / 1000.0) + "/"
+ StringBool(response.job.desc.pod.tasks[i].cpu.excess);
std::string smem = ::baidu::common::HumanReadableString(response.job.desc.pod.tasks[i].memory.size) + "/"
Expand All @@ -404,7 +409,7 @@ bool JobAction::ShowJob(const std::string& jobid) {
+ response.job.desc.pod.tasks[i].ports[j].port;
//+ response.job.desc.pod.tasks[i].ports[j].real_port;
if (j == 0) {
desc_tasks.AddRow(7, ::baidu::common::NumToString(i).c_str(),
desc_task.AddRow(7, ::baidu::common::NumToString(i).c_str(),
response.job.desc.pod.tasks[i].id.c_str(),
scpu.c_str(),
smem.c_str(),
Expand All @@ -413,7 +418,7 @@ bool JobAction::ShowJob(const std::string& jobid) {
sports.c_str()
);
} else {
desc_tasks.AddRow(7, "",
desc_task.AddRow(7, "",
"",
"",
"",
Expand All @@ -422,9 +427,11 @@ bool JobAction::ShowJob(const std::string& jobid) {
sports.c_str()
);
}

}

if (response.job.desc.pod.tasks[i].ports.size() == 0) {
desc_tasks.AddRow(7, ::baidu::common::NumToString(i).c_str(),
desc_task.AddRow(7, ::baidu::common::NumToString(i).c_str(),
response.job.desc.pod.tasks[i].id.c_str(),
scpu.c_str(),
smem.c_str(),
Expand All @@ -433,26 +440,60 @@ bool JobAction::ShowJob(const std::string& jobid) {
""
);
}
printf("%s\n", desc_tasks.ToString().c_str());

printf("podinfo infomation\n");
::baidu::common::TPrinter pods(7);
pods.AddRow(7, "", "podid", "endpoint", "status", "version", "start_time", "fail_count");
for (uint32_t i = 0; i < response.job.pods.size(); ++i) {
size_t pos = response.job.pods[i].podid.rfind(".");
std::string podid(response.job.pods[i].podid, pos + 1, response.job.pods[i].podid.size()- (pos + 1));
pods.AddRow(7, ::baidu::common::NumToString(i).c_str(),
response.job.pods[i].podid.c_str(),
//response.job.pods[i].jobid.c_str(),
response.job.pods[i].endpoint.c_str(),
StringPodStatus(response.job.pods[i].status).c_str(),
response.job.pods[i].version.c_str(),
FormatDate(response.job.pods[i].start_time).c_str(),
::baidu::common::NumToString(response.job.pods[i].fail_count).c_str()
printf("%s\n", desc_task.ToString().c_str());

printf("job description pod task [%u] exe_package infomation\n", i);
printf("-----------------------------------------------\n");
printf("start_cmd: %s\n", response.job.desc.pod.tasks[i].exe_package.start_cmd.c_str());
printf("stop_cmd: %s\n", response.job.desc.pod.tasks[i].exe_package.stop_cmd.c_str());
printf("dest_path: %s\n", response.job.desc.pod.tasks[i].exe_package.package.dest_path.c_str());
printf("version: %s\n", response.job.desc.pod.tasks[i].exe_package.package.version.c_str());

printf("\njob description pod task [%u] data_package infomation\n", i);
printf("-----------------------------------------------\n");
printf("reload_cmd: %s\n", response.job.desc.pod.tasks[i].data_package.reload_cmd.c_str());
::baidu::common::TPrinter packages(3);
packages.AddRow(3, "", "version", "dest_path");
for (uint32_t j = 0; j < response.job.desc.pod.tasks[i].data_package.packages.size(); ++j) {
packages.AddRow(3, ::baidu::common::NumToString(j).c_str(),
response.job.desc.pod.tasks[i].data_package.packages[j].version.c_str(),
response.job.desc.pod.tasks[i].data_package.packages[j].dest_path.c_str()
);
}
printf("%s\n", packages.ToString().c_str());

printf("job description pod task [%u] services infomation\n", i);
::baidu::common::TPrinter services(4);
services.AddRow(4, "", "name", "port_name", "use_bns");
for (uint32_t j = 0; j < response.job.desc.pod.tasks[i].services.size(); ++j) {
services.AddRow(4, ::baidu::common::NumToString(j).c_str(),
response.job.desc.pod.tasks[i].services[j].service_name.c_str(),
response.job.desc.pod.tasks[i].services[j].port_name.c_str(),
StringBool(response.job.desc.pod.tasks[i].services[j].use_bns).c_str()
);
}
printf("%s\n", services.ToString().c_str());

}

printf("podinfo infomation\n");
::baidu::common::TPrinter pods(8);
pods.AddRow(8, "", "podid", "endpoint", "version", "status", "fail_count", "start_time", "update_time");
for (uint32_t i = 0; i < response.job.pods.size(); ++i) {
size_t pos = response.job.pods[i].podid.rfind(".");
std::string podid(response.job.pods[i].podid, pos + 1, response.job.pods[i].podid.size()- (pos + 1));
pods.AddRow(8, ::baidu::common::NumToString(i).c_str(),
response.job.pods[i].podid.c_str(),
response.job.pods[i].endpoint.c_str(),
response.job.pods[i].version.c_str(),
StringPodStatus(response.job.pods[i].status).c_str(),
::baidu::common::NumToString(response.job.pods[i].fail_count).c_str(),
FormatDate(response.job.pods[i].start_time).c_str(),
FormatDate(response.job.pods[i].update_time).c_str()
);
}
printf("%s\n", pods.ToString().c_str());
}


} else {
printf("Show job failed for reason %s:%s\n", StringStatus(response.error_code.status).c_str(),
Expand All @@ -478,7 +519,7 @@ bool JobAction::ExecuteCmd(const std::string& jobid, const std::string& cmd) {
request.user = user_;
request.jobid = jobid;
request.cmd = cmd;
bool ret = app_master_->ExecuteCmd(request, &response);
bool ret = app_master_->ExecuteCmd(request, &response);
if (ret) {
printf("Execute job %s\n success", jobid.c_str());
} else {
Expand Down
65 changes: 55 additions & 10 deletions src/client/galaxy_job_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
DEFINE_string(f, "", "specify config file");
DEFINE_string(i, "", "specify job id");
DEFINE_string(c, "", "specify cmd");
DEFINE_string(t, "", "specify task num");
DEFINE_string(d, "", "specify data_volums num");
DEFINE_string(p, "", "specify port num");
DEFINE_string(a, "", "specify packages num in data_package");
DEFINE_string(s, "", "specify service num");

DECLARE_string(flagfile);

Expand All @@ -19,12 +24,19 @@ const std::string kGalaxyUsage = "galaxy.\n"
" galaxy list\n"
" galaxy show -i id\n"
" galaxy exec -i id -c cmd\n"
" galaxy json [-t num_task -d num_data_volums -p num_port -a num_packages in data_package -s num_service]\n"
"Optionss: \n"
" -f specify config file, job config file or label config file.\n"
" -c specify cmd.\n"
" -i specify job id.\n";
" -i specify job id.\n"
" -t specify specify task num, default 1.\n"
" -d spicify data_volums num, default 1\n"
" -p specify port num, default 1\n"
" -a specify specify packages num in data_package, default 1\n"
" -s specify specify service num, default 1\n";

int main(int argc, char** argv) {
bool ok = true;
FLAGS_flagfile = "./galaxy.flag";
::google::SetUsageMessage(kGalaxyUsage);
::google::ParseCommandLineFlags(&argc, &argv, true);
Expand All @@ -41,7 +53,7 @@ int main(int argc, char** argv) {
fprintf(stderr, "-f is needed\n");
return -1;
}
return jobAction->SubmitJob(FLAGS_f);
ok = jobAction->SubmitJob(FLAGS_f);
} else if (strcmp(argv[1], "update") == 0) {
if (FLAGS_f.empty()) {
fprintf(stderr, "-f is needed\n");
Expand All @@ -53,29 +65,29 @@ int main(int argc, char** argv) {
return -1;
}

return jobAction->UpdateJob(FLAGS_f, FLAGS_i);
ok = jobAction->UpdateJob(FLAGS_f, FLAGS_i);
} else if (strcmp(argv[1], "remove") == 0) {
if (FLAGS_i.empty()) {
fprintf(stderr, "-i is needed\n");
return -1;
}
return jobAction->RemoveJob(FLAGS_i);
ok = jobAction->RemoveJob(FLAGS_i);

} else if (strcmp(argv[1], "list") == 0) {
return jobAction->ListJobs();
ok = jobAction->ListJobs();
} else if (strcmp(argv[1], "stop") == 0) {
if (FLAGS_i.empty()) {
fprintf(stderr, "-i is needed\n");
return -1;
}

return jobAction->StopJob(FLAGS_i);
ok = jobAction->StopJob(FLAGS_i);
} else if (strcmp(argv[1], "show") == 0) {
if (FLAGS_i.empty()) {
fprintf(stderr, "-i is needed\n");
return -1;
}
return jobAction->ShowJob(FLAGS_i);
ok = jobAction->ShowJob(FLAGS_i);

} else if (strcmp(argv[1], "exec") == 0) {
if (FLAGS_i.empty()) {
Expand All @@ -87,12 +99,45 @@ int main(int argc, char** argv) {
fprintf(stderr, "-c is needed\n");
return -1;
}
return jobAction->ExecuteCmd(FLAGS_i, FLAGS_c);
} else {
ok = jobAction->ExecuteCmd(FLAGS_i, FLAGS_c);
} else if (strcmp(argv[1], "json") == 0) {
int num_tasks = atoi(FLAGS_t.c_str());
if (FLAGS_t.empty()) {
num_tasks = 1;
}

int num_data_volums = atoi(FLAGS_d.c_str());
if (FLAGS_d.empty()) {
num_data_volums =1;
}

int num_ports = atoi(FLAGS_p.c_str());
if (FLAGS_p.empty()) {
num_ports = 1;
}
int num_packages = atoi(FLAGS_a.c_str());
if (FLAGS_a.empty()) {
num_packages = 1;
}
int num_services = atoi(FLAGS_s.c_str());
if (FLAGS_s.empty()) {
num_services = 1;
}
ok = ::baidu::galaxy::client::GenerateJson(num_tasks,
num_data_volums,
num_ports,
num_packages,
num_services
);
}else {
fprintf(stderr, "%s", kGalaxyUsage.c_str());
return -1;
}
return 0;
if (ok) {
return 0;
} else {
return -1;
}
}

/* vim: set ts=4 sw=4 sts=4 tw=100 */
3 changes: 2 additions & 1 deletion src/client/galaxy_parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ int ParseTask(const rapidjson::Value& task_json, ::baidu::galaxy::sdk::TaskDescr

time_t timestamp;
time(&timestamp);
task->id = baidu::common::NumToString(timestamp);
//task->id = baidu::common::NumToString(timestamp);

if (!task_json.HasMember("cpu")) {
fprintf(stderr, "cpu is required in task\n");
Expand Down Expand Up @@ -567,6 +567,7 @@ int ParsePod(const rapidjson::Value& pod_json, ::baidu::galaxy::sdk::PodDescript
if (ok != 0) {
break;
}
task.id = ::baidu::common::NumToString((uint32_t)i);
tasks.push_back(task);
}

Expand Down
Loading

0 comments on commit 9aa127e

Please sign in to comment.