Skip to content

Commit

Permalink
tickets: taskexec got job block tickets info
Browse files Browse the repository at this point in the history
On task start/stop render updates all parent pools tickets usage.

References #459.
  • Loading branch information
timurhai committed Feb 21, 2020
1 parent 0a59f6e commit 40ad2e1
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 43 deletions.
2 changes: 1 addition & 1 deletion afanasy/src/include/afversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@

#pragma once

static const int AFVERSION = 66;
static const int AFVERSION = 67;

1 change: 1 addition & 0 deletions afanasy/src/libafanasy/blockdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,7 @@ TaskExec *BlockData::genTask(int num) const
m_job_id, m_block_num, m_flags, num);

taskExec->m_custom_data_block = m_custom_data;
taskExec->m_tickets = m_tickets;

if (isNotNumeric())
{
Expand Down
2 changes: 2 additions & 0 deletions afanasy/src/libafanasy/taskexec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ void TaskExec::v_readwrite( Msg * msg)
rw_int32_t ( m_block_num, msg);
rw_int32_t ( m_task_num, msg);

rw_IntMap(m_tickets, msg);

break;

default:
Expand Down
2 changes: 1 addition & 1 deletion afanasy/src/libafanasy/taskexec.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class TaskExec : public Af
/// Read or write task in message buffer.
void v_readwrite( Msg * msg);

std::map<std::string, int32_t> m_tickets;

std::string m_custom_data_task;
std::string m_custom_data_block;
Expand Down Expand Up @@ -237,7 +238,6 @@ class TaskExec : public Af

int64_t m_time_start;


private:
void initDefaults();

Expand Down
34 changes: 34 additions & 0 deletions afanasy/src/server/poolsrv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,40 @@ bool PoolSrv::assignRender(RenderAf * i_render)
return true;
}

void PoolSrv::taskAcuire(const af::TaskExec * i_taskexec, MonitorContainer * i_monitoring)
{
for (auto const& eIt : i_taskexec->m_tickets)
{
std::map<std::string, af::Farm::Tiks>::iterator it = m_tickets_pool.find(eIt.first);
if (it != m_tickets_pool.end())
{
it->second.usage += eIt.second;
if (i_monitoring)
i_monitoring->addEvent(af::Monitor::EVT_pools_change, m_id);
}
}

if (m_parent)
m_parent->taskAcuire(i_taskexec, i_monitoring);
}

void PoolSrv::taskRelease(const af::TaskExec * i_taskexec, MonitorContainer * i_monitoring)
{
for (auto const& eIt : i_taskexec->m_tickets)
{
std::map<std::string, af::Farm::Tiks>::iterator it = m_tickets_pool.find(eIt.first);
if (it != m_tickets_pool.end())
{
it->second.usage -= eIt.second;
if (i_monitoring)
i_monitoring->addEvent(af::Monitor::EVT_pools_change, m_id);
}
}

if (m_parent)
m_parent->taskRelease(i_taskexec, i_monitoring);
}

