Skip to content

Commit

Permalink
event handling: fix a bug and clean house (#2421)
Browse files Browse the repository at this point in the history
- fix event stream when more than one cell in a cell group has the same
synapse
  - events would previously no longer necessarily be sorted by time
- in order to simplify: also sort with respect to mechanism index (as
was previously only required for the gpu backend)
  - add pertinent test
- while cleaning up: overhauled the event related files and data
structures
  - removed dead code
- made event handling less generic (this feature was not used anywhere)
  • Loading branch information
boeschf authored Oct 22, 2024
1 parent 9de9174 commit 5eab345
Show file tree
Hide file tree
Showing 25 changed files with 388 additions and 707 deletions.
9 changes: 0 additions & 9 deletions arbor/backends/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <arbor/fvm_types.hpp>
#include <arbor/serdes.hpp>
#include <arbor/mechanism_abi.h>
#include <arbor/generic_event.hpp>

// Structures for the representation of event delivery targets and
// staged events.
Expand Down Expand Up @@ -46,9 +45,6 @@ struct deliverable_event {
ARB_SERDES_ENABLE(deliverable_event, time, weight, handle);
};

template<>
struct has_event_index<deliverable_event> : public std::true_type {};

// Subset of event information required for mechanism delivery.
struct deliverable_event_data {
cell_local_size_type mech_index; // same as target_handle::mech_index
Expand All @@ -61,11 +57,6 @@ struct deliverable_event_data {
weight);
};

// Stream index accessor function for multi_event_stream:
// returns the per-mechanism instance index the staged event is destined for,
// used to bucket events into their mechanism's stream.
inline cell_local_size_type event_index(const arb_deliverable_event_data& ed) {
return ed.mech_index;
}

// Delivery data accessor function for multi_event_stream:
inline arb_deliverable_event_data event_data(const deliverable_event& ev) {
return {ev.handle.mech_index, ev.weight};
Expand Down
99 changes: 79 additions & 20 deletions arbor/backends/event_stream_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

#include <vector>

#include <arbor/generic_event.hpp>
#include <arbor/mechanism_abi.h>


#include "backends/event.hpp"
#include "backends/event_stream_state.hpp"
#include "event_lane.hpp"
Expand All @@ -18,15 +16,13 @@ namespace arb {

template <typename Event>
struct event_stream_base {
using size_type = std::size_t;
using event_type = Event;
using event_time_type = ::arb::event_time_type<Event>;
using event_data_type = ::arb::event_data_type<Event>;
using event_data_type = decltype(event_data(std::declval<Event>()));

protected: // members
std::vector<event_data_type> ev_data_;
std::vector<std::size_t> ev_spans_ = {0};
size_type index_ = 0;
std::size_t index_ = 0;
event_data_type* base_ptr_ = nullptr;

public:
Expand Down Expand Up @@ -62,24 +58,32 @@ struct event_stream_base {
index_ = 0;
}

// Construct a mapping of mech_id to a stream s.t. streams are partitioned into
// time step buckets by `ev_span`
protected:
// backend specific initializations
virtual void init() = 0;
};

struct spike_event_stream_base : event_stream_base<deliverable_event> {
template<typename EventStream>
static std::enable_if_t<std::is_base_of_v<event_stream_base, EventStream>>
multi_event_stream(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
friend void initialize(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
arb_assert(lanes.size() < divs.size());

// reset streams and allocate sufficient space for temporaries
auto n_steps = steps.size();
std::unordered_map<unsigned, std::vector<std::size_t>> dt_sizes;
for (auto& [k, v]: streams) {
v.clear();
dt_sizes[k].resize(n_steps, 0);
v.spike_counter_.clear();
v.spike_counter_.resize(steps.size(), 0);
v.spikes_.clear();
// ev_data_ has been cleared during v.clear(), so we use its capacity
v.spikes_.reserve(v.ev_data_.capacity());
}

// loop over lanes: group events by mechanism and sort them by time
auto cell = 0;
for (const auto& lane: lanes) {
auto div = divs[cell];
Expand All @@ -94,16 +98,71 @@ struct event_stream_base {
if (step >= n_steps) break;
arb_assert(div + target < handles.size());
const auto& handle = handles[div + target];
streams[handle.mech_id].ev_data_.push_back({handle.mech_index, weight});
dt_sizes[handle.mech_id][step]++;
auto& stream = streams[handle.mech_id];
stream.spikes_.push_back(spike_data{step, handle.mech_index, time, weight});
// insertion sort with last element as pivot
// ordering: first w.r.t. step, within a step: mech_index, within a mech_index: time
auto first = stream.spikes_.begin();
auto last = stream.spikes_.end();
auto pivot = std::prev(last, 1);
std::rotate(std::upper_bound(first, pivot, *pivot), pivot, last);
// increment count in current time interval
stream.spike_counter_[step]++;
}
}

for (auto& [id, stream]: streams) {
util::make_partition(stream.ev_spans_, dt_sizes[id]);
stream.init();
// copy temporary deliverable_events into stream's ev_data_
stream.ev_data_.reserve(stream.spikes_.size());
std::transform(stream.spikes_.begin(), stream.spikes_.end(), std::back_inserter(stream.ev_data_),
[](auto const& e) noexcept -> arb_deliverable_event_data {
return {e.mech_index, e.weight}; });
// scan over spike_counter_ and write the resulting offsets to ev_spans_
util::make_partition(stream.ev_spans_, stream.spike_counter_);
// delegate to derived class init: static cast necessary to access protected init()
static_cast<spike_event_stream_base&>(stream).init();
}
}

protected: // members
// Temporary per-spike record used while staging events into the stream.
// NOTE: member order is significant — the defaulted three-way comparison
// compares lexicographically in declaration order (step, mech_index, time,
// weight), which is exactly the ordering the insertion sort in
// initialize() relies on: by time step, then mechanism index, then time.
struct spike_data {
arb_size_type step = 0;
cell_local_size_type mech_index = 0;
time_type time = 0;
float weight = 0;
auto operator<=>(spike_data const&) const noexcept = default;
};
std::vector<spike_data> spikes_;
std::vector<std::size_t> spike_counter_;
};

// Event stream specialization for sample events. Staged events arrive
// pre-binned by timestep; initialization flattens them into ev_data_ and
// records one span boundary per bin in ev_spans_.
struct sample_event_stream_base : event_stream_base<sample_event> {
    // Populate `stream` from `staged` sample events.
    //   staged: outer vector = timestep bins, inner vectors = events per bin
    //   stream: the stream to (re)initialize; previous contents are discarded
    friend void initialize(const std::vector<std::vector<sample_event>>& staged,
                           sample_event_stream_base& stream) {
        // Discard any data left over from a previous initialization.
        stream.clear();

        // Nothing to do when there are no timestep bins at all.
        if (staged.empty()) return;

        // Count staged events across all bins; bail out early when none.
        std::size_t total = 0;
        for (const auto& bin: staged) total += bin.size();
        if (total == 0) return;

        // Pre-allocate: one boundary per bin (plus the leading zero already
        // present after clear()) and one data slot per event.
        stream.ev_spans_.reserve(staged.size() + 1);
        stream.ev_data_.reserve(total);

        // Flatten bin by bin, recording each bin's running end offset as a
        // span boundary.
        for (const auto& bin: staged) {
            for (const auto& ev: bin) stream.ev_data_.push_back(ev.raw);
            stream.ev_spans_.push_back(stream.ev_data_.size());
        }

        arb_assert(total == stream.ev_data_.size());
        arb_assert(staged.size() + 1 == stream.ev_spans_.size());

        // Delegate to the backend-specific init() for finalization.
        stream.init();
    }
};

} // namespace arb
126 changes: 20 additions & 106 deletions arbor/backends/gpu/event_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,113 +2,33 @@

// Indexed collection of pop-only event queues --- CUDA back-end implementation.

#include <arbor/mechanism_abi.h>

#include "backends/event_stream_base.hpp"
#include "util/transform.hpp"
#include "threading/threading.hpp"
#include "timestep_range.hpp"
#include "memory/memory.hpp"

namespace arb {
namespace gpu {

template <typename Event>
struct event_stream: public event_stream_base<Event> {
public:
using base = event_stream_base<Event>;
using size_type = typename base::size_type;
using event_data_type = typename base::event_data_type;
using device_array = memory::device_vector<event_data_type>;

using base::clear;
using base::ev_data_;
using base::ev_spans_;
using base::base_ptr_;

event_stream() = default;
event_stream(task_system_handle t): base(), thread_pool_{t} {}

// Initialize event streams from a vector of vector of events
// Outer vector represents time step bins
void init(const std::vector<std::vector<Event>>& staged) {
// clear previous data
clear();

// return if there are no timestep bins
if (!staged.size()) return;

// return if there are no events
const size_type num_events = util::sum_by(staged, [] (const auto& v) {return v.size();});
if (!num_events) return;

// allocate space for spans and data
ev_spans_.resize(staged.size() + 1);
ev_data_.resize(num_events);
resize(device_ev_data_, num_events);

// compute offsets by exclusive scan over staged events
util::make_partition(ev_spans_,
util::transform_view(staged, [](const auto& v) { return v.size(); }),
0ull);

// assign, copy to device (and potentially sort) the event data in parallel
arb_assert(thread_pool_);
arb_assert(ev_spans_.size() == staged.size() + 1);
threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this, &staged](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);
const auto len = end - beg;

auto host_span = memory::make_view(ev_data_)(beg, end);

// make event data and copy
std::copy_n(util::transform_view(staged[i],
[](const auto& x) { return event_data(x); }).begin(),
len,
host_span.begin());
// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
auto device_span = memory::make_view(device_ev_data_)(beg, end);
memory::copy_async(host_span, device_span);
});

base_ptr_ = device_ev_data_.data();
template<typename BaseEventStream>
struct event_stream : BaseEventStream {
public:
ARB_SERDES_ENABLE(event_stream<BaseEventStream>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

arb_assert(num_events == device_ev_data_.size());
arb_assert(num_events == ev_data_.size());
protected:
void init() override final {
resize(this->device_ev_data_, this->device_ev_data_.size());
memory::copy_async(this->ev_data_, this->device_ev_data_);
this->base_ptr_ = this->device_ev_data_.data();
}

// Initialize event stream assuming ev_data_ and ev_span_ has
// been set previously (e.g. by `base::multi_event_stream`)
void init() {
resize(device_ev_data_, ev_data_.size());
base_ptr_ = device_ev_data_.data();

threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);

auto host_span = memory::make_view(ev_data_)(beg, end);
auto device_span = memory::make_view(device_ev_data_)(beg, end);
private: // device memory
using event_data_type = typename BaseEventStream::event_data_type;
using device_array = memory::device_vector<event_data_type>;

// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, device_span);
});
}
device_array device_ev_data_;

template<typename D>
static void resize(D& d, std::size_t size) {
Expand All @@ -117,16 +37,10 @@ struct event_stream: public event_stream_base<Event> {
d = D(size);
}
}

ARB_SERDES_ENABLE(event_stream<Event>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

task_system_handle thread_pool_;
device_array device_ev_data_;
};

using spike_event_stream = event_stream<spike_event_stream_base>;
using sample_event_stream = event_stream<sample_event_stream_base>;

} // namespace gpu
} // namespace arb
2 changes: 0 additions & 2 deletions arbor/backends/gpu/fvm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ struct backend {
using threshold_watcher = arb::gpu::threshold_watcher;
using cable_solver = arb::gpu::matrix_state_fine<arb_value_type, arb_index_type>;
using diffusion_solver = arb::gpu::diffusion_state<arb_value_type, arb_index_type>;
using deliverable_event_stream = arb::gpu::deliverable_event_stream;
using sample_event_stream = arb::gpu::sample_event_stream;

using shared_state = arb::gpu::shared_state;
using ion_state = arb::gpu::ion_state;
Expand Down
3 changes: 0 additions & 3 deletions arbor/backends/gpu/gpu_store_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ using array = memory::device_vector<arb_value_type>;
using iarray = memory::device_vector<arb_index_type>;
using sarray = memory::device_vector<arb_size_type>;

using deliverable_event_stream = arb::gpu::event_stream<deliverable_event>;
using sample_event_stream = arb::gpu::event_stream<sample_event>;

} // namespace gpu
} // namespace arb

