Skip to content

Commit

Permalink
Create less intermediate data from events (#2249)
Browse files Browse the repository at this point in the history
This PR optimizes event handling by removing the intermediate structure
created by this chain
of transformations:
1. `event_lanes := vector<vector<event>>` (one lane per local cell, each
lane sorted by time)
2. `staged_events_per_mech_id :=
vector<vector<vector<deliverable_event>>>` (one vector per mech id and
time step, sorted by time)
3. `vector<event_stream>` one stream per mech id

The following optimisations were performed:
- cut out the middle step (2) completely as it is wholly unneeded and
sort directly into event streams
- remove a spurious index structure from `cable_cell_group`
- slim down `deliverable_event` and `deliverable_event_data`
- `event_stream` now uses a partition instead of a vector of ranges for
splitting its data into `dt` buckets. (Save 8B per `dt` ;))

The result is that the quite pathological example `calcium_stdp.py` — which
generates immense amounts of spikes using a single cell group and a single
epoch — drops from 3.8GB to 1.9GB heap usage at peak,
with the same runtime or slightly less (<5% difference).

## TODO
- [x] Port to GPU.
 - [X] Tests pass
 - [x] Examples run through
- [x] Fix tests... Interestingly _locally_ all tests pass on my dev
machine regardless of optimisation, vectorisation, and assertion
settings.

---------

Co-authored-by: boeschf <[email protected]>
  • Loading branch information
thorstenhater and boeschf authored Sep 17, 2024
1 parent e6b78db commit 047550d
Show file tree
Hide file tree
Showing 22 changed files with 301 additions and 327 deletions.
8 changes: 6 additions & 2 deletions arbor/backends/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ struct has_event_index<deliverable_event> : public std::true_type {};

// Subset of event information required for mechanism delivery.
struct deliverable_event_data {
cell_local_size_type mech_id; // same as target_handle::mech_id
cell_local_size_type mech_index; // same as target_handle::mech_index
float weight;
ARB_SERDES_ENABLE(deliverable_event_data, mech_id, mech_index, weight);
deliverable_event_data(cell_local_size_type idx, float w):
mech_index(idx),
weight(w) {}
ARB_SERDES_ENABLE(deliverable_event_data,
mech_index,
weight);
};

// Stream index accessor function for multi_event_stream:
Expand Down
88 changes: 66 additions & 22 deletions arbor/backends/event_stream_base.hpp
Original file line number Diff line number Diff line change
@@ -1,63 +1,107 @@
#pragma once

#include <type_traits>
#include <vector>

#include <arbor/generic_event.hpp>
#include <arbor/mechanism_abi.h>


#include "backends/event.hpp"
#include "backends/event_stream_state.hpp"
#include "event_lane.hpp"
#include "timestep_range.hpp"
#include "util/partition.hpp"

ARB_SERDES_ENABLE_EXT(arb_deliverable_event_data, mech_index, weight);

