Skip to content

Commit

Permalink
Simplify/Unify TagObserver/Predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
frankosterfeld committed Jun 27, 2023
1 parent 1466313 commit 91ee68c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 63 deletions.
41 changes: 18 additions & 23 deletions include/data_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ namespace fair::graph {

enum class blocking_mode { NonBlocking, Blocking };

enum class trigger_observer_state {
Start, ///< Start a new dataset
Stop, ///< Finish dataset
StopAndStart, ///< Finish pending dataset, start a new one
Ignore ///< Ignore tag
enum class trigger_test_result {
Matching, ///< Start a new dataset
NotMatching, ///< Finish dataset
Ignore ///< Ignore tag
};

// Until clang-format can handle concepts
Expand All @@ -33,13 +32,17 @@ concept DataSetCallback = std::invocable<T, DataSet<V>>;
template<typename T, typename V>
concept StreamCallback = std::invocable<T, std::span<const V>> || std::invocable<T, std::span<const V>, std::span<const tag_t>>;

/**
* For the 'Triggered' and 'Snapshot' acquisition modes.
* Stateless predicate to check whether a tag matches the trigger criteria.
*/
template<typename T>
concept TriggerPredicate = requires(const T p, tag_t tag) {
{ p(tag) } -> std::convertible_to<bool>;
{ p(tag) } -> std::convertible_to<trigger_test_result>;
};

/**
* For the 'Multiplexed' acquisition mode: Stateful object checking all incoming tags to control which data should be sent
* For the 'Multiplexed' acquisition mode: Possibly stateful object checking all incoming tags to control which data should be sent
* to the listener.
*
* A new dataset is started when the observer returns @c Start or @c StopAndStart.
Expand All @@ -55,10 +58,10 @@ concept TriggerPredicate = requires(const T p, tag_t tag) {
* struct color_observer {
* trigger_observer_state operator()(const tag_t &tag) {
* if (tag == green || tag == yellow) {
* return trigger_observer_state::StopAndStart;
* return trigger_observer_state::Matching;
* }
* if (tag == red) {
* return trigger_observer_state::Stop;
* return trigger_observer_state::NotMatching;
* }
*
* return trigger_observer_state::Ignore;
Expand All @@ -70,7 +73,7 @@ concept TriggerPredicate = requires(const T p, tag_t tag) {
*/
template<typename T>
concept TriggerObserver = requires(T o, tag_t tag) {
{ o(tag) } -> std::convertible_to<trigger_observer_state>;
{ o(tag) } -> std::convertible_to<trigger_test_result>;
};

// clang-format on
Expand Down Expand Up @@ -708,7 +711,7 @@ class data_sink : public node<data_sink<T>> {

void
process(std::span<const T> history, std::span<const T> in_data, std::optional<property_map> tag_data0) override {
if (tag_data0 && trigger_predicate(tag_t{ 0, *tag_data0 })) {
if (tag_data0 && trigger_predicate(tag_t{ 0, *tag_data0 }) == trigger_test_result::Matching) {
// TODO fill dataset with metadata etc.
DataSet<T> dataset;
dataset.signal_values.reserve(pre_samples + post_samples); // TODO maybe make the circ. buffer smaller but preallocate these
Expand Down Expand Up @@ -793,24 +796,16 @@ class data_sink : public node<data_sink<T>> {
process(std::span<const T>, std::span<const T> in_data, std::optional<property_map> tag_data0) override {
if (tag_data0) {
const auto obsr = observer(tag_t{ 0, *tag_data0 });
// TODO set proper error state instead of throwing
if (obsr == trigger_observer_state::Stop || obsr == trigger_observer_state::StopAndStart) {
if (obsr == trigger_observer_state::Stop && !pending_dataset) {
throw std::runtime_error("multiplexed: Stop without start");
}

if (obsr == trigger_test_result::NotMatching || obsr == trigger_test_result::Matching) {
if (pending_dataset) {
if (obsr == trigger_observer_state::Stop) {
if (obsr == trigger_test_result::NotMatching) {
pending_dataset->timing_events[0].push_back({ static_cast<tag_t::signed_index_type>(pending_dataset->signal_values.size()), *tag_data0 });
}
publish_dataset(std::move(*pending_dataset));
pending_dataset.reset();
}
}
if (obsr == trigger_observer_state::Start || obsr == trigger_observer_state::StopAndStart) {
if (obsr == trigger_observer_state::Start && pending_dataset) {
throw std::runtime_error("multiplexed: Two starts without stop");
}
if (obsr == trigger_test_result::Matching) {
pending_dataset = DataSet<T>();
pending_dataset->signal_values.reserve(maximum_window_size); // TODO might be too much?
pending_dataset->timing_events = { { { 0, *tag_data0 } } };
Expand Down Expand Up @@ -895,7 +890,7 @@ class data_sink : public node<data_sink<T>> {

void
process(std::span<const T>, std::span<const T> in_data, std::optional<property_map> tag_data0) override {
if (tag_data0 && trigger_predicate({ 0, *tag_data0 })) {
if (tag_data0 && trigger_predicate({ 0, *tag_data0 }) == trigger_test_result::Matching) {
auto new_pending = pending_snapshot{ *tag_data0, sample_delay, sample_delay };
// make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in apply_sample_rate, see there)
auto rit = std::find_if(pending.rbegin(), pending.rend(), [delay = sample_delay](const auto &other) { return other.pending_samples < delay; });
Expand Down
74 changes: 34 additions & 40 deletions test/qa_data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,38 +110,36 @@ struct Observer {
return !same(x, other);
}

trigger_observer_state
trigger_test_result
operator()(const tag_t &tag) {
const auto ty = tag.get("YEAR");
const auto tm = tag.get("MONTH");
const auto td = tag.get("DAY");
if (!ty || !tm || !td) {
return trigger_observer_state::Ignore;
return trigger_test_result::Ignore;
}

const auto tup = std::make_tuple(std::get<int>(ty->get()), std::get<int>(tm->get()), std::get<int>(td->get()));
const auto &[y, m, d] = tup;
const auto ly = last_seen ? std::optional<int>(std::get<0>(*last_seen)) : std::nullopt;
const auto lm = last_seen ? std::optional<int>(std::get<1>(*last_seen)) : std::nullopt;
const auto ld = last_seen ? std::optional<int>(std::get<2>(*last_seen)) : std::nullopt;

const auto year_restart = year && *year == -1 && changed(y, ly);
const auto year_matches = !year || *year == -1 || same(y, year);
const auto month_restart = month && *month == -1 && changed(m, lm);
const auto month_matches = !month || *month == -1 || same(m, month);
const auto day_restart = day && *day == -1 && changed(d, ld);
const auto day_matches = !day || *day == -1 || same(d, day);
const auto matches = year_matches && month_matches && day_matches;
const auto restart = year_restart || month_restart || day_restart;

trigger_observer_state r = trigger_observer_state::Ignore;

if (last_matched && !matches) {
r = trigger_observer_state::Stop;
} else if (!last_matched && matches) {
r = trigger_observer_state::Start;
} else if ((!last_seen || last_matched) && matches && restart) {
r = trigger_observer_state::StopAndStart;
const auto tup = std::make_tuple(std::get<int>(ty->get()), std::get<int>(tm->get()), std::get<int>(td->get()));
const auto &[y, m, d] = tup;
const auto ly = last_seen ? std::optional<int>(std::get<0>(*last_seen)) : std::nullopt;
const auto lm = last_seen ? std::optional<int>(std::get<1>(*last_seen)) : std::nullopt;
const auto ld = last_seen ? std::optional<int>(std::get<2>(*last_seen)) : std::nullopt;

const auto year_restart = year && *year == -1 && changed(y, ly);
const auto year_matches = !year || *year == -1 || same(y, year);
const auto month_restart = month && *month == -1 && changed(m, lm);
const auto month_matches = !month || *month == -1 || same(m, month);
const auto day_restart = day && *day == -1 && changed(d, ld);
const auto day_matches = !day || *day == -1 || same(d, day);
const auto matches = year_matches && month_matches && day_matches;
const auto restart = year_restart || month_restart || day_restart;

trigger_test_result r = trigger_test_result::Ignore;

if (!matches) {
r = trigger_test_result::NotMatching;
} else if (!last_matched || restart) {
r = trigger_test_result::Matching;
}

last_seen = tup;
Expand Down Expand Up @@ -170,24 +168,20 @@ make_test_tags(tag_t::signed_index_type first_index, tag_t::signed_index_type in
}

static std::string
to_ascii_art(std::span<trigger_observer_state> states) {
to_ascii_art(std::span<trigger_test_result> states) {
bool started = false;
std::string r;
for (auto s : states) {
switch (s) {
case trigger_observer_state::Start:
r += started ? "E" : "|#";
case trigger_test_result::Matching:
r += started ? "||#" : "|#";
started = true;
break;
case trigger_observer_state::Stop:
r += started ? "|_" : "E";
case trigger_test_result::NotMatching:
r += started ? "|_" : "_";
started = false;
break;
case trigger_observer_state::StopAndStart:
r += started ? "||#" : "|#";
started = true;
break;
case trigger_observer_state::Ignore: r += started ? "#" : "_"; break;
case trigger_test_result::Ignore: r += started ? "#" : "_"; break;
}
};
return r;
Expand All @@ -196,7 +190,7 @@ to_ascii_art(std::span<trigger_observer_state> states) {
template<TriggerObserver O>
std::string
run_observer_test(std::span<const tag_t> tags, O o) {
std::vector<trigger_observer_state> r;
std::vector<trigger_test_result> r;
r.reserve(tags.size());
for (const auto &tag : tags) {
r.push_back(o(tag));
Expand Down Expand Up @@ -391,7 +385,7 @@ const boost::ut::suite DataSinkTests = [] {

auto is_trigger = [](const tag_t &tag) {
const auto v = tag.get("TYPE");
return v && std::get<std::string>(v->get()) == "TRIGGER";
return v && std::get<std::string>(v->get()) == "TRIGGER" ? trigger_test_result::Matching : trigger_test_result::Ignore;
};

auto poller = data_sink_registry::instance().get_trigger_poller<int32_t>(data_sink_query::sink_name("test_sink"), is_trigger, 3, 2, blocking_mode::Blocking);
Expand Down Expand Up @@ -442,7 +436,7 @@ const boost::ut::suite DataSinkTests = [] {

auto is_trigger = [](const tag_t &tag) {
const auto v = tag.get("TYPE");
return v && std::get<std::string>(v->get()) == "TRIGGER";
return (v && std::get<std::string>(v->get()) == "TRIGGER") ? trigger_test_result::Matching : trigger_test_result::Ignore;
};

const auto delay = std::chrono::milliseconds{ 500 }; // sample rate 10000 -> 5000 samples
Expand Down Expand Up @@ -561,7 +555,7 @@ const boost::ut::suite DataSinkTests = [] {

expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

auto is_trigger = [](const tag_t &tag) { return true; };
auto is_trigger = [](const tag_t &tag) { return trigger_test_result::Matching; };

auto poller = data_sink_registry::instance().get_trigger_poller<float>(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, blocking_mode::Blocking);
expect(neq(poller, nullptr));
Expand Down Expand Up @@ -616,7 +610,7 @@ const boost::ut::suite DataSinkTests = [] {

expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

auto is_trigger = [](const tag_t &) { return true; };
auto is_trigger = [](const tag_t &) { return trigger_test_result::Matching; };

std::mutex m;
std::vector<float> received_data;
Expand Down

0 comments on commit 91ee68c

Please sign in to comment.