Skip to content

Commit

Permalink
afserver: adding block/task to an existing job: action message answer…
Browse files Browse the repository at this point in the history
… contains new block/task ids.

Now if there is Action::answer_kind is not set
(an empty string, used to be 'error','log','info'),
Action::answer data will be written as a raw string in 'object' field.
This was server can return a custom JSON object containing some data as an answer on action.

Needed on appending new tasks on a dynamic Houdini PDG job.

References #451, #436.
  • Loading branch information
timurhai committed Jul 6, 2021
1 parent 516539e commit 9fc34e4
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 13 deletions.
1 change: 1 addition & 0 deletions afanasy/src/libafanasy/name_af.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ namespace af
af::Msg * jsonMsg( const std::string & i_type, const std::string & i_name, char * i_data, int i_size);
af::Msg * jsonMsgInfo( const std::string & i_kind, const std::string & i_info);
af::Msg * jsonMsgError( const std::string & i_str);
af::Msg * jsonMsgObject(const std::string & i_str);
af::Msg * jsonMsgStatus( bool i_error, const std::string & i_type, const std::string & i_msg);

void jsonActionStart( std::ostringstream & i_str, const std::string & i_type,
Expand Down
5 changes: 5 additions & 0 deletions afanasy/src/libafanasy/name_afjson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ af::Msg * af::jsonMsgError( const std::string & i_str)
{
return af::jsonMsgInfo( "error", i_str);
}
af::Msg * af::jsonMsgObject(const std::string & i_str)
{
std::string str = "{\"object\":" + i_str + "}";
return af::jsonMsg( str);
}
af::Msg * af::jsonMsgStatus( bool i_success, const std::string & i_type, const std::string & i_msg)
{
std::string str = "{\"status\":";
Expand Down
2 changes: 1 addition & 1 deletion afanasy/src/server/afcontainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ af::Msg * AfContainer::action(Action & i_action, const af::Msg * i_msg)
{
if (i_action.answer_kind.empty())
{
return af::jsonMsgInfo("log", i_action.answer);
return af::jsonMsgObject(i_action.answer);
}
else
{
Expand Down
33 changes: 27 additions & 6 deletions afanasy/src/server/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ bool Block::action( Action & i_action)
}
else if (type == "append_tasks")
{
if (appendTasks(operation))
if (appendTasks(i_action, operation))
{
// Add a log line so that store() is called at the job level (a bit hacky)
i_action.log += "\nBlock['" + m_data->getName() + "']: Append tasks";
Expand Down Expand Up @@ -744,23 +744,33 @@ int Block::blackListWeight() const
return weight;
}

bool Block::appendTasks(const JSON &operation)
bool Block::appendTasks(Action & i_action, const JSON & i_operation)
{
if (m_job->getId() == AFJOB::SYSJOB_ID)
{
i_action.answer_kind = "error";
i_action.answer = "Appending system job is not allowed.";
return false;
}

if (m_data->isNumeric())
{
appendJobLog("Appending tasks to numeric block is not allowed");
i_action.answer_kind = "error";
i_action.answer = "Appending tasks to numeric block is not allowed.";
return false;
}
const JSON &tasks = operation["tasks"];

const JSON &tasks = i_operation["tasks"];
if (!tasks.IsArray())
{
appendJobLog("Operation \"append_tasks\" requires data in an array named \"tasks\"");
i_action.answer_kind = "error";
i_action.answer = "Operation requires tasks array.";
return false;
}

// Allocate new tasks
int old_tasks_num = m_data->getTasksNum();
m_data->jsonReadAndAppendTasks(operation);
m_data->jsonReadAndAppendTasks(i_operation);
m_jobprogress->appendTasks(m_data->getBlockNum(), m_data->getTasksNum() - old_tasks_num);
allocateTasks(old_tasks_num); // allocate only new tasks

Expand All @@ -769,5 +779,16 @@ bool Block::appendTasks(const JSON &operation)

// Set new tasks ready
m_job->checkStatesOnAppend();

// Return new tasks ids:
i_action.answer = "{\"task_ids\":[";
for (int i = old_tasks_num; i < m_data->getTasksNum(); i++)
{
if (i != old_tasks_num)
i_action.answer += ",";
i_action.answer += af::itos(i);
}
i_action.answer += "]}";

return true;
}
2 changes: 1 addition & 1 deletion afanasy/src/server/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ class Block
int getRenderCount(RenderAf * i_render) const;
void remRenderCount(RenderAf * i_render);

bool appendTasks(const JSON &tasks_json);
bool appendTasks(Action & i_action, const JSON & i_operation);
};

40 changes: 36 additions & 4 deletions afanasy/src/server/jobaf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,8 @@ void JobAf::v_action( Action & i_action)
}
else if( type == "append_blocks")
{
appendBlocks(operation["blocks"]);
appendBlocks(i_action, operation);
return;
}
else
{
Expand Down Expand Up @@ -1651,24 +1652,55 @@ int JobAf::v_calcWeight() const
return weight;
}

void JobAf::appendBlocks( const JSON & i_blocks)
void JobAf::appendBlocks(Action & i_action, const JSON & i_operation)
{
if (m_id == AFJOB::SYSJOB_ID)
{
i_action.answer_kind = "error";
i_action.answer = "Appending system job is not allowed.";
return;
}

const JSON & blocks = i_operation["blocks"];
if (!blocks.IsArray())
{
i_action.answer_kind = "error";
i_action.answer = "Operation requires blocks array.";
return;
}
if (blocks.Size() == 0)
{
i_action.answer_kind = "error";
i_action.answer = "Operation blocks array has zero size.";
return;
}

int old_blocks_num = m_blocks_num;

jsonReadAndAppendBlocks( i_blocks);
if (false == jsonReadAndAppendBlocks(blocks))
{
i_action.answer_kind = "error";
i_action.answer = "Appending blocks failed, see server log for details.";
}

m_progress->reconstruct( this);

// construct new blocks only (reuse the old_blocks_num existing ones)
construct( old_blocks_num);

// initialize
// initialize and constuct new blocks ids in an answer
i_action.answer = "{\"block_ids\":[";
for( int b = old_blocks_num; b < m_blocks_num; b++)
{
m_blocks_data[b]->setJobId( m_id);
m_blocks[b]->storeTasks();
m_blocks[b]->setUser( m_user);

if (b != old_blocks_num)
i_action.answer += ",";
i_action.answer += af::itos(b);
}
i_action.answer += "]}";

checkDepends();
checkStatesOnAppend();
Expand Down
2 changes: 1 addition & 1 deletion afanasy/src/server/jobaf.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class JobAf : public af::Job , public AfNodeSolve
bool checkTryTasksNext();
void resetTryTasksNext();

void appendBlocks( const JSON & i_blocks);
void appendBlocks(Action & i_action, const JSON & i_operation);

af::TaskExec *genTask( RenderAf *render, int block, int task, std::list<int> * blocksIds, MonitorContainer * monitoring);

Expand Down

0 comments on commit 9fc34e4

Please sign in to comment.