Skip to content

Commit

Permalink
started integrating with hadoop actuator
Browse files Browse the repository at this point in the history
  • Loading branch information
pooyanjamshidi committed Aug 15, 2016
1 parent f23b3a2 commit 5510896
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/init.m
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
function init()
global exp_name domain options topology config_template config_folder save_folder deployment_service ci_service mon_service exp_budget initial_design polling_time exp_time sleep_time storm summary_folder kafka topic blueprint application storm_ui zookeeper
global exp_name domain options topology config_template config_folder save_folder deployment_service ci_service mon_service exp_budget initial_design polling_time exp_time sleep_time storm summary_folder kafka topic blueprint application storm_ui zookeeper hadoop replication
config % global parameters initialized

% read configuration parameters
Expand Down Expand Up @@ -29,6 +29,7 @@ function init()
sleep_time=runConfig.sleep_time;
topic=runConfig.topic;
blueprint=runConfig.blueprint;
replication=runConfig.replication;

options=params.param_options;
domain=options2domain(options);
Expand All @@ -40,9 +41,10 @@ function init()
storm_ui=services{1,4};
kafka=services{1,5}.URL;
zookeeper=services{1,6};
hadoop=services{1,7};
else
setmcruserdata('application',application_detail);

setmcruserdata('topology',application_detail.name);
setmcruserdata('exp_name',strcat(application_detail.name,'_exp_',num2str(datenum(datetime('now')),'%bu')));

Expand All @@ -57,7 +59,8 @@ function init()
setmcruserdata('sleep_time',runConfig.sleep_time);
setmcruserdata('topic',runConfig.topic);
setmcruserdata('blueprint',runConfig.blueprint);

setmcruserdata('replication',runConfig.replication);

setmcruserdata('options',params.param_options);
setmcruserdata('domain',options2domain(params.param_options));

Expand All @@ -67,7 +70,8 @@ function init()
setmcruserdata('storm',services{1,4}.URL);
setmcruserdata('storm_ui',services{1,4});
setmcruserdata('kafka',services{1,5}.URL);
setmcruserdata('zookeeper',services{1,6});
setmcruserdata('zookeeper',services{1,6});
setmcruserdata('hadoop',services{1,7});
end

% setup the python path (for deployment service)
Expand Down
66 changes: 66 additions & 0 deletions src/integrated/hadoop/deploy_hadoop_mapreduce_job.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
function [status]=deploy_hadoop_mapreduce_job(setting)
% Submit a mapreduce job with a specific configuration setting.

% Authors: Pooyan Jamshidi ([email protected])
% The code is released under the FreeBSD License.
% Copyright (C) 2016 Pooyan Jamshidi, Imperial College London

global hadoop application options replication

% this make the code mcr compatible for global vars
if ~isdeployed
hadoop_=hadoop;
application_=application;
options_=options;
replication_=replication;
else
hadoop_=getmcruserdata('hadoop');
application_=getmcruserdata('application');
options_=getmcruserdata('options');
replication_=getmcruserdata('replication');
end
extrastr = ' ';

% clean hdfs for the job
if strcmp(status,'deployed')
undeploy_storm_topology(deployment_id);
end

% prepare the connection
ssh2_conn = ssh2_config(hadoop_.ip,hadoop_.username,hadoop_.password);
%ssh2_conn.remote_file_mode = 0777; % change to a more readable permission
%ssh2_conn = ssh2_command(ssh2_conn,['sudo chmod 777 ' storm_ui_.storm_client 'conf']);

% copy the updated config file to the storm UI VM
%ssh2_conn = scp_simple_put(storm_ui_.ip,storm_ui_.username,storm_ui_.password,topology_config_name,[storm_ui_.storm_client 'conf'],dirpath);
% check whether the current version of the application is in the remote
% server, note new versions should be reflected via input configuration
% file via application_.jar_file
[filethere] = check_remote_file(ssh2_conn, application_.jar_file);
if ~filethere
ssh2_conn = ssh2_command(ssh2_conn,['wget ' application_.jar_path]);
end

% prepare the command to be executed, we assume storm cli is installed and
% added to the path
config_str='';
for i=1:length(options_)
config_str=[config_str extrastr options_{1,i} '=' num2str(setting(i)) ';'];
end

cli='java -jar CommandLineTool.jar';
cmd=[cli extrastr '-jar' extrastr application_.jar_file extrastr '-params' extrastr config_str extrastr '-class' extrastr application_.class extrastr '-args' extrastr application_.args extrastr '-applicationReplication' extrastr replication_];
[ssh2_conn, response] = ssh2_command(ssh2_conn,cmd);

ssh2_conn = ssh2_close(ssh2_conn); %will call ssh2.m and run command and then close connection

end

function [filethere] = check_remote_file(ssh2_conn, fn)
[ssh2_conn, response] = ssh2_command(ssh2_conn,['[ -f ' fn ' ] && echo 1 || echo 0']);
if response{1} == '1'
filethere = 1;
else
filethere = 0;
end
end

0 comments on commit 5510896

Please sign in to comment.