Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create less intermediate data from events #2249

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ef6590c
Remove unused gid -> idx mapping.
thorstenhater Jan 12, 2024
6af003d
Re-factoring.
thorstenhater Jan 13, 2024
31ead67
Push event lane to staged events to lowered cell and fix tests.
thorstenhater Jan 15, 2024
2fdc92d
Convert event span to partition.
thorstenhater Jan 16, 2024
9074e7a
Multicore works. Almost.
thorstenhater Jan 16, 2024
0fa64ca
Fix all tests for deflated event handling.
thorstenhater Jan 17, 2024
280c768
Remove spurious return.
thorstenhater Jan 17, 2024
ecb1da0
void!
thorstenhater Jan 17, 2024
c685dbb
Off-by-one in empty.
thorstenhater Feb 14, 2024
44b0f22
Formatting
thorstenhater Feb 15, 2024
7ce840a
Renable evt_index
thorstenhater Feb 15, 2024
1e03b0b
Intermediate step, prep shared state and stream on GPU
thorstenhater Feb 15, 2024
0dde4c0
Renable streams in SERDES
thorstenhater Feb 15, 2024
baae0c8
Massage GPU stream
thorstenhater Feb 15, 2024
865ff8d
Clean-up SERDES
thorstenhater Feb 15, 2024
dcb6672
Fix includes
thorstenhater Feb 15, 2024
f132f43
Size matters...
thorstenhater Feb 15, 2024
0a1bdff
Fix copying
thorstenhater Feb 15, 2024
7836b56
Shuffle SERDES for arb_deliverable_event
thorstenhater Feb 15, 2024
f861588
Renable device_ev_data in serde.
thorstenhater Feb 16, 2024
1815771
Make a multi event stream for GPU.
thorstenhater Feb 16, 2024
ce4020b
Massaging various files
thorstenhater Feb 16, 2024
e680a65
More asserts and clean-up.
thorstenhater Feb 16, 2024
efd6487
Typo.
thorstenhater Feb 16, 2024
49b4cf8
Typo ctd
thorstenhater Feb 16, 2024
14dd6a7
Introduce base_ptr.
thorstenhater Feb 16, 2024
30f6f78
Add a semicolon.
thorstenhater Feb 16, 2024
eb2e58c
Shuffleboard
thorstenhater Feb 16, 2024
a93f71b
Shuffle more
thorstenhater Feb 16, 2024
da2a8f4
Merge remote-tracking branch 'origin/master' into perf/create-less-ev…
thorstenhater Feb 16, 2024
88842cf
Merge and commit.
thorstenhater Feb 16, 2024
9a7ebb0
Merge remote-tracking branch 'origin/master' into perf/create-less-ev…
thorstenhater Aug 15, 2024
b77013d
Merge remote-tracking branch 'origin/master' into perf/create-less-ev…
thorstenhater Aug 26, 2024
15550d8
factorized streams
boeschf Sep 10, 2024
9043cd1
Merge pull request #3 from boeschf/create-less-event-data-1
thorstenhater Sep 11, 2024
4b95af3
@boeschf's suggestions.
thorstenhater Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions arbor/backends/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ struct has_event_index<deliverable_event> : public std::true_type {};

// Subset of event information required for mechanism delivery.
struct deliverable_event_data {
cell_local_size_type mech_id; // same as target_handle::mech_id
cell_local_size_type mech_index; // same as target_handle::mech_index
float weight;
ARB_SERDES_ENABLE(deliverable_event_data, mech_id, mech_index, weight);
deliverable_event_data(cell_local_size_type idx, float w):
mech_index(idx),
weight(w) {}
ARB_SERDES_ENABLE(deliverable_event_data,
mech_index,
weight);
};

// Stream index accessor function for multi_event_stream:
Expand Down
88 changes: 66 additions & 22 deletions arbor/backends/event_stream_base.hpp
Original file line number Diff line number Diff line change
@@ -1,63 +1,107 @@
#pragma once

#include <type_traits>
#include <vector>

#include <arbor/generic_event.hpp>
#include <arbor/mechanism_abi.h>


#include "backends/event.hpp"
#include "backends/event_stream_state.hpp"
#include "event_lane.hpp"
#include "timestep_range.hpp"
#include "util/partition.hpp"

