Skip to content

Commit

Permalink
integrating hadoop actuator
Browse files Browse the repository at this point in the history
  • Loading branch information
pooyanjamshidi committed Aug 20, 2016
1 parent 5510896 commit e99d8d8
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 73 deletions.
90 changes: 17 additions & 73 deletions src/integrated/f.m
Original file line number Diff line number Diff line change
@@ -1,87 +1,31 @@
function [latency,throughput]=f(x)
% this deploy the application with specific settings, waits until the data
% are retrived then send back performance data associated with the
% application under specific setting, in other words this is the response
% function
function [response_1,response_2]=f(x)
% this is a function factory for technology specific experiments
% based on the type of application, it will call specific functions

% Authors: Pooyan Jamshidi ([email protected])
% The code is released under the FreeBSD License.
% Copyright (C) 2016 Pooyan Jamshidi, Imperial College London

global exp_name options sleep_time storm storm_ui
global application options

% this make the code mcr compatible for global vars
if ~isdeployed
exp_name_=exp_name;
application_=application;
options_=options;
sleep_time_=sleep_time;
storm_ui_=storm_ui;
else
exp_name_=getmcruserdata('exp_name');
application_=getmcruserdata('application');
options_=getmcruserdata('options');
sleep_time_=getmcruserdata('sleep_time');
storm_ui_=getmcruserdata('storm_ui');
end

setting=domain2option(options_,x);

% check whether a deployment exists
[nimbus_ip,deployment_id,blueprint_id,status]=get_container_details();

if ~isempty(nimbus_ip)
if ~isdeployed % update storm ip address
storm_ui_.ip=nimbus_ip;
storm = ['http://' nimbus_ip ':8080'];
else
setmcruserdata('nimbus_ip',nimbus_ip);
setmcruserdata('storm',['http://' nimbus_ip ':8080']);
end
%[updated_config_name]=update_config(setting); I decided to pass
%setting directly to deploy_storm_topology instead of passing the
%updated config file
try
% deploy the application under a specific setting
[deployment_id,status]=deploy_storm_topology(setting);
catch ME
warning(ME.message);
end
else
% deploy the application under a specific setting
updated_blueprint_name=update_blueprint(setting);
status='preparing'; deployment_id='';
try
[deployment_id,blueprint_id,status]=deploy(updated_blueprint_name);
catch ME
switch ME.identifier
case 'MATLAB:webservices:CopyContentToDataStreamError'
warning(ME.message);
end
end

end

% uncomment this when deployment service supports deploying monitoring agents
%start_monitoring_topology(deployment_id);

if is_deployed(deployment_id) && strcmp(status,'deployed') % verifying deployment though storm API and deployment service status
fprintf('%s is deployed with setting [%s] \n', deployment_id,num2str(setting));
[expdata_csv_name]=update_expdata(deployment_id);
%undeploy(blueprint_id);
undeploy_storm_topology(deployment_id);
pause(sleep_time_/1000); % convert to seconds and wait for the experiment to finish and retireve the performance data
if ~isempty(expdata_csv_name)
is_summarized=summarize_expdata(expdata_csv_name,setting); % this also update a csv file
else
is_summarized=0;
end
if is_summarized
[latency,throughput]=retrieve_data(exp_name_);
fprintf('the average latency and throughput of %s are respectively: %d, %d \n', deployment_id,latency,throughput);
else
latency=-1;
throughput=-1;
end
else % -1 means there was some problem...
latency=-1;
throughput=-1;
end

switch application_.type
case 'storm'
[latency,throughput]=f_storm(setting);
response_1=latency;
response_2=throughput;
case 'hadoop'
[job_completion_time]=f_hadoop(x);
response_1=job_completion_time;
response_2=-1; % throughput is not applicable for batch applications
end
40 changes: 40 additions & 0 deletions src/integrated/hadoop/f_hadoop.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
function [job_completion_time]=f_hadoop(setting)
% F_HADOOP response function for Hadoop MapReduce experiments: deploys the
% job with the given setting, summarizes the resulting metrics csv and
% returns the average job completion time, or -1 when summarization fails.
%
%   setting             : configuration vector for this experiment run,
%                         forwarded to deploy_hadoop_mapreduce_job
%   job_completion_time : mean job completion time as computed by
%                         retrieve_data_hadoop, or -1 on failure

% Authors: Pooyan Jamshidi ([email protected])
% The code is released under the FreeBSD License.
% Copyright (C) 2016 Pooyan Jamshidi, Imperial College London

