From 91ee68cc2f0c2d4986502843726fcf49a0cbe07e Mon Sep 17 00:00:00 2001 From: Frank Osterfeld Date: Tue, 27 Jun 2023 12:43:03 +0200 Subject: [PATCH] Simplify/Unify TagObserver/Predicate --- include/data_sink.hpp | 41 +++++++++++------------- test/qa_data_sink.cpp | 74 ++++++++++++++++++++----------------------- 2 files changed, 52 insertions(+), 63 deletions(-) diff --git a/include/data_sink.hpp b/include/data_sink.hpp index 60b6efade..fe206a3da 100644 --- a/include/data_sink.hpp +++ b/include/data_sink.hpp @@ -14,11 +14,10 @@ namespace fair::graph { enum class blocking_mode { NonBlocking, Blocking }; -enum class trigger_observer_state { - Start, ///< Start a new dataset - Stop, ///< Finish dataset - StopAndStart, ///< Finish pending dataset, start a new one - Ignore ///< Ignore tag +enum class trigger_test_result { + Matching, ///< Start a new dataset + NotMatching, ///< Finish dataset + Ignore ///< Ignore tag }; // Until clang-format can handle concepts @@ -33,13 +32,17 @@ concept DataSetCallback = std::invocable>; template concept StreamCallback = std::invocable> || std::invocable, std::span>; +/** + * For the 'Triggered' and 'Snapshot' acquisition modes. + * Stateless predicate to check whether a tag matches the trigger criteria. + */ template concept TriggerPredicate = requires(const T p, tag_t tag) { - { p(tag) } -> std::convertible_to; + { p(tag) } -> std::convertible_to; }; /** - * For the 'Multiplexed' acquisition mode: Stateful object checking all incoming tags to control which data should be sent + * For the 'Multiplexed' acquisition mode: Possibly stateful object checking all incoming tags to control which data should be sent * to the listener. * * A new dataset is started when the observer returns @c Start or @c StopAndStart. @@ -55,10 +58,10 @@ concept TriggerPredicate = requires(const T p, tag_t tag) { * struct color_observer { * trigger_observer_state operator()(const tag_t &tag) { * if (tag == green || tag == yellow) { - * return trigger_observer_state::StopAndStart; + * return trigger_observer_state::Matching; * } * if (tag == red) { - * return trigger_observer_state::Stop; + * return trigger_observer_state::NotMatching; * } * * return trigger_observer_state::Ignore; @@ -70,7 +73,7 @@ concept TriggerPredicate = requires(const T p, tag_t tag) { */ template concept TriggerObserver = requires(T o, tag_t tag) { - { o(tag) } -> std::convertible_to; + { o(tag) } -> std::convertible_to; }; // clang-format on @@ -708,7 +711,7 @@ class data_sink : public node> { void process(std::span history, std::span in_data, std::optional tag_data0) override { - if (tag_data0 && trigger_predicate(tag_t{ 0, *tag_data0 })) { + if (tag_data0 && trigger_predicate(tag_t{ 0, *tag_data0 }) == trigger_test_result::Matching) { // TODO fill dataset with metadata etc. DataSet dataset; dataset.signal_values.reserve(pre_samples + post_samples); // TODO maybe make the circ. buffer smaller but preallocate these @@ -793,24 +796,16 @@ class data_sink : public node> { process(std::span, std::span in_data, std::optional tag_data0) override { if (tag_data0) { const auto obsr = observer(tag_t{ 0, *tag_data0 }); - // TODO set proper error state instead of throwing - if (obsr == trigger_observer_state::Stop || obsr == trigger_observer_state::StopAndStart) { - if (obsr == trigger_observer_state::Stop && !pending_dataset) { - throw std::runtime_error("multiplexed: Stop without start"); - } - + if (obsr == trigger_test_result::NotMatching || obsr == trigger_test_result::Matching) { if (pending_dataset) { - if (obsr == trigger_observer_state::Stop) { + if (obsr == trigger_test_result::NotMatching) { pending_dataset->timing_events[0].push_back({ static_cast(pending_dataset->signal_values.size()), *tag_data0 }); } publish_dataset(std::move(*pending_dataset)); pending_dataset.reset(); } } - if (obsr == trigger_observer_state::Start || obsr == trigger_observer_state::StopAndStart) { - if (obsr == trigger_observer_state::Start && pending_dataset) { - throw std::runtime_error("multiplexed: Two starts without stop"); - } + if (obsr == trigger_test_result::Matching) { pending_dataset = DataSet(); pending_dataset->signal_values.reserve(maximum_window_size); // TODO might be too much? pending_dataset->timing_events = { { { 0, *tag_data0 } } }; @@ -895,7 +890,7 @@ class data_sink : public node> { void process(std::span, std::span in_data, std::optional tag_data0) override { - if (tag_data0 && trigger_predicate({ 0, *tag_data0 })) { + if (tag_data0 && trigger_predicate({ 0, *tag_data0 }) == trigger_test_result::Matching) { auto new_pending = pending_snapshot{ *tag_data0, sample_delay, sample_delay }; // make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in apply_sample_rate, see there) auto rit = std::find_if(pending.rbegin(), pending.rend(), [delay = sample_delay](const auto &other) { return other.pending_samples < delay; }); diff --git a/test/qa_data_sink.cpp b/test/qa_data_sink.cpp index 8753377d8..462a2e0e6 100644 --- a/test/qa_data_sink.cpp +++ b/test/qa_data_sink.cpp @@ -110,38 +110,36 @@ struct Observer { return !same(x, other); } - trigger_observer_state + trigger_test_result operator()(const tag_t &tag) { const auto ty = tag.get("YEAR"); const auto tm = tag.get("MONTH"); const auto td = tag.get("DAY"); if (!ty || !tm || !td) { - return trigger_observer_state::Ignore; + return trigger_test_result::Ignore; } - const auto tup = std::make_tuple(std::get(ty->get()), std::get(tm->get()), std::get(td->get())); - const auto &[y, m, d] = tup; - const auto ly = last_seen ? std::optional(std::get<0>(*last_seen)) : std::nullopt; - const auto lm = last_seen ? std::optional(std::get<1>(*last_seen)) : std::nullopt; - const auto ld = last_seen ? std::optional(std::get<2>(*last_seen)) : std::nullopt; - - const auto year_restart = year && *year == -1 && changed(y, ly); - const auto year_matches = !year || *year == -1 || same(y, year); - const auto month_restart = month && *month == -1 && changed(m, lm); - const auto month_matches = !month || *month == -1 || same(m, month); - const auto day_restart = day && *day == -1 && changed(d, ld); - const auto day_matches = !day || *day == -1 || same(d, day); - const auto matches = year_matches && month_matches && day_matches; - const auto restart = year_restart || month_restart || day_restart; - - trigger_observer_state r = trigger_observer_state::Ignore; - - if (last_matched && !matches) { - r = trigger_observer_state::Stop; - } else if (!last_matched && matches) { - r = trigger_observer_state::Start; - } else if ((!last_seen || last_matched) && matches && restart) { - r = trigger_observer_state::StopAndStart; + const auto tup = std::make_tuple(std::get(ty->get()), std::get(tm->get()), std::get(td->get())); + const auto &[y, m, d] = tup; + const auto ly = last_seen ? std::optional(std::get<0>(*last_seen)) : std::nullopt; + const auto lm = last_seen ? std::optional(std::get<1>(*last_seen)) : std::nullopt; + const auto ld = last_seen ? std::optional(std::get<2>(*last_seen)) : std::nullopt; + + const auto year_restart = year && *year == -1 && changed(y, ly); + const auto year_matches = !year || *year == -1 || same(y, year); + const auto month_restart = month && *month == -1 && changed(m, lm); + const auto month_matches = !month || *month == -1 || same(m, month); + const auto day_restart = day && *day == -1 && changed(d, ld); + const auto day_matches = !day || *day == -1 || same(d, day); + const auto matches = year_matches && month_matches && day_matches; + const auto restart = year_restart || month_restart || day_restart; + + trigger_test_result r = trigger_test_result::Ignore; + + if (!matches) { + r = trigger_test_result::NotMatching; + } else if (!last_matched || restart) { + r = trigger_test_result::Matching; } last_seen = tup; @@ -170,24 +168,20 @@ make_test_tags(tag_t::signed_index_type first_index, tag_t::signed_index_type in } static std::string -to_ascii_art(std::span states) { +to_ascii_art(std::span states) { bool started = false; std::string r; for (auto s : states) { switch (s) { - case trigger_observer_state::Start: - r += started ? "E" : "|#"; + case trigger_test_result::Matching: + r += started ? "||#" : "|#"; started = true; break; - case trigger_observer_state::Stop: - r += started ? "|_" : "E"; + case trigger_test_result::NotMatching: + r += started ? "|_" : "_"; started = false; break; - case trigger_observer_state::StopAndStart: - r += started ? "||#" : "|#"; - started = true; - break; - case trigger_observer_state::Ignore: r += started ? "#" : "_"; break; + case trigger_test_result::Ignore: r += started ? "#" : "_"; break; } }; return r; @@ -196,7 +190,7 @@ to_ascii_art(std::span states) { template std::string run_observer_test(std::span tags, O o) { - std::vector r; + std::vector r; r.reserve(tags.size()); for (const auto &tag : tags) { r.push_back(o(tag)); @@ -391,7 +385,7 @@ const boost::ut::suite DataSinkTests = [] { auto is_trigger = [](const tag_t &tag) { const auto v = tag.get("TYPE"); - return v && std::get(v->get()) == "TRIGGER"; + return v && std::get(v->get()) == "TRIGGER" ? trigger_test_result::Matching : trigger_test_result::Ignore; }; auto poller = data_sink_registry::instance().get_trigger_poller(data_sink_query::sink_name("test_sink"), is_trigger, 3, 2, blocking_mode::Blocking); @@ -442,7 +436,7 @@ const boost::ut::suite DataSinkTests = [] { auto is_trigger = [](const tag_t &tag) { const auto v = tag.get("TYPE"); - return v && std::get(v->get()) == "TRIGGER"; + return (v && std::get(v->get()) == "TRIGGER") ? trigger_test_result::Matching : trigger_test_result::Ignore; }; const auto delay = std::chrono::milliseconds{ 500 }; // sample rate 10000 -> 5000 samples @@ -561,7 +555,7 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); - auto is_trigger = [](const tag_t &tag) { return true; }; + auto is_trigger = [](const tag_t &tag) { return trigger_test_result::Matching; }; auto poller = data_sink_registry::instance().get_trigger_poller(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, blocking_mode::Blocking); expect(neq(poller, nullptr)); @@ -616,7 +610,7 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); - auto is_trigger = [](const tag_t &) { return true; }; + auto is_trigger = [](const tag_t &) { return trigger_test_result::Matching; }; std::mutex m; std::vector received_data;