namespace arb {

template <typename Event, typename Span>
class event_stream_base {
public: // member types
template <typename Event>
struct event_stream_base {
using size_type = std::size_t;
using event_type = Event;
using event_time_type = ::arb::event_time_type<Event>;
using event_data_type = ::arb::event_data_type<Event>;

protected: // private member types
using span_type = Span;

static_assert(std::is_same<decltype(std::declval<span_type>().begin()), event_data_type*>::value);
static_assert(std::is_same<decltype(std::declval<span_type>().end()), event_data_type*>::value);

protected: // members
std::vector<event_data_type> ev_data_;
std::vector<span_type> ev_spans_;
std::vector<std::size_t> ev_spans_ = {0};
size_type index_ = 0;
event_data_type* base_ptr_ = nullptr;

public:
event_stream_base() = default;

// returns true if the currently marked time step has no events
bool empty() const {
return ev_spans_.empty() || ev_data_.empty() || !index_ || index_ > ev_spans_.size() ||
!ev_spans_[index_-1].size();
return ev_data_.empty() // No events
|| index_ < 1 // Since we index with a left bias, index_ must be at least 1
|| index_ >= ev_spans_.size() // Cannot index at container length
|| ev_spans_[index_-1] >= ev_spans_[index_]; // Current span is empty
}

void mark() {
index_ += (index_ <= ev_spans_.size() ? 1 : 0);
}
void mark() { index_ += 1; }

auto marked_events() {
using std::begin;
using std::end;
if (empty()) {
return make_event_stream_state((event_data_type*)nullptr, (event_data_type*)nullptr);
} else {
return make_event_stream_state(begin(ev_spans_[index_-1]), end(ev_spans_[index_-1]));
auto beg = (event_data_type*)nullptr;
auto end = (event_data_type*)nullptr;
if (!empty()) {
beg = base_ptr_ + ev_spans_[index_-1];
end = base_ptr_ + ev_spans_[index_];
}
return make_event_stream_state(beg, end);
}

// clear all previous data
void clear() {
ev_data_.clear();
// Clear + push doesn't allocate a new vector
ev_spans_.clear();
ev_spans_.push_back(0);
base_ptr_ = nullptr;
index_ = 0;
}

// Construct a mapping of mech_id to a stream s.t. streams are partitioned into
// time step buckets by `ev_span`
template<typename EventStream>
static std::enable_if_t<std::is_base_of_v<event_stream_base, EventStream>>
multi_event_stream(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
auto n_steps = steps.size();

std::unordered_map<unsigned, std::vector<std::size_t>> dt_sizes;
for (auto& [k, v]: streams) {
v.clear();
dt_sizes[k].resize(n_steps, 0);
}

auto cell = 0;
for (auto& lane: lanes) {
auto div = divs[cell];
arb_size_type step = 0;
for (auto evt: lane) {
auto time = evt.time;
auto weight = evt.weight;
auto target = evt.target;
while(step < n_steps && time >= steps[step].t_end()) ++step;
// Events coinciding with epoch's upper boundary belong to next epoch
if (step >= n_steps) break;
auto& handle = handles[div + target];
streams[handle.mech_id].ev_data_.push_back({handle.mech_index, weight});
dt_sizes[handle.mech_id][step]++;
}
++cell;
}

for (auto& [id, stream]: streams) {
util::make_partition(stream.ev_spans_, dt_sizes[id]);
stream.init();
}
}
};

} // namespace arb
142 changes: 63 additions & 79 deletions arbor/backends/gpu/event_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,33 @@

// Indexed collection of pop-only event queues --- CUDA back-end implementation.

#include <arbor/mechanism_abi.h>

#include "backends/event_stream_base.hpp"
#include "memory/memory.hpp"
#include "util/partition.hpp"
#include "util/range.hpp"
#include "util/rangeutil.hpp"
#include "util/transform.hpp"
#include "threading/threading.hpp"
#include <arbor/mechanism_abi.h>

ARB_SERDES_ENABLE_EXT(arb_deliverable_event_data, mech_index, weight);
#include "timestep_range.hpp"
#include "memory/memory.hpp"

namespace arb {
namespace gpu {

template <typename Event>
class event_stream :
public event_stream_base<Event,
typename memory::device_vector<::arb::event_data_type<Event>>::view_type> {
struct event_stream: public event_stream_base<Event> {
public:
using base = event_stream_base<Event, typename memory::device_vector<::arb::event_data_type<Event>>::view_type>;
using base = event_stream_base<Event>;
using size_type = typename base::size_type;
using event_data_type = typename base::event_data_type;
using device_array = memory::device_vector<event_data_type>;

private: // members
task_system_handle thread_pool_;
device_array device_ev_data_;
std::vector<size_type> offsets_;
using base::clear;
using base::ev_data_;
using base::ev_spans_;
using base::base_ptr_;

public:
event_stream() = default;
event_stream(task_system_handle t): base(), thread_pool_{t} {}

void clear() {
base::clear();
offsets_.clear();
}

// Initialize event streams from a vector of vector of events
// Outer vector represents time step bins
void init(const std::vector<std::vector<Event>>& staged) {
Expand All @@ -54,94 +43,89 @@ class event_stream :
if (!num_events) return;

// allocate space for spans and data
base::ev_spans_.resize(staged.size());
base::ev_data_.resize(num_events);
offsets_.resize(staged.size()+1);
ev_spans_.resize(staged.size() + 1);
ev_data_.resize(num_events);
resize(device_ev_data_, num_events);

// compute offsets by exclusive scan over staged events
util::make_partition(offsets_,
util::transform_view(staged, [&](const auto& v) { return v.size(); }),
(size_type)0u);
util::make_partition(ev_spans_,
util::transform_view(staged, [](const auto& v) { return v.size(); }),
0ull);

// assign, copy to device (and potentially sort) the event data in parallel
arb_assert(thread_pool_);
threading::parallel_for::apply(0, staged.size(), thread_pool_.get(),
[this,&staged](size_type i) {
const auto offset = offsets_[i];
const auto size = staged[i].size();
// add device range
base::ev_spans_[i] = device_ev_data_(offset, offset + size);
// host span
auto host_span = memory::make_view(base::ev_data_)(offset, offset + size);
arb_assert(ev_spans_.size() == staged.size() + 1);
threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this, &staged](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);
const auto len = end - beg;

auto host_span = memory::make_view(ev_data_)(beg, end);

// make event data and copy
std::copy_n(util::transform_view(staged[i],
[](const auto& x) { return event_data(x); }).begin(),
size,
len,
host_span.begin());
// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, base::ev_spans_[i]);
auto device_span = memory::make_view(device_ev_data_)(beg, end);
memory::copy_async(host_span, device_span);
});

arb_assert(num_events == base::ev_data_.size());
}
base_ptr_ = device_ev_data_.data();

friend void serialize(serializer& ser, const std::string& k, const event_stream<Event>& t) {
ser.begin_write_map(::arb::to_serdes_key(k));
ARB_SERDES_WRITE(ev_data_);
ser.begin_write_map("ev_spans_");
auto base_ptr = t.device_ev_data_.data();
for (size_t ix = 0; ix < t.ev_spans_.size(); ++ix) {
ser.begin_write_map(std::to_string(ix));
const auto& span = t.ev_spans_[ix];
ser.write("offset", static_cast<unsigned long long>(span.begin() - base_ptr));
ser.write("size", static_cast<unsigned long long>(span.size()));
ser.end_write_map();
}
ser.end_write_map();
ARB_SERDES_WRITE(index_);
ARB_SERDES_WRITE(device_ev_data_);
ARB_SERDES_WRITE(offsets_);
ser.end_write_map();
arb_assert(num_events == device_ev_data_.size());
arb_assert(num_events == ev_data_.size());
}

friend void deserialize(serializer& ser, const std::string& k, event_stream<Event>& t) {
ser.begin_read_map(::arb::to_serdes_key(k));
ARB_SERDES_READ(ev_data_);
ser.begin_read_map("ev_spans_");
for (size_t ix = 0; ser.next_key(); ++ix) {
ser.begin_read_map(std::to_string(ix));
unsigned long long offset = 0, size = 0;
ser.read("offset", offset);
ser.read("size", size);
typename base::span_type span{t.ev_data_.data() + offset, size};
if (ix < t.ev_spans_.size()) {
t.ev_spans_[ix] = span;
} else {
t.ev_spans_.emplace_back(span);
}
ser.end_read_map();
}
ser.end_read_map();
ARB_SERDES_READ(index_);
ARB_SERDES_READ(device_ev_data_);
ARB_SERDES_READ(offsets_);
ser.end_read_map();
// Initialize event stream assuming ev_data_ and ev_span_ has
// been set previously (e.g. by `base::multi_event_stream`)
void init() {
resize(device_ev_data_, ev_data_.size());
base_ptr_ = device_ev_data_.data();

threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);

auto host_span = memory::make_view(ev_data_)(beg, end);
auto device_span = memory::make_view(device_ev_data_)(beg, end);

// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, device_span);
});
}

private:
template<typename D>
static void resize(D& d, std::size_t size) {
// resize if necessary
if (d.size() < size) {
d = D(size);
}
}

ARB_SERDES_ENABLE(event_stream<Event>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

task_system_handle thread_pool_;
device_array device_ev_data_;
};

} // namespace gpu
Expand Down
13 changes: 10 additions & 3 deletions arbor/backends/gpu/shared_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
#include "backends/event_stream_state.hpp"
#include "backends/gpu/chunk_writer.hpp"
#include "memory/copy.hpp"
#include "memory/gpu_wrappers.hpp"
#include "memory/wrappers.hpp"
#include "util/index_into.hpp"
#include "util/rangeutil.hpp"
#include "util/maputil.hpp"
#include "util/meta.hpp"
#include "util/range.hpp"
#include "util/strprintf.hpp"

Expand Down Expand Up @@ -241,7 +239,8 @@ void shared_state::instantiate(mechanism& m,
m.ppack_.n_detectors = n_detector;

if (storage.count(id)) throw arb::arbor_internal_error("Duplicate mech id in shared state");
auto& store = storage.emplace(id, mech_storage{thread_pool}).first->second;
auto& store = storage.emplace(id, mech_storage{}).first->second;
streams[id] = deliverable_event_stream{thread_pool};

// Allocate view pointers
store.state_vars_ = std::vector<arb_value_type*>(m.mech_.n_state_vars);
Expand Down Expand Up @@ -389,6 +388,14 @@ void shared_state::take_samples() {
}
}

void shared_state::init_events(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<size_t>& divs,
const timestep_range& dts) {
arb::gpu::event_stream<deliverable_event>::multi_event_stream(lanes, handles, divs, dts, streams);
}


// Debug interface
ARB_ARBOR_API std::ostream& operator<<(std::ostream& o, shared_state& s) {
using io::csv;
Expand Down
Loading

0 comments on commit 047550d

Please sign in to comment.