global application exp_name
% globals are resolved via getmcruserdata under MATLAB Compiler Runtime
if ~isdeployed
exp_name_=exp_name;
application_=application;
else
exp_name_=getmcruserdata('exp_name');
application_=getmcruserdata('application');
end

try
% deploy the job under a specific setting
% NOTE(review): the returned status is never checked; a failed deployment
% is only detected later when summarization fails -- confirm intended
[status]=deploy_hadoop_mapreduce_job(setting);
catch ME
warning(ME.message); % best effort: continue and let summarization decide
end
% unique metrics filename; the '%bu' format prints the serial date
% number's bit pattern as an unsigned integer, yielding a unique suffix
expdata_csv_name=strcat(application_.name,'_metrics_',num2str(datenum(datetime('now')),'%bu'),'.csv');

% NOTE(review): strcat above always produces a non-empty string, so the
% else branch is unreachable -- guard kept for symmetry with f_storm
if ~isempty(expdata_csv_name)
is_summarized=summarize_expdata_hadoop(expdata_csv_name,setting); % this also updates a csv file
else
is_summarized=0;
end
if is_summarized
[job_completion_time]=retrieve_data_hadoop(exp_name_);
fprintf('the average job completion time of %s is: %d \n', application_.name,job_completion_time);
else
job_completion_time=-1; % -1 is the shared "experiment failed" sentinel
end

end
27 changes: 27 additions & 0 deletions src/integrated/hadoop/retrieve_data_hadoop.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
function [job_completion_time]=retrieve_data_hadoop(exp_name)
% RETRIEVE_DATA_HADOOP reads the latest job completion time from the
% performance repository (csv file named after the experiment).
%
%   exp_name            : experiment name; the repository file is
%                         <summary_folder><exp_name>.csv
%   job_completion_time : value in the last column of the last row of the
%                         csv, or -1 when the file does not exist
%
% Authors: Pooyan Jamshidi ([email protected])
% The code is released under the FreeBSD License.
% Copyright (C) 2016 Pooyan Jamshidi, Imperial College London

global summary_folder
% resolve the global via getmcruserdata under MATLAB Compiler Runtime
if ~isdeployed
dirpath=summary_folder;
else
dirpath=getmcruserdata('summary_folder');
end
filename=strcat(dirpath,exp_name,'.csv');

if exist(filename, 'file')
% read only the most recently appended row (csvread rows are 0-based)
lastrow=number_of_rows(filename);
thiscsv=csvread(filename,lastrow-1,0);
% I assume the end column is job_completion_time
job_completion_time=thiscsv(end);
else
% File does not exist. Return the -1 failure sentinel (consistent with
% f_hadoop) instead of leaving the output unassigned, which would raise
% "Output argument not assigned" at the call site.
job_completion_time=-1;
warningMessage = sprintf('Warning: file does not exist:\n%s', filename);
warning(warningMessage);
end

end
49 changes: 49 additions & 0 deletions src/integrated/hadoop/summarize_expdata_hadoop.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
function [status]=summarize_expdata_hadoop(expdata_csv_name,setting)
% SUMMARIZE_EXPDATA_HADOOP fetches the raw metrics csv from the hadoop
% node, computes the mean job completion time for the experiment run with
% a specific configuration, and appends [setting mean] to the performance
% data repository. status==1 successful, 0 unsuccessful.
%
%   expdata_csv_name : target filename for the fetched metrics csv
%   setting          : configuration vector recorded alongside the result

% Authors: Pooyan Jamshidi ([email protected])
% The code is released under the FreeBSD License.
% Copyright (C) 2016 Pooyan Jamshidi, Imperial College London

global exp_name save_folder summary_folder hadoop
% globals are resolved via getmcruserdata under MATLAB Compiler Runtime
if ~isdeployed
exp_name_=exp_name;
save_folder_=save_folder;
summary_folder_=summary_folder;
hadoop_=hadoop;
else
exp_name_ = getmcruserdata('exp_name');
save_folder_=getmcruserdata('save_folder');
summary_folder_=getmcruserdata('summary_folder');
hadoop_=getmcruserdata('hadoop');
end

% pull the raw metrics file off the hadoop node, then rename it so each
% experiment run keeps its own copy
ssh2_conn = scp_simple_get(hadoop_.ip,hadoop_.username,hadoop_.password,'data.csv',save_folder_);
ssh2_conn = ssh2_close(ssh2_conn); %will call ssh2.m and run command and then close connection
movefile([save_folder_ 'data.csv'],[save_folder_ expdata_csv_name]);