void PoolSrv::v_refresh(time_t i_currentTime, AfContainer * i_container, MonitorContainer * i_monitoring)
{
bool changed = false;
Expand Down
4 changes: 4 additions & 0 deletions afanasy/src/server/poolsrv.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "../libafanasy/name_af.h"
#include "../libafanasy/pool.h"

#include "afnodefarm.h"
Expand Down Expand Up @@ -53,6 +54,9 @@ class PoolSrv : public af::Pool, public AfNodeFarm

bool assignRender(RenderAf * i_render);

void taskAcuire (const af::TaskExec * i_taskexec, MonitorContainer * i_monitoring);
void taskRelease(const af::TaskExec * i_taskexec, MonitorContainer * i_monitoring);

inline int getHostMaxTasks() const
{ if (m_host_max_tasks < 0 && m_parent) return m_parent->getHostMaxTasks(); else return m_host_max_tasks; }
inline int getHostMaxCapacity() const
Expand Down
41 changes: 21 additions & 20 deletions afanasy/src/server/renderaf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void RenderAf::setTask( af::TaskExec *taskexec, MonitorContainer * monitoring, b
return;
}

addTask( taskexec);
addTask(taskexec, monitoring);

if( monitoring ) monitoring->addEvent( af::Monitor::EVT_renders_change, m_id);

Expand Down Expand Up @@ -655,7 +655,7 @@ void RenderAf::stopTask( int jobid, int blocknum, int tasknum, int number)

void RenderAf::taskFinished( const af::TaskExec * taskexec, MonitorContainer * monitoring)
{
removeTask( taskexec);
removeTask(taskexec, monitoring);

if( taskexec->getNumber())
{
Expand All @@ -673,32 +673,33 @@ void RenderAf::taskFinished( const af::TaskExec * taskexec, MonitorContainer * m
if( monitoring ) monitoring->addEvent( af::Monitor::EVT_renders_change, m_id);
}

void RenderAf::addTask( af::TaskExec * taskexec)
void RenderAf::addTask(af::TaskExec * i_taskexec, MonitorContainer * i_monitoring)
{
// If render was not busy it has become busy now
if( false == isBusy())
if (false == isBusy())
{
setBusy( true);
m_task_start_finish_time = time( NULL);
setBusy(true);
m_task_start_finish_time = time(NULL);
store();
}

#ifdef AFOUTPUT
AF_DEBUG << *taskexec;
AF_DEBUG << *i_taskexec;
#endif

m_tasks.push_back( taskexec);
m_tasks.push_back(i_taskexec);

m_capacity_used += taskexec->getCapResult();
m_capacity_used += i_taskexec->getCapResult();

// Just check capacity:
// Just check capacity.
// It was checked before, when we run canRunOn function
if ((getCapacity() >= 0) && (m_capacity_used > getCapacity()))
AF_ERR << "Capacity_used > max capacity (" << m_capacity_used << " > " << getCapacity() << ")";

//farmTaskAcuire( taskexec->getServiceType());
m_parent->taskAcuire(i_taskexec, i_monitoring);
}

void RenderAf::removeTask( const af::TaskExec * i_exec)
void RenderAf::removeTask(const af::TaskExec * i_taskexec, MonitorContainer * i_monitoring)
{
// Do not set free status here, even if this task was last.
// May it will take another task in this run cycle
Expand All @@ -707,25 +708,25 @@ void RenderAf::removeTask( const af::TaskExec * i_exec)
// As this af::TaskExec will be deleted by TaskRun very soon.

// Remove exec pointer:
for( std::list<af::TaskExec*>::iterator it = m_tasks.begin(); it != m_tasks.end(); it++)
for (std::list<af::TaskExec*>::iterator it = m_tasks.begin(); it != m_tasks.end(); it++)
{
if( *it == i_exec)
if (*it == i_taskexec)
{
it = m_tasks.erase( it);
it = m_tasks.erase(it);
}
}

// Remove exec pointer from events:
m_re.remTaskExec( i_exec);
m_re.remTaskExec(i_taskexec);

if( m_capacity_used < i_exec->getCapResult())
if (m_capacity_used < i_taskexec->getCapResult())
{
AF_ERR << "Capacity_used < getCapResult() (" << m_capacity_used << " < " << i_exec->getCapResult() << ")";
AF_ERR << "Capacity_used < getCapResult() (" << m_capacity_used << " < " << i_taskexec->getCapResult() << ")";
m_capacity_used = 0;
}
else m_capacity_used -= i_exec->getCapResult();
else m_capacity_used -= i_taskexec->getCapResult();

//farmTaskRelease( taskexec->getServiceType());
m_parent->taskRelease(i_taskexec, i_monitoring);
}

void RenderAf::v_refresh( time_t currentTime, AfContainer * pointer, MonitorContainer * monitoring)
Expand Down
4 changes: 2 additions & 2 deletions afanasy/src/server/renderaf.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ class RenderAf: public af::Render, public AfNodeFarm

/// Add the task exec to this render and take over its ownership (meaning
/// one should not free taskexec after having provided it to this method).
void addTask( af::TaskExec * taskexec);
void addTask(af::TaskExec * i_taskexec, MonitorContainer * i_monitoring);
/// Remove the task exec from this render and give back its ownership to the
/// caller.
void removeTask( const af::TaskExec * taskexec);
void removeTask(const af::TaskExec * i_taskexec, MonitorContainer * i_monitoring);

/// Stop tasks.
void ejectTasks( JobContainer * jobs, MonitorContainer * monitoring, uint32_t upstatus, const std::string * i_keeptasks_username = NULL);
Expand Down
6 changes: 4 additions & 2 deletions afanasy/src/watch/itemfarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ void ItemFarm::drawTickets(QPainter * i_painter, int i_x, int i_y, int i_w, int
tkp_w += tk_rect.width();
}

i_painter->drawText(i_x+5+tkp_w, i_y, i_w-10, 15, Qt::AlignLeft | Qt::AlignTop, QString("x%1").arg(tkp_it.value().count), &tk_rect);
i_painter->drawText(i_x+5+tkp_w, i_y, i_w-10, 15, Qt::AlignLeft | Qt::AlignTop,
QString("x%1 / %2").arg(tkp_it.value().count).arg(tkp_it.value().usage), &tk_rect);
tkp_w += tk_rect.width() + 1;

tkp_w += 8;
Expand All @@ -160,7 +161,8 @@ void ItemFarm::drawTickets(QPainter * i_painter, int i_x, int i_y, int i_w, int
tkh_it.next();

QRect tk_rect;
i_painter->drawText(i_x+5, i_y, i_w-10-tkh_w, 15, Qt::AlignRight | Qt::AlignTop, QString("x%1").arg(tkh_it.value().count), &tk_rect);
i_painter->drawText(i_x+5, i_y, i_w-10-tkh_w, 15, Qt::AlignRight | Qt::AlignTop,
QString("x%1 / %2").arg(tkh_it.value().count).arg(tkh_it.value().usage), &tk_rect);
tkh_w += tk_rect.width() + 1;

const QPixmap * icon = Watch::getTicketIcon(tkh_it.key());
Expand Down
8 changes: 4 additions & 4 deletions afanasy/src/watch/itemjob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ void ItemJob::updateInfo(const af::Job * i_job)
m_info_text = "Branch: <b>" + branch + "</b>";
m_info_text += "<br>Username: <b>" + username + "</b>";
m_info_text += "<br>Creation host: <b>" + hostname + "</b>";
m_info_text += "<br>Created at: <b>" + afqt::time2Qstr(time_creation) + "</b>";
m_info_text += "<br>Created: <b>" + afqt::time2Qstr(time_creation) + "</b>";

if (time_started)
m_info_text += "<br>Started at: <b>" + afqt::time2Qstr(time_started) + "</b>";
m_info_text += "<br>Started: <b>" + afqt::time2Qstr(time_started) + "</b>";
if (time_done)
m_info_text += "<br>Was done at: <b>" + afqt::time2Qstr(time_done) + "</b>";
m_info_text += "<br>Done: <b>" + afqt::time2Qstr(time_done) + "</b>";
else if (time_wait)
m_info_text += "<br>Waiting for: <b>" + afqt::time2Qstr(time_wait) + "</b>";
m_info_text += "<br>Waiting: <b>" + afqt::time2Qstr(time_wait) + "</b>";
}

bool ItemJob::calcHeight()
Expand Down
2 changes: 1 addition & 1 deletion afanasy/src/watch/itempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void ItemPool::updateInfo(af::Pool * i_pool)
m_info_text += QString("Renders Total: <b>%1</b><br>").arg(i_pool->getRendersTotal());

m_info_text += "<br>";
m_info_text += QString("Created at: <b>%1</b>").arg(afqt::time2Qstr(i_pool->getTimeCreation()));
m_info_text += QString("Created: <b>%1</b>").arg(afqt::time2Qstr(i_pool->getTimeCreation()));
}

bool ItemPool::calcHeight()
Expand Down
4 changes: 2 additions & 2 deletions afanasy/src/watch/itemrender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ void ItemRender::v_updateValues(af::Node * i_afnode, int i_msgType)

m_online = render->isOnline();
m_info_text_render += "<br>";
m_info_text_render += "<br>Registered at <b>" + afqt::time2Qstr(m_time_registered) + "</b>";
m_info_text_render += "<br>Registered: <b>" + afqt::time2Qstr(m_time_registered) + "</b>";
if (m_online)
{
m_info_text_render += "<br>Launched at <b>" + afqt::time2Qstr(m_time_launched) + "</b>";
m_info_text_render += "<br>Launched: <b>" + afqt::time2Qstr(m_time_launched) + "</b>";
}
else
{
Expand Down
12 changes: 7 additions & 5 deletions afanasy/src/watch/paramspanel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ void ParamSeparator::paintEvent(QPaintEvent * event)
painter.drawLine(0, 0, width(), 0);
}

ParamTicket::ParamTicket(const QString & i_name, int i_count):
ParamTicket::ParamTicket(const QString & i_name, int i_count, int i_usage):
m_name(i_name)
{
QHBoxLayout * layout = new QHBoxLayout(this);
Expand Down Expand Up @@ -402,17 +402,19 @@ ParamTicket::ParamTicket(const QString & i_name, int i_count):
layout->addWidget(btn);
connect(btn, SIGNAL(clicked()), this, SLOT(slot_Edit()));

update(i_count);
update(i_count, i_usage);
}

ParamTicket::~ParamTicket()
{
}

void ParamTicket::update(int i_count)
void ParamTicket::update(int i_count, int i_usage)
{
m_count = i_count;
m_count_label->setText(QString(" x %1").arg(m_count));
if (i_usage == -1)
m_count_label->setText(QString(" x %1").arg(i_count));
else
m_count_label->setText(QString(" x %1 / %2").arg(i_count).arg(i_usage));
}

void ParamTicket::slot_Edit()
Expand Down
6 changes: 3 additions & 3 deletions afanasy/src/watch/paramspanel.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ class ParamTicket: public QWidget
{
Q_OBJECT
public:
ParamTicket(const QString & i_name, int i_count);
ParamTicket(const QString & i_name, int i_count, int i_usage = -1);
~ParamTicket();

void update(int i_count);
void update(int i_count, int i_usage = -1);

signals:
void sig_Edit(QString i_name);
Expand All @@ -146,6 +146,6 @@ protected slots:

protected:
QString m_name;

QLabel * m_count_label;
int m_count;
};
4 changes: 2 additions & 2 deletions afanasy/src/watch/paramspanelfarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ void ParamsPanelFarm::updateTickets(
}
else
{
wIt.value()->update(tIt.value().count);
wIt.value()->update(tIt.value().count, tIt.value().usage);
wIt++;
}
}
Expand All @@ -310,7 +310,7 @@ void ParamsPanelFarm::updateTickets(
QMap<QString, ParamTicket*>::const_iterator wIt = i_widgets.find(tIt.key());
if (wIt == i_widgets.end())
{
ParamTicket * tw = new ParamTicket(tIt.key(), tIt.value().count);
ParamTicket * tw = new ParamTicket(tIt.key(), tIt.value().count, tIt.value().usage);
i_widgets[tIt.key()] = tw;
i_layout->addWidget(tw);
if (i_host)
Expand Down

0 comments on commit 40ad2e1

Please sign in to comment.