Skip to content

Commit

Permalink
Merge pull request #451 from eliemichel/append-tasks
Browse files Browse the repository at this point in the history
[WIP] Add a block/task to an existing job
  • Loading branch information
timurhai authored Mar 3, 2020
2 parents 57bcf77 + 6c86c22 commit 5fc2b07
Show file tree
Hide file tree
Showing 18 changed files with 541 additions and 44 deletions.
5 changes: 4 additions & 1 deletion afanasy/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Afanasy binaries
archives
bin
bin

# TODO: Edit cmake lists so that this RelWithDebInfo is created in bin/
service/RelWithDebInfo/
41 changes: 41 additions & 0 deletions afanasy/python/af.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,3 +1221,44 @@ def renderGetLocal(self):
:return:
"""
return self.renderGetList(cgruconfig.VARS['HOSTNAME'])

def appendBlocks(self, jobId, blocks, verbose=False):
"""Append new blocks to an existing job
:param jobId: Id of the job to which blocks are added
:param blocks: list of new Block() objects
:param bool verbose: verbosity toggle
:return: server response
"""
blocks_data = []
for b in blocks:
b.fillTasks()
blocks_data.append(b.data)

self.action = 'action'
self.data['type'] = 'jobs'
self.data['ids'] = [jobId]
self.data['operation'] = {'type': 'append_blocks',
'blocks': blocks_data}
return self._sendRequest(verbose)

def appendTasks(self, jobId, blockId, tasks, verbose=False):
"""Append new tasks to an existing block
:param jobId: Id of the job to which tasks are added
:param blockId: Index of the block to which tasks are added
:param tasks: list of new Task() objects
:param bool verbose: verbosity toggle
:return: server response
"""
tasks_data = []
for t in tasks:
tasks_data.append(t.data)

self.action = 'action'
self.data['type'] = 'jobs'
self.data['ids'] = [jobId]
self.data['block_ids'] = [blockId]
self.data['operation'] = {'type': 'append_tasks',
'tasks': tasks_data}
return self._sendRequest(verbose)
74 changes: 68 additions & 6 deletions afanasy/python/afcmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ class Block:
frames_inc = None
p_tasks_done = None
time_started = None
data = None
tasks = []
full = False

class State:
restart = 'restart'
skip = 'skip'

def __init__(self, data):
def __init__(self, data, full=False):
'''
Constructor
'''
Expand All @@ -59,6 +62,17 @@ def __init__(self, data):
self.frames_inc = data.get('frames_inc')
self.p_tasks_done = data.get('p_tasks_done')
self.time_started = data.get('time_started')
self.data = data
self.full = full
if self.full is True and self.isNumeric() is False:
self.tasks = data.get('tasks', [])

def fillTasks(self):
if self.full is not True:
job = getJob(self.job_id, full=True)
block = job.blocks[self.block_num]
self.data = block.data
self.tasks = block.tasks

def setState(self, state, taskIds=[], verbose=False):
action = 'action'
Expand All @@ -76,6 +90,31 @@ def restart(self, taskIds=[]):
def skip(self, taskIds=[]):
self.setState(self.State.skip, taskIds=taskIds)

def isNumeric(self):
return bool(self.flags >> 0)

def appendTasks(self, tasks, verbose=False):
"""Append new tasks to an existing block
:param tasks: list of new Task() objects
:param bool verbose: verbosity toggle
:return: server response
"""
output = "The block is numeric and cannot have tasks appended"
if self.isNumeric() is False:
tasks_data = []
for t in tasks:
tasks_data.append(t.data)
action = 'action'
data = {'ids': [self.job_id],
'type': 'jobs',
'block_ids': [self.block_num],
'operation': {
'type': 'append_tasks',
'tasks': tasks_data}}
output = _sendRequest(action, data, verbose)
return output


class Job:
'''
Expand All @@ -99,6 +138,8 @@ class Job:
max_running_tasks_per_host = None
depend_mask = ''
p_percentage = 0
data = None
full = False

class State:
restart = 'restart'
Expand All @@ -108,13 +149,15 @@ class State:
skip = 'skip'
delete = 'delete'

def __init__(self, jobId, data=None):
def __init__(self, jobId, data=None, full=False):
'''
Constructor
'''
self.id = jobId
self.blocks = []
self.full = full
if data is not None:
self.data = data
self.fillInfo(data)

def fillInfo(self, data):
Expand All @@ -133,12 +176,12 @@ def fillInfo(self, data):
self.max_running_tasks = data.get('max_running_tasks', -1)
self.max_running_tasks_per_host = data.get('max_running_tasks_per_host', -1)
self.depend_mask = data.get('depend_mask', '')
self.fillBlocks(data['blocks'])
self.fillBlocks(data['blocks'], self.full)

def fillBlocks(self, blocksData):
def fillBlocks(self, blocksData, full):
blocksProgress = 0
for blockData in blocksData:
block = Block(blockData)
block = Block(blockData, full)
if block.p_percentage is not None:
blocksProgress += block.p_percentage
self.blocks.append(block)
Expand Down Expand Up @@ -175,6 +218,25 @@ def getProgress(self, verbose=False,):
return output['job_progress']
return None

def appendBlocks(self, blocks, verbose=False):
"""Append new blocks to an existing job
:param bool verbose: verbosity toggle
:return: server response
"""
action = 'action'
blocks_data = []
for block in blocks:
block.fillTasks()
blocks_data.append(block.data)
data = {'type': 'jobs',
'ids': [self.id],
"operation": {
'type': 'append_blocks',
'blocks': blocks_data}}
output = _sendRequest(action, data, verbose)
return output


class Render:
'''
Expand Down Expand Up @@ -516,7 +578,7 @@ def getJobList(ids=None, full=False, verbose=False):
if output is not None:
if 'jobs' in output:
for jobData in output['jobs']:
job = Job(jobData['id'], jobData)
job = Job(jobData['id'], jobData, full=full)
jobs.append(job)
return jobs

Expand Down
35 changes: 35 additions & 0 deletions afanasy/src/libafanasy/blockdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,41 @@ void BlockData::jsonReadTasks(const JSON &i_object)
}
}

void BlockData::jsonReadAndAppendTasks(const JSON &i_object)
{
// This function is similar to jsonReadTasks but adds new tasks to the block
// instead of overriding the previous ones.

const JSON &tasks = i_object["tasks"];

if (!tasks.IsArray() || tasks.Size() == 0)
return;

TaskData **old_tasks_data = m_tasks_data;
int old_tasks_num = m_tasks_num;

m_tasks_num += tasks.Size();
m_tasks_data = new TaskData *[m_tasks_num];
for (int t = 0; t < m_tasks_num; t++)
{
if (t < old_tasks_num)
{
m_tasks_data[t] = old_tasks_data[t];
continue;
}

m_tasks_data[t] = createTask(tasks[t - old_tasks_num]);
if (m_tasks_data[t] == NULL)
{
AFERROR("BlockData::BlockData: Can not allocate memory for new task.")
break;
}
}

if (NULL != old_tasks_data)
delete [] old_tasks_data;
}

void BlockData::jsonWrite(std::ostringstream &o_str, const std::string &i_datamode) const
{
int type = 0;
Expand Down
1 change: 1 addition & 0 deletions afanasy/src/libafanasy/blockdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class BlockData : public Af
void jsonWrite(std::ostringstream &o_str, const std::string &i_datamode) const;
void jsonWriteTasks(std::ostringstream &o_str) const;
void jsonReadTasks(const JSON &i_object);
void jsonReadAndAppendTasks(const JSON &i_object); ///< Append new tasks from JSON object

/// Generate progress bits info string.
void generateProgressStream(std::ostringstream &o_str) const;
Expand Down
21 changes: 15 additions & 6 deletions afanasy/src/libafanasy/job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,32 +125,41 @@ bool Job::jsonRead( const JSON &i_object, std::string * io_changes)
jr_string("project", m_project, i_object);
jr_string("department", m_department, i_object);

const JSON & blocks = i_object["blocks"];
if( false == blocks.IsArray())
return jsonReadAndAppendBlocks(i_object["blocks"]);
}

bool Job::jsonReadAndAppendBlocks( const JSON &i_blocks)
{
if( false == i_blocks.IsArray())
{
AFERROR("Job::jsonRead: Can't find blocks array.");
return false;
}

m_blocks_num = blocks.Size();
int old_blocks_num = m_blocks_num;
BlockData ** old_blocks_data = m_blocks_data;

m_blocks_num += i_blocks.Size();
if( m_blocks_num < 1 )
{
AFERROR("Job::jsonRead: Blocks array has zero size.");
return false;
}

m_blocks_data = new BlockData*[m_blocks_num];
for( int b = 0; b < m_blocks_num; b++) m_blocks_data[b] = NULL;
for( int b = 0; b < m_blocks_num; b++)
m_blocks_data[b] = b < old_blocks_num ? old_blocks_data[b] : NULL;
if( NULL == old_blocks_data)
delete [] old_blocks_data;
for( int b = old_blocks_num; b < m_blocks_num; b++)
{
m_blocks_data[b] = newBlockData( blocks[b], b);
m_blocks_data[b] = newBlockData( i_blocks[b - old_blocks_num], b);
if( m_blocks_data[b] == NULL)
{
AFERROR("Job::jsonRead: Can not allocate memory for new block.\n");
return false;
}
}

return true;
}

Expand Down
5 changes: 5 additions & 0 deletions afanasy/src/libafanasy/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class Job : public Work

void stdOutJobBlocksTasks() const;

protected:
/// Read blocks data and append it to block list
/// (called by jsonRead and also when appending new blocks)
bool jsonReadAndAppendBlocks( const JSON & i_blocks);

protected:
BlockData ** m_blocks_data; ///< Blocks pointer.
int32_t m_blocks_num; ///< Number of blocks in job.
Expand Down
67 changes: 67 additions & 0 deletions afanasy/src/libafanasy/jobprogress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,41 @@ bool JobProgress::construct( Job * job)
return true;
}

bool JobProgress::reconstruct( Job * job)
{
int32_t old_blocks_num = m_blocks_num;
int32_t new_blocks_num = job->getBlocksNum();
if( new_blocks_num == old_blocks_num)
return true;

if( new_blocks_num < 1)
{
AFERRAR("JobProgress::JobProgress: Invalid number if blocks = %d (job name: '%s')", m_blocks_num, job->getName().c_str())
return false;
}

appendBlocks( new_blocks_num - old_blocks_num);

for( int b = old_blocks_num; b < m_blocks_num; b++)
{
const af::BlockData * block = job->getBlock( b);
tasksnum[b] = block->getTasksNum();
if( tasksnum[b] < 1)
{
AFERRAR("JobProgress::JobProgress: Invalid number of tasks = %d (m_job_id=%d,block=%d)", tasksnum[b], job->getId(), b)
return false;
}

if( initTasks( b, tasksnum[b]) == false)
{
AFERRAR("JobProgress::JobProgress: Tasks initalization failed ( block=%d, tasks number=%d).", b, tasksnum[b])
return false;
}
}

return true;
}

JobProgress::JobProgress( Msg * msg)
{
initProperties();
Expand Down Expand Up @@ -80,6 +115,24 @@ bool JobProgress::initBlocks()
return true;
}

void JobProgress::appendBlocks(int numblocks)
{
int32_t old_blocks_num = m_blocks_num;
m_blocks_num += numblocks;

int32_t * old_tasksnum = tasksnum;
tasksnum = new int32_t[m_blocks_num];
for (int b = 0; b < m_blocks_num; b++)
tasksnum[b] = b < old_blocks_num ? old_tasksnum[b] : 0;
if( old_tasksnum != NULL) delete [] old_tasksnum;

TaskProgress ***old_tp = tp;
tp = new TaskProgress **[ m_blocks_num]();
for (int b = 0; b < m_blocks_num; b++)
tp[b] = b < old_blocks_num ? old_tp[b] : 0;
if( old_tp != NULL) delete [] old_tp;
}

bool JobProgress::initTasks( int block, int numtasks)
{
if( numtasks == 0)
Expand All @@ -94,6 +147,20 @@ bool JobProgress::initTasks( int block, int numtasks)
return true;
}

void JobProgress::appendTasks(int block, int numtasks)
{
int32_t old_tasksnum = tasksnum[block];
TaskProgress **old_tp = tp[block];

tasksnum[block] += numtasks;
tp[block] = new TaskProgress*[ tasksnum[block]];

for( int t = 0; t < tasksnum[block]; t++)
tp[block][t] = t < old_tasksnum ? old_tp[t] : newTaskProgress();

if( old_tp != NULL) delete [] old_tp;
}

TaskProgress * JobProgress::newTaskProgress() const
{
return new TaskProgress;
Expand Down
Loading

0 comments on commit 5fc2b07

Please sign in to comment.