summary=[];

filename=[save_folder_ expdata_csv_name];
% fix: 'firstrow' was previously undefined, which made csvread error out on
% every call. Start at row 0 (csvread rows are 0-based); bump to 1 if the
% metrics file gains a header row -- TODO confirm against data.csv format
firstrow=0;
thiscsv=csvread(filename,firstrow,0);

% if instead of mean percentile required replace it with prctile(X,p)
if ~isempty(thiscsv)
% assumes the last column holds the job completion time -- TODO confirm
job_completion_time=mean(thiscsv(:,end));
summary=[setting job_completion_time];
end

% writing data to the performance data repository
if ~isdir(summary_folder_)
mkdir(summary_folder_);
end

if ~isempty(summary)
% we use dlmwrite in order to use -append feature
dlmwrite(strcat(summary_folder_,exp_name_,'.csv'),summary,'-append');
status=1;
else
status=0;
end
end
82 changes: 82 additions & 0 deletions src/integrated/storm/f_storm.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
function [latency,throughput]=f_storm(setting)
% F_STORM response function for Storm experiments: deploys the topology
% with the given setting (reusing a live container when one exists), waits
% for the run to finish, then returns the measured performance.
%
%   setting    : configuration vector for this experiment run
%   latency    : average latency from retrieve_data, or -1 on any failure
%   throughput : average throughput from retrieve_data, or -1 on any failure

% Authors: Pooyan Jamshidi ([email protected])
% The code is released under the FreeBSD License.
% Copyright (C) 2016 Pooyan Jamshidi, Imperial College London

global exp_name sleep_time storm storm_ui
% globals are resolved via getmcruserdata under MATLAB Compiler Runtime
if ~isdeployed
exp_name_=exp_name;
sleep_time_=sleep_time;
storm_ui_=storm_ui;
else
exp_name_=getmcruserdata('exp_name');
sleep_time_=getmcruserdata('sleep_time');
storm_ui_=getmcruserdata('storm_ui');
end

% check whether a deployment exists
[nimbus_ip,deployment_id,blueprint_id,status]=get_container_details();

if ~isempty(nimbus_ip)
% a container is already running: point the storm endpoints at it and
% redeploy only the topology
if ~isdeployed % update storm ip address
storm_ui_.ip=nimbus_ip;
storm = ['http://' nimbus_ip ':8080'];
else
setmcruserdata('nimbus_ip',nimbus_ip);
setmcruserdata('storm',['http://' nimbus_ip ':8080']);
end
%[updated_config_name]=update_config(setting); I decided to pass
%setting directly to deploy_storm_topology instead of passing the
%updated config file
try
% deploy the application under a specific setting
[deployment_id,status]=deploy_storm_topology(setting);
catch ME
warning(ME.message); % best effort: the status check below decides success
end
else
% no container yet: deploy the full blueprint under a specific setting
updated_blueprint_name=update_blueprint(setting);
status='preparing'; deployment_id='';
try
[deployment_id,blueprint_id,status]=deploy(updated_blueprint_name);
catch ME
% only this transient web-service error is tolerated; anything else
% propagates
switch ME.identifier
case 'MATLAB:webservices:CopyContentToDataStreamError'
warning(ME.message);
end
end

end

% uncomment this when deployment service supports deploying monitoring agents
%start_monitoring_topology(deployment_id);

if is_deployed(deployment_id) && strcmp(status,'deployed') % verifying deployment through storm API and deployment service status
fprintf('%s is deployed with setting [%s] \n', deployment_id,num2str(setting));
[expdata_csv_name]=update_expdata(deployment_id);
%undeploy(blueprint_id);
undeploy_storm_topology(deployment_id);
pause(sleep_time_/1000); % convert to seconds and wait for the experiment to finish and retrieve the performance data
if ~isempty(expdata_csv_name)
is_summarized=summarize_expdata(expdata_csv_name,setting); % this also updates a csv file
else
is_summarized=0;
end
if is_summarized
[latency,throughput]=retrieve_data(exp_name_);
fprintf('the average latency and throughput of %s are respectively: %d, %d \n', deployment_id,latency,throughput);
else
latency=-1;
throughput=-1;
end
else % -1 means there was some problem...
latency=-1;
throughput=-1;
end
end

0 comments on commit e99d8d8

Please sign in to comment.