ARB_SERDES_ENABLE_EXT(arb_deliverable_event_data, mech_index, weight);

namespace arb {

template <typename Event, typename Span>
class event_stream_base {
public: // member types
template <typename Event>
struct event_stream_base {
using size_type = std::size_t;
using event_type = Event;
using event_time_type = ::arb::event_time_type<Event>;
using event_data_type = ::arb::event_data_type<Event>;

protected: // private member types
using span_type = Span;

static_assert(std::is_same<decltype(std::declval<span_type>().begin()), event_data_type*>::value);
static_assert(std::is_same<decltype(std::declval<span_type>().end()), event_data_type*>::value);

protected: // members
std::vector<event_data_type> ev_data_;
std::vector<span_type> ev_spans_;
std::vector<std::size_t> ev_spans_ = {0};
size_type index_ = 0;
event_data_type* base_ptr_ = nullptr;

public:
event_stream_base() = default;

// returns true if the currently marked time step has no events
bool empty() const {
return ev_spans_.empty() || ev_data_.empty() || !index_ || index_ > ev_spans_.size() ||
!ev_spans_[index_-1].size();
return ev_data_.empty() // No events
|| index_ < 1 // Since we index with a left bias, index_ must be at least 1
|| index_ >= ev_spans_.size() // Cannot index at container length
|| ev_spans_[index_-1] >= ev_spans_[index_]; // Current span is empty
}

void mark() {
index_ += (index_ <= ev_spans_.size() ? 1 : 0);
}
void mark() { index_ += 1; }

auto marked_events() {
using std::begin;
using std::end;
if (empty()) {
return make_event_stream_state((event_data_type*)nullptr, (event_data_type*)nullptr);
} else {
return make_event_stream_state(begin(ev_spans_[index_-1]), end(ev_spans_[index_-1]));
auto beg = (event_data_type*)nullptr;
auto end = (event_data_type*)nullptr;
if (!empty()) {
beg = base_ptr_ + ev_spans_[index_-1];
end = base_ptr_ + ev_spans_[index_];
}
return make_event_stream_state(beg, end);
}

// Drop all stored events and return the stream to its initial state:
// no event data, a single zero offset in ev_spans_, no base pointer and
// no marked time step.
void clear() {
    index_ = 0;
    base_ptr_ = nullptr;
    ev_data_.clear();
    // Clearing and re-adding the leading offset reuses the vector in place
    // instead of assigning a new one.
    ev_spans_.clear();
    ev_spans_.emplace_back(0);
}

// Distribute the events in `lanes` onto the per-mechanism streams in `streams`,
// s.t. each stream is partitioned into time step buckets by `ev_spans_`.
//
// Every stream in `streams` is cleared first. For each event, the target handle
// is read from `handles[divs[cell] + target]`, where `cell` is the position of
// the event's lane within `lanes`; the pair (mech_index, weight) is appended to
// the stream keyed by the handle's mech_id. Finally each stream's `ev_spans_`
// is built from the per-step event counts and `init()` is called on the stream.
//
// Once an event in a lane lies at or beyond the end of the last time step, the
// remainder of that lane is skipped. The step cursor only moves forward within
// a lane, so lanes are presumably sorted by time -- TODO confirm against callers.
//
// Throws std::out_of_range if an event targets a mechanism id that has no
// entry in `streams`.
template<typename EventStream>
static std::enable_if_t<std::is_base_of_v<event_stream_base, EventStream>>
multi_event_stream(const event_lane_subrange& lanes,
                   const std::vector<target_handle>& handles,
                   const std::vector<std::size_t>& divs,
                   const timestep_range& steps,
                   std::unordered_map<unsigned, EventStream>& streams) {
    const auto n_steps = steps.size();

    // Per mechanism id: number of events falling into each time step.
    std::unordered_map<unsigned, std::vector<std::size_t>> dt_sizes;
    for (auto& [k, v]: streams) {
        v.clear();
        dt_sizes[k].resize(n_steps, 0);
    }

    std::size_t cell = 0;
    for (const auto& lane: lanes) {
        const auto div = divs[cell];
        arb_size_type step = 0;
        for (const auto& evt: lane) {
            // Advance to the step containing the event. Events coinciding with
            // a step's upper boundary belong to the next step.
            while (step < n_steps && evt.time >= steps[step].t_end()) ++step;
            // Beyond the final step: nothing further in this lane is delivered.
            if (step >= n_steps) break;
            const auto& handle = handles[div + evt.target];
            // Use at() rather than operator[]: an unknown mech_id must not
            // silently create an empty counter/stream and then be written to.
            auto& counts = dt_sizes.at(handle.mech_id);
            auto& stream = streams.at(handle.mech_id);
            stream.ev_data_.push_back({handle.mech_index, evt.weight});
            ++counts[step];
        }
        ++cell;
    }

    for (auto& [id, stream]: streams) {
        util::make_partition(stream.ev_spans_, dt_sizes.at(id));
        stream.init();
    }
}
};

} // namespace arb
142 changes: 63 additions & 79 deletions arbor/backends/gpu/event_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,33 @@

// Indexed collection of pop-only event queues --- CUDA back-end implementation.

#include <arbor/mechanism_abi.h>

#include "backends/event_stream_base.hpp"
#include "memory/memory.hpp"
#include "util/partition.hpp"
#include "util/range.hpp"
#include "util/rangeutil.hpp"
#include "util/transform.hpp"
#include "threading/threading.hpp"
#include <arbor/mechanism_abi.h>

ARB_SERDES_ENABLE_EXT(arb_deliverable_event_data, mech_index, weight);
#include "timestep_range.hpp"
#include "memory/memory.hpp"

namespace arb {
namespace gpu {

template <typename Event>
class event_stream :
public event_stream_base<Event,
typename memory::device_vector<::arb::event_data_type<Event>>::view_type> {
struct event_stream: public event_stream_base<Event> {
public:
using base = event_stream_base<Event, typename memory::device_vector<::arb::event_data_type<Event>>::view_type>;
using base = event_stream_base<Event>;
using size_type = typename base::size_type;
using event_data_type = typename base::event_data_type;
using device_array = memory::device_vector<event_data_type>;

private: // members
task_system_handle thread_pool_;
device_array device_ev_data_;
std::vector<size_type> offsets_;
using base::clear;
using base::ev_data_;
using base::ev_spans_;
using base::base_ptr_;

public:
event_stream() = default;
event_stream(task_system_handle t): base(), thread_pool_{t} {}

void clear() {
base::clear();
offsets_.clear();
}

// Initialize event streams from a vector of vector of events
// Outer vector represents time step bins
void init(const std::vector<std::vector<Event>>& staged) {
Expand All @@ -54,94 +43,89 @@ class event_stream :
if (!num_events) return;

// allocate space for spans and data
base::ev_spans_.resize(staged.size());
base::ev_data_.resize(num_events);
offsets_.resize(staged.size()+1);
ev_spans_.resize(staged.size() + 1);
ev_data_.resize(num_events);
resize(device_ev_data_, num_events);

// compute offsets by exclusive scan over staged events
util::make_partition(offsets_,
util::transform_view(staged, [&](const auto& v) { return v.size(); }),
(size_type)0u);
util::make_partition(ev_spans_,
util::transform_view(staged, [](const auto& v) { return v.size(); }),
0ull);

// assign, copy to device (and potentially sort) the event data in parallel
arb_assert(thread_pool_);
threading::parallel_for::apply(0, staged.size(), thread_pool_.get(),
[this,&staged](size_type i) {
const auto offset = offsets_[i];
const auto size = staged[i].size();
// add device range
base::ev_spans_[i] = device_ev_data_(offset, offset + size);
// host span
auto host_span = memory::make_view(base::ev_data_)(offset, offset + size);
arb_assert(ev_spans_.size() == staged.size() + 1);
threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this, &staged](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);
const auto len = end - beg;

auto host_span = memory::make_view(ev_data_)(beg, end);

// make event data and copy
std::copy_n(util::transform_view(staged[i],
[](const auto& x) { return event_data(x); }).begin(),
size,
len,
host_span.begin());
// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, base::ev_spans_[i]);
auto device_span = memory::make_view(device_ev_data_)(beg, end);
memory::copy_async(host_span, device_span);
});

arb_assert(num_events == base::ev_data_.size());
}
base_ptr_ = device_ev_data_.data();

friend void serialize(serializer& ser, const std::string& k, const event_stream<Event>& t) {
ser.begin_write_map(::arb::to_serdes_key(k));
ARB_SERDES_WRITE(ev_data_);
ser.begin_write_map("ev_spans_");
auto base_ptr = t.device_ev_data_.data();
for (size_t ix = 0; ix < t.ev_spans_.size(); ++ix) {
ser.begin_write_map(std::to_string(ix));
const auto& span = t.ev_spans_[ix];
ser.write("offset", static_cast<unsigned long long>(span.begin() - base_ptr));
ser.write("size", static_cast<unsigned long long>(span.size()));
ser.end_write_map();
}
ser.end_write_map();
ARB_SERDES_WRITE(index_);
ARB_SERDES_WRITE(device_ev_data_);
ARB_SERDES_WRITE(offsets_);
ser.end_write_map();
arb_assert(num_events == device_ev_data_.size());
arb_assert(num_events == ev_data_.size());
}

friend void deserialize(serializer& ser, const std::string& k, event_stream<Event>& t) {
ser.begin_read_map(::arb::to_serdes_key(k));
ARB_SERDES_READ(ev_data_);
ser.begin_read_map("ev_spans_");
for (size_t ix = 0; ser.next_key(); ++ix) {
ser.begin_read_map(std::to_string(ix));
unsigned long long offset = 0, size = 0;
ser.read("offset", offset);
ser.read("size", size);
typename base::span_type span{t.ev_data_.data() + offset, size};
if (ix < t.ev_spans_.size()) {
t.ev_spans_[ix] = span;
} else {
t.ev_spans_.emplace_back(span);
}
ser.end_read_map();
}
ser.end_read_map();
ARB_SERDES_READ(index_);
ARB_SERDES_READ(device_ev_data_);
ARB_SERDES_READ(offsets_);
ser.end_read_map();
// Initialize event stream assuming ev_data_ and ev_span_ has
// been set previously (e.g. by `base::multi_event_stream`)
void init() {
resize(device_ev_data_, ev_data_.size());
base_ptr_ = device_ev_data_.data();

threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);

auto host_span = memory::make_view(ev_data_)(beg, end);
auto device_span = memory::make_view(device_ev_data_)(beg, end);

// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, device_span);
});
}

private:
// Make sure `d` holds at least `size` elements.
// If `d` is already large enough it is left untouched; otherwise it is
// replaced by a newly constructed D(size).
template<typename D>
static void resize(D& d, std::size_t size) {
    const bool large_enough = !(d.size() < size);
    if (large_enough) return;
    d = D(size);
}

ARB_SERDES_ENABLE(event_stream<Event>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

task_system_handle thread_pool_;
device_array device_ev_data_;
};

} // namespace gpu
Expand Down
13 changes: 10 additions & 3 deletions arbor/backends/gpu/shared_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
#include "backends/event_stream_state.hpp"
#include "backends/gpu/chunk_writer.hpp"
#include "memory/copy.hpp"
#include "memory/gpu_wrappers.hpp"
#include "memory/wrappers.hpp"
#include "util/index_into.hpp"
#include "util/rangeutil.hpp"
#include "util/maputil.hpp"
#include "util/meta.hpp"
#include "util/range.hpp"
#include "util/strprintf.hpp"

Expand Down Expand Up @@ -241,7 +239,8 @@ void shared_state::instantiate(mechanism& m,
m.ppack_.n_detectors = n_detector;

if (storage.count(id)) throw arb::arbor_internal_error("Duplicate mech id in shared state");
auto& store = storage.emplace(id, mech_storage{thread_pool}).first->second;
auto& store = storage.emplace(id, mech_storage{}).first->second;
streams[id] = deliverable_event_stream{thread_pool};

// Allocate view pointers
store.state_vars_ = std::vector<arb_value_type*>(m.mech_.n_state_vars);
Expand Down Expand Up @@ -389,6 +388,14 @@ void shared_state::take_samples() {
}
}

void shared_state::init_events(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<size_t>& divs,
const timestep_range& dts) {
arb::gpu::event_stream<deliverable_event>::multi_event_stream(lanes, handles, divs, dts, streams);
}


// Debug interface
ARB_ARBOR_API std::ostream& operator<<(std::ostream& o, shared_state& s) {
using io::csv;
Expand Down
Loading