Skip to content

Commit

Permalink
Merge pull request #38 from SpiNNakerManchester/sync_on_stop
Browse files Browse the repository at this point in the history
Step 1 towards final.SpiNNTools.5
  • Loading branch information
Luis A. Plana authored Apr 28, 2021
2 parents c41d29b + 1f5909f commit 2114937
Show file tree
Hide file tree
Showing 30 changed files with 1,421 additions and 1,062 deletions.
10 changes: 4 additions & 6 deletions c_code/comms_i.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void i_processQueue (uint unused0, uint unused1)

uint pkt_type = key & SPINN_TYPE_MASK;

// check if data packet,
// process data packet,
if (pkt_type == SPINN_DATA_KEY)
{
// check packet phase and process accordingly
Expand All @@ -96,22 +96,20 @@ void i_processQueue (uint unused0, uint unused1)
}
}

// check if stop packet,
// or process stop packet,
else if (pkt_type == SPINN_STOP_KEY)
{
// stop packet received
i_stop_packet (key);
}

// check if network stop packet,
// or process network stop packet,
else if (pkt_type == SPINN_STPN_KEY)
{
// network stop packet received
i_net_stop_packet (key);
}

#ifdef DEBUG
// report unknown packet type,
// or report unknown packet type,
else
{
stage_done (SPINN_UNXPD_PKT, key);
Expand Down
109 changes: 20 additions & 89 deletions c_code/comms_s.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// enqueue received packet
// (FORWARD, BACKPROP, ldsa, ldst, stop and net_stop types)
// (FORWARD, BACKPROP, lds, stop and net_stop types)
// ------------------------------------------------------------------------
void s_receivePacket (uint key, uint payload)
{
Expand Down Expand Up @@ -77,7 +77,7 @@ void s_processQueue (uint unused0, uint unused1)

uint pkt_type = key & SPINN_TYPE_MASK;

// check if data packet,
// process data packet,
if (pkt_type == SPINN_DATA_KEY)
{
// check packet phase and process accordingly
Expand All @@ -95,36 +95,26 @@ void s_processQueue (uint unused0, uint unused1)
}
}

// check for an LDS "accumulation" packet,
// or process LDS packet,
else if (pkt_type == SPINN_LDSA_KEY)
{
// process LDS "accumulation" packet
s_ldsa_packet (payload);
s_lds_packet (payload);
}

// check for LDS "total" packet,
else if (pkt_type == SPINN_LDST_KEY)
{
// process LDS "total" packet
s_ldst_packet (payload);
}

// check if stop packet,
// or process stop packet,
else if (pkt_type == SPINN_STOP_KEY)
{
// stop packet received
s_stop_packet (key);
}

// check if network stop packet,
// or process network stop packet,
else if (pkt_type == SPINN_STPN_KEY)
{
// network stop packet received
s_net_stop_packet (key);
}

#ifdef DEBUG
// report unknown packet type,
// or report unknown packet type,
else
{
stage_done (SPINN_UNXPD_PKT, key);
Expand Down Expand Up @@ -233,34 +223,34 @@ void s_net_stop_packet (uint key)


// ------------------------------------------------------------------------
// process LDSA packet: accumulate the received partial link delta sums
// process LDS packet: accumulate the received partial link delta sums
// ------------------------------------------------------------------------
void s_ldsa_packet (uint payload)
void s_lds_packet (uint payload)
{
#ifdef DEBUG
lda_recv++;
lds_recv++;
#endif

// add the received value to the total so far,
s_lds_part += (lds_t) payload;

// increment the count of partial link delta sums arrived,
s_ldsa_arrived++;
s_lds_arrived++;

// check whether all the partial sums have arrived
if (s_ldsa_arrived == scfg.ldsa_expected)
if (s_lds_arrived == scfg.lds_expected)
{
// send the result to the first s core
// to give a total across the whole network
if (scfg.is_first_group == 0)
{
while (!spin1_send_mc_packet (ldstKey, s_lds_part, WITH_PAYLOAD));
// broadcast (first subgroup) or relay (all others) lds value
while (!spin1_send_mc_packet (ldsKey, s_lds_part, WITH_PAYLOAD));

#ifdef DEBUG
pkt_sent++;
ldt_sent++;
pkt_sent++;
lds_sent++;
#endif
}

// prepare for next epoch
s_lds_part = 0;
s_lds_arrived = 0;

// access thread semaphore with interrupts disabled
uint cpsr = spin1_int_disable ();
Expand Down Expand Up @@ -293,62 +283,3 @@ void s_ldsa_packet (uint payload)
}
}
// ------------------------------------------------------------------------


// ------------------------------------------------------------------------
// process LDST packet: accumulate the received link delta sum totals
// ------------------------------------------------------------------------
void s_ldst_packet (uint payload)
{
#ifdef DEBUG
ldt_recv++;
#endif

// add the received value to the total so far,
s_lds_part += (lds_t) payload;

// increment the count of link delta sums arrived,
s_ldst_arrived++;

// check whether all the partial sums have arrived
if (s_ldst_arrived == scfg.ldst_expected)
{
// send the final value of s_lds_part back to the w cores
while (!spin1_send_mc_packet (ldsrKey, s_lds_part, WITH_PAYLOAD));

#ifdef DEBUG
pkt_sent++;
ldr_sent++;
#endif

// access thread semaphore with interrupts disabled
uint cpsr = spin1_int_disable ();

#if defined(DEBUG) && defined(DEBUG_THRDS)
if (!(sb_thrds_pend & SPINN_THRD_LDST))
wrng_cth++;
#endif

// check if all other threads done
if (sb_thrds_pend == SPINN_THRD_LDST)
{
// if done initialise semaphore
sb_thrds_pend = SPINN_SB_THRDS;

// restore interrupts after semaphore access,
spin1_mode_restore (cpsr);

// and advance tick
sb_advance_tick ();
}
else
{
// if not done report processing thread done,
sb_thrds_pend &= ~SPINN_THRD_LDST;

// and restore interrupts after semaphore access
spin1_mode_restore (cpsr);
}
}
}
// ------------------------------------------------------------------------
3 changes: 1 addition & 2 deletions c_code/comms_s.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ void s_processQueue (uint unused0, uint unused1);
void s_stop_packet (uint key);
void s_net_stop_packet (uint key);

void s_ldsa_packet (uint payload);
void s_ldst_packet (uint payload);
void s_lds_packet (uint payload);

#endif
75 changes: 45 additions & 30 deletions c_code/comms_t.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,19 @@ void t_processFWDQueue (uint unused0, uint unused1)
tf_process (key, payload);
}

// process criterion packet,
// or process criterion packet,
else if (pkt_type == SPINN_CRIT_KEY)
{
t_criterion_packet (key);
}

// process tick stop packet,
// or process tick stop packet,
else if (pkt_type == SPINN_STOP_KEY)
{
t_stop_packet (key);
}

// process network stop packet,
// or process network stop packet,
else if (pkt_type == SPINN_STPN_KEY)
{
t_net_stop_packet (key);
Expand Down Expand Up @@ -170,38 +170,48 @@ void t_criterion_packet (uint key)
#endif

// partial criterion value arrived,
tf_crit_prev = key & SPINN_STPD_MASK;
tf_crit_prev = tf_crit_prev && (key & SPINN_STPD_MASK);

// access flag with interrupts disabled,
uint cpsr = spin1_int_disable ();
// update scoreboard,
tf_crit_arrived++;

// and check if updated criterion value can be forwarded
if (tf_crit_rdy)
// and check if all criterion packets arrived
if (tf_crit_arrived == tcfg.crit_expected)
{
// initialise flag,
tf_crit_rdy = tf_init_crit;
// initialise scoreboard for next tick,
tf_crit_arrived = 0;

// restore interrupts after flag access,
spin1_mode_restore (cpsr);
// access flag with interrupts disabled,
uint cpsr = spin1_int_disable ();

// send stop packet,
tf_send_stop ();

// and advance tick if last_output_group
//NOTE: last output group does not get a tick stop packet
// so it's ready to advance tick
if (tcfg.is_last_output_group)
// and check if updated criterion value can be forwarded
if (tf_crit_rdy)
{
tf_advance_tick ();
// initialise flag,
tf_crit_rdy = 0;

// restore interrupts after flag access,
spin1_mode_restore (cpsr);

// send stop packet,
tf_send_stop ();

// and advance tick if last_output_group
//NOTE: last output group does not get a tick stop packet
// so it's ready to advance tick
if (tcfg.is_last_output)
{
tf_advance_tick ();
}
}
}
else
{
// flag ready to forward criterion,
tf_crit_rdy = 1;
else
{
// flag ready to forward criterion,
tf_crit_rdy = 1;

// and restore interrupts after flag access
spin1_mode_restore (cpsr);
// and restore interrupts after flag access
spin1_mode_restore (cpsr);
}
}
}
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -311,7 +321,7 @@ void t_backprop_packet (uint key, uint payload)
// store received error,
t_errors[tb_comms][inx] = (error_t) payload;

// and update scoreboard,
// update scoreboard,
tb_arrived++;

// if all expected errors have arrived may move to next tick
Expand Down Expand Up @@ -362,7 +372,12 @@ void tf_send_stop (void)
// "aggregate" criteria,
tf_stop_crit = tf_stop_crit && tf_crit_prev;

if (tcfg.is_last_output_group)
// initialise previous value,
//TODO: should this be done in critical section?
tf_crit_prev = TRUE;

// make stop decision,
if (tcfg.is_last_output)
{
tf_group_crit = tf_stop_crit;

Expand All @@ -386,7 +401,7 @@ void tf_send_stop (void)

#ifdef DEBUG
pkt_sent++;
if (tcfg.is_last_output_group)
if (tcfg.is_last_output)
{
stp_sent++;
}
Expand Down
Loading

0 comments on commit 2114937

Please sign in to comment.