Skip to content

Commit

Permalink
Adapt to Ivan's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
frankosterfeld committed Jun 27, 2023
1 parent ed4c7fb commit dfb7b5e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
34 changes: 16 additions & 18 deletions include/data_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ enum class trigger_observer_state {
Ignore ///< Ignore tag
};

// TODO is the scope where want these?
struct null_type {};

// Until clang-format can handle concepts
// clang-format off
template<typename T>
Expand Down Expand Up @@ -76,16 +73,16 @@ template<typename T>
class data_sink;

struct data_sink_query {
std::optional<std::string> sink_name;
std::optional<std::string> signal_name;
std::optional<std::string> _sink_name;
std::optional<std::string> _signal_name;

static data_sink_query
with_signal_name(std::string_view name) {
signal_name(std::string_view name) {
return { {}, std::string{ name } };
}

static data_sink_query
with_sink_name(std::string_view name) {
sink_name(std::string_view name) {
return { std::string{ name }, {} };
}
};
Expand All @@ -96,6 +93,7 @@ class data_sink_registry {

public:
// TODO this shouldn't be a singleton but associated with the flow graph (?)
// TODO reconsider mutex usage when moving to the graph
static data_sink_registry &
instance() {
static data_sink_registry s_instance;
Expand Down Expand Up @@ -213,8 +211,8 @@ class data_sink_registry {
auto matches = [&query](const std::any &v) {
try {
auto sink = std::any_cast<data_sink<T> *>(v);
const auto sink_name_matches = !query.sink_name || *query.sink_name == sink->name();
const auto signal_name_matches = !query.signal_name || *query.signal_name == sink->signal_name;
const auto sink_name_matches = !query._sink_name || *query._sink_name == sink->name();
const auto signal_name_matches = !query._signal_name || *query._signal_name == sink->signal_name;
return sink_name_matches && signal_name_matches;
} catch (...) {
return false;
Expand Down Expand Up @@ -390,7 +388,7 @@ class data_sink : public node<data_sink<T>> {
std::lock_guard lg(_listener_mutex);
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<poller>();
add_listener(std::make_unique<continuous_listener<null_type>>(handler, block), block);
add_listener(std::make_unique<continuous_listener<fair::meta::null_type>>(handler, block), block);
return handler;
}

Expand All @@ -400,7 +398,7 @@ class data_sink : public node<data_sink<T>> {
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<dataset_poller>();
std::lock_guard lg(_listener_mutex);
add_listener(std::make_unique<trigger_listener<null_type, TriggerPredicate>>(std::forward<TriggerPredicate>(p), handler, pre_samples, post_samples, block), block);
add_listener(std::make_unique<trigger_listener<fair::meta::null_type, TriggerPredicate>>(std::forward<TriggerPredicate>(p), handler, pre_samples, post_samples, block), block);
ensure_history_size(pre_samples);
return handler;
}
Expand All @@ -411,7 +409,7 @@ class data_sink : public node<data_sink<T>> {
std::lock_guard lg(_listener_mutex);
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<dataset_poller>();
add_listener(std::make_unique<multiplexed_listener<null_type, F>>(std::move(triggerObserverFactory), maximum_window_size, handler, block), block);
add_listener(std::make_unique<multiplexed_listener<fair::meta::null_type, F>>(std::move(triggerObserverFactory), maximum_window_size, handler, block), block);
return handler;
}

Expand All @@ -421,7 +419,7 @@ class data_sink : public node<data_sink<T>> {
const auto block = block_mode == blocking_mode::Blocking;
auto handler = std::make_shared<dataset_poller>();
std::lock_guard lg(_listener_mutex);
add_listener(std::make_unique<snapshot_listener<null_type, P>>(std::forward<P>(p), delay, handler, block), block);
add_listener(std::make_unique<snapshot_listener<fair::meta::null_type, P>>(std::forward<P>(p), delay, handler, block), block);
return handler;
}

Expand Down Expand Up @@ -527,7 +525,7 @@ class data_sink : public node<data_sink<T>> {

template<typename Callback>
struct continuous_listener : public abstract_listener {
static constexpr auto has_callback = !std::is_same_v<Callback, null_type>;
static constexpr auto has_callback = !std::is_same_v<Callback, fair::meta::null_type>;
static constexpr auto callback_takes_tags = std::is_invocable_v<Callback, std::span<const T>, std::span<const tag_t>>;

bool block = false;
Expand Down Expand Up @@ -675,7 +673,7 @@ class data_sink : public node<data_sink<T>> {
// I leave it as is for now.
inline void
publish_dataset(DataSet<T> &&data) {
if constexpr (!std::is_same_v<Callback, null_type>) {
if constexpr (!std::is_same_v<Callback, fair::meta::null_type>) {
callback(std::move(data));
} else {
auto poller = polling_handler.lock();
Expand Down Expand Up @@ -760,7 +758,7 @@ class data_sink : public node<data_sink<T>> {

inline void
publish_dataset(DataSet<T> &&data) {
if constexpr (!std::is_same_v<Callback, null_type>) {
if constexpr (!std::is_same_v<Callback, fair::meta::null_type>) {
callback(std::move(data));
} else {
auto poller = polling_handler.lock();
Expand Down Expand Up @@ -858,7 +856,7 @@ class data_sink : public node<data_sink<T>> {

inline void
publish_dataset(DataSet<T> &&data) {
if constexpr (!std::is_same_v<Callback, null_type>) {
if constexpr (!std::is_same_v<Callback, fair::meta::null_type>) {
callback(std::move(data));
} else {
auto poller = polling_handler.lock();
Expand Down Expand Up @@ -892,7 +890,7 @@ class data_sink : public node<data_sink<T>> {
process(std::span<const T>, std::span<const T> in_data, std::optional<property_map> tag_data0) override {
if (tag_data0 && trigger_predicate({ 0, *tag_data0 })) {
auto new_pending = pending_snapshot{ *tag_data0, sample_delay, sample_delay };
// make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in set_sample_rate, see there)
// make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in apply_sample_rate, see there)
auto rit = std::find_if(pending.rbegin(), pending.rend(), [delay = sample_delay](const auto &other) { return other.pending_samples < delay; });
pending.insert(rit.base(), std::move(new_pending));
}
Expand Down
2 changes: 2 additions & 0 deletions include/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace fair::meta {

using namespace fair::literals;

struct null_type {};

template<typename... Ts>
struct print_types;

Expand Down
22 changes: 11 additions & 11 deletions test/qa_data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ const boost::ut::suite DataSinkTests = [] {
}
};

expect(data_sink_registry::instance().register_streaming_callback<float>(data_sink_query::with_sink_name("test_sink"), chunk_size, callback));
expect(data_sink_registry::instance().register_streaming_callback<float>(data_sink_query::with_sink_name("test_sink"), chunk_size, callback_with_tags));
expect(data_sink_registry::instance().register_streaming_callback<float>(data_sink_query::sink_name("test_sink"), chunk_size, callback));
expect(data_sink_registry::instance().register_streaming_callback<float>(data_sink_query::sink_name("test_sink"), chunk_size, callback_with_tags));

fair::graph::scheduler::simple sched{ std::move(flow_graph) };
sched.work();
Expand Down Expand Up @@ -317,10 +317,10 @@ const boost::ut::suite DataSinkTests = [] {

std::atomic<std::size_t> samples_seen = 0;

auto poller_data_only = data_sink_registry::instance().get_streaming_poller<float>(data_sink_query::with_sink_name("test_sink"), blocking_mode::Blocking);
auto poller_data_only = data_sink_registry::instance().get_streaming_poller<float>(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking);
expect(neq(poller_data_only, nullptr));

auto poller_with_tags = data_sink_registry::instance().get_streaming_poller<float>(data_sink_query::with_sink_name("test_sink"), blocking_mode::Blocking);
auto poller_with_tags = data_sink_registry::instance().get_streaming_poller<float>(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking);
expect(neq(poller_with_tags, nullptr));

auto runner1 = std::async([poller = poller_data_only] {
Expand Down Expand Up @@ -394,7 +394,7 @@ const boost::ut::suite DataSinkTests = [] {
return v && std::get<std::string>(v->get()) == "TRIGGER";
};

auto poller = data_sink_registry::instance().get_trigger_poller<int32_t>(data_sink_query::with_sink_name("test_sink"), is_trigger, 3, 2, blocking_mode::Blocking);
auto poller = data_sink_registry::instance().get_trigger_poller<int32_t>(data_sink_query::sink_name("test_sink"), is_trigger, 3, 2, blocking_mode::Blocking);
expect(neq(poller, nullptr));

auto polling = std::async([poller] {
Expand Down Expand Up @@ -446,7 +446,7 @@ const boost::ut::suite DataSinkTests = [] {
};

const auto delay = std::chrono::milliseconds{ 500 }; // sample rate 10000 -> 5000 samples
auto poller = data_sink_registry::instance().get_snapshot_poller<int32_t>(data_sink_query::with_sink_name("test_sink"), is_trigger, delay, blocking_mode::Blocking);
auto poller = data_sink_registry::instance().get_snapshot_poller<int32_t>(data_sink_query::sink_name("test_sink"), is_trigger, delay, blocking_mode::Blocking);
expect(neq(poller, nullptr));

auto poller_result = std::async([poller] {
Expand Down Expand Up @@ -512,7 +512,7 @@ const boost::ut::suite DataSinkTests = [] {
std::vector<std::shared_ptr<data_sink<int32_t>::dataset_poller>> pollers;

for (const auto &f : factories) {
auto poller = data_sink_registry::instance().get_multiplexed_poller<int32_t>(data_sink_query::with_sink_name("test_sink"), f, 100000, blocking_mode::Blocking);
auto poller = data_sink_registry::instance().get_multiplexed_poller<int32_t>(data_sink_query::sink_name("test_sink"), f, 100000, blocking_mode::Blocking);
expect(neq(poller, nullptr));
pollers.push_back(poller);
}
Expand Down Expand Up @@ -564,7 +564,7 @@ const boost::ut::suite DataSinkTests = [] {

auto is_trigger = [](const tag_t &tag) { return true; };

auto poller = data_sink_registry::instance().get_trigger_poller<float>(data_sink_query::with_sink_name("test_sink"), is_trigger, 3000, 2000, blocking_mode::Blocking);
auto poller = data_sink_registry::instance().get_trigger_poller<float>(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, blocking_mode::Blocking);
expect(neq(poller, nullptr));

auto polling = std::async([poller] {
Expand Down Expand Up @@ -629,7 +629,7 @@ const boost::ut::suite DataSinkTests = [] {
received_data.push_back(dataset.signal_values.back());
};

data_sink_registry::instance().register_trigger_callback<float>(data_sink_query::with_sink_name("test_sink"), is_trigger, 3000, 2000, callback);
data_sink_registry::instance().register_trigger_callback<float>(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, callback);

fair::graph::scheduler::simple sched{ std::move(flow_graph) };
sched.work();
Expand All @@ -650,10 +650,10 @@ const boost::ut::suite DataSinkTests = [] {

expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

auto invalid_type_poller = data_sink_registry::instance().get_streaming_poller<double>(data_sink_query::with_sink_name("test_sink"));
auto invalid_type_poller = data_sink_registry::instance().get_streaming_poller<double>(data_sink_query::sink_name("test_sink"));
expect(eq(invalid_type_poller, nullptr));

auto poller = data_sink_registry::instance().get_streaming_poller<float>(data_sink_query::with_sink_name("test_sink"));
auto poller = data_sink_registry::instance().get_streaming_poller<float>(data_sink_query::sink_name("test_sink"));
expect(neq(poller, nullptr));

auto polling = std::async([poller] {
Expand Down

0 comments on commit dfb7b5e

Please sign in to comment.