12 changes: 2 additions & 10 deletions arbor/backends/gpu/shared_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ shared_state::shared_state(task_system_handle tp,
time_since_spike(n_cell*n_detector),
src_to_spike(make_const_view(src_to_spike_)),
cbprng_seed(cbprng_seed_),
sample_events(thread_pool),
sample_events(),
watcher{n_cv_, src_to_spike.data(), detector_info}
{
memory::fill(time_since_spike, -1.0);
Expand Down Expand Up @@ -262,7 +262,7 @@ void shared_state::instantiate(mechanism& m,

if (storage.count(id)) throw arb::arbor_internal_error("Duplicate mech id in shared state");
auto& store = storage.emplace(id, mech_storage{}).first->second;
streams[id] = deliverable_event_stream{thread_pool};
streams[id] = spike_event_stream{};

// Allocate view pointers
store.state_vars_ = std::vector<arb_value_type*>(m.mech_.n_state_vars);
Expand Down Expand Up @@ -410,14 +410,6 @@ void shared_state::take_samples() {
}
}

void shared_state::init_events(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<size_t>& divs,
const timestep_range& dts) {
arb::gpu::event_stream<deliverable_event>::multi_event_stream(lanes, handles, divs, dts, streams);
}


// Debug interface
ARB_ARBOR_API std::ostream& operator<<(std::ostream& o, shared_state& s) {
using io::csv;
Expand Down
Loading

0 comments on commit 5eab345

Please sign in to comment.