diff --git a/include/data_sink.hpp b/include/data_sink.hpp new file mode 100644 index 00000000..d7d84548 --- /dev/null +++ b/include/data_sink.hpp @@ -0,0 +1,1025 @@ +#ifndef GNURADIO_DATA_SINK_HPP +#define GNURADIO_DATA_SINK_HPP + +#include "circular_buffer.hpp" +#include "dataset.hpp" +#include "history_buffer.hpp" +#include "node.hpp" +#include "tag.hpp" + +#include +#include +#include + +namespace fair::graph { + +enum class blocking_mode { NonBlocking, Blocking }; + +enum class trigger_match_result { + Matching, ///< Start a new dataset + NotMatching, ///< Finish dataset + Ignore ///< Ignore tag +}; + +template +class data_sink; + +// Until clang-format can handle concepts +// clang-format off + +template +concept DataSetCallback = std::invocable>; + +/** + * Stream callback functions receive the span of data, with optional tags and reference to the sink. + */ +template +concept StreamCallback = std::invocable> || std::invocable, std::span> || std::invocable, std::span, const data_sink&>; + +/** + * Used for testing whether a tag should trigger data acquisition. + * + * For the 'Triggered' (data window) and 'Snapshot' (single sample) acquisition modes: + * Stateless predicate to check whether a tag matches the trigger criteria. + * + * @code + * auto matcher = [](const auto &tag) { + * const auto is_trigger = ...check if tag is trigger...; + * return is_trigger ? trigger_match_result::Matching : trigger_match_result::Ignore; + * }; + * @endcode + * + * For the 'Multiplexed' acquisition mode: Possibly stateful object checking all incoming tags to control which data should be sent + * to the listener. + * + * A new dataset is started when the matcher returns @c Start or @c StopAndStart. + * A dataset is closed and sent when @c Stop or @StopAndStart is returned. + * + * For the multiplexed case, the matcher might be stateful and can rely on being called with each incoming tag exactly once, in the order they arrive. + * + * Example: + * + * @code + * // matcher observing three possible tag values, "green", "yellow", "red". + * // starting a dataset when seeing "green", stopping on "red", starting a new dataset on "yellow" + * struct color_matcher { + * matcher_result operator()(const tag_t &tag) { + * if (tag == green || tag == yellow) { + * return trigger_match_result::Matching; + * } + * if (tag == red) { + * return trigger_match_result::NotMatching; + * } + * + * return trigger_match_result::Ignore; + * } + * }; + * @endcode + * + * @see trigger_match_result + */ +template +concept TriggerMatcher = requires(T matcher, tag_t tag) { + { matcher(tag) } -> std::convertible_to; +}; + +// clang-format on + +struct data_sink_query { + std::optional _sink_name; + std::optional _signal_name; + + static data_sink_query + signal_name(std::string_view name) { + return { {}, std::string{ name } }; + } + + static data_sink_query + sink_name(std::string_view name) { + return { std::string{ name }, {} }; + } +}; + +class data_sink_registry { + std::mutex _mutex; + std::vector _sinks; + +public: + // TODO this shouldn't be a singleton but associated with the flow graph (?) + // TODO reconsider mutex usage when moving to the graph + static data_sink_registry & + instance() { + static data_sink_registry s_instance; + return s_instance; + } + + template + void + register_sink(data_sink *sink) { + std::lock_guard lg{ _mutex }; + _sinks.push_back(sink); + } + + template + void + unregister_sink(data_sink *sink) { + std::lock_guard lg{ _mutex }; + std::erase_if(_sinks, [sink](const std::any &v) { + try { + return std::any_cast *>(v) == sink; + } catch (...) { + return false; + } + }); + } + + template + std::shared_ptr::poller> + get_streaming_poller(const data_sink_query &query, blocking_mode block = blocking_mode::Blocking) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + return sink ? sink->get_streaming_poller(block) : nullptr; + } + + template + std::shared_ptr::dataset_poller> + get_trigger_poller(const data_sink_query &query, M matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + return sink ? sink->get_trigger_poller(std::forward(matcher), pre_samples, post_samples, block) : nullptr; + } + + template + std::shared_ptr::dataset_poller> + get_multiplexed_poller(const data_sink_query &query, M matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + return sink ? sink->get_multiplexed_poller(std::forward(matcher), maximum_window_size, block) : nullptr; + } + + template + std::shared_ptr::dataset_poller> + get_snapshot_poller(const data_sink_query &query, M matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + return sink ? sink->get_snapshot_poller(std::forward(matcher), delay, block) : nullptr; + } + + template Callback> + bool + register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback callback) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + if (!sink) { + return false; + } + + sink->register_streaming_callback(max_chunk_size, std::forward(callback)); + return true; + } + + template Callback, TriggerMatcher M> + bool + register_trigger_callback(const data_sink_query &query, M matcher, std::size_t pre_samples, std::size_t post_samples, Callback callback) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + if (!sink) { + return false; + } + + sink->register_trigger_callback(std::forward(matcher), pre_samples, post_samples, std::forward(callback)); + return true; + } + + template Callback, TriggerMatcher M> + bool + register_multiplexed_callback(const data_sink_query &query, M matcher, std::size_t maximum_window_size, Callback callback) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + if (!sink) { + return false; + } + + sink->register_multiplexed_callback(std::forward(matcher), maximum_window_size, std::forward(callback)); + return true; + } + + template Callback, TriggerMatcher M> + bool + register_snapshot_callback(const data_sink_query &query, M matcher, std::chrono::nanoseconds delay, Callback callback) { + std::lock_guard lg{ _mutex }; + auto sink = find_sink(query); + if (!sink) { + return false; + } + + sink->register_snapshot_callback(std::forward(matcher), delay, std::forward(callback)); + return true; + } + +private: + template + data_sink * + find_sink(const data_sink_query &query) { + auto matches = [&query](const std::any &v) { + try { + auto sink = std::any_cast *>(v); + const auto sink_name_matches = !query._sink_name || *query._sink_name == sink->name(); + const auto signal_name_matches = !query._signal_name || *query._signal_name == sink->signal_name; + return sink_name_matches && signal_name_matches; + } catch (...) { + return false; + } + }; + + const auto it = std::find_if(_sinks.begin(), _sinks.end(), matches); + if (it == _sinks.end()) { + return nullptr; + } + + return std::any_cast *>(*it); + } +}; + +namespace detail { +template +inline bool +copy_span(std::span src, std::span dst) { + assert(src.size() <= dst.size()); + if (src.size() > dst.size()) { + return false; + } + std::copy(src.begin(), src.end(), dst.begin()); + return true; +} + +template +inline std::optional +get(const property_map &m, const std::string_view &key) { + const auto it = m.find(key); + if (it == m.end()) { + return {}; + } + + return std::get(it->second); +} + +} // namespace detail + +/** + * @brief generic data sink for exporting arbitrary-typed streams to non-GR C++ APIs. + * + * Each sink registers with a (user-defined/exchangeable) global registry that can be + * queried by the non-GR caller to find the sink responsible for a given signal name, etc. + * and either retrieve a poller handler that allows asynchronous data from a different thread, + * or register a callback that is invoked by the sink if the user-conditions are met. + * + *
+ * @code
+ *         ╔═══════════════╗
+ *    in0 ━╢   data sink   ║                      ┌──── caller ────┐
+ * (err0) ━╢ (opt. error)  ║                      │                │
+ *         ║               ║  retrieve poller or  │ (custom non-GR │
+ *         ║ :signal_name  ║←--------------------→│  user code...) │
+ *         ║ :signal_unit  ║  register            │                │
+ *         ║ :...          ║  callback function   └───┬────────────┘
+ *         ╚═ GR block ═╤══╝                          │
+ *                      │                             │
+ *                      │                             │
+ *                      │      ╭─registry─╮           │
+ *            register/ │      ╞══════════╡           │ queries for specific
+ *          deregister  ╰─────→│ [sinks]  │←──────────╯ signal_info_t list/criteria
+ *                             ╞══════════╡
+ *                             ╰──────────╯
+ *
+ * 
+ * Pollers can be configured to be blocking, i.e. blocks the flow-graph + * if data is not being retrieved in time, or non-blocking, i.e. data being dropped when + * the user-defined buffer size is full. + * N.B. due to the nature of the GR scheduler, signals from the same sink are notified + * synchronuously (/asynchronuously) if handled by the same (/different) sink block. + * + * @tparam T input sample type + */ +template +class data_sink : public node> { + struct abstract_listener; + + static constexpr std::size_t _listener_buffer_size = 65536; + std::deque> _listeners; + std::mutex _listener_mutex; + std::optional> _history; + bool _has_signal_info_from_settings = false; + +public: + Annotated, Unit<"Hz">> sample_rate = 1.f; + Annotated signal_name = std::string("unknown signal"); + Annotated> signal_unit = std::string("a.u."); + Annotated> signal_min = std::numeric_limits::lowest(); + Annotated> signal_max = std::numeric_limits::max(); + + IN in; + + struct poller { + // TODO consider whether reusing port here makes sense + gr::circular_buffer buffer = gr::circular_buffer(_listener_buffer_size); + decltype(buffer.new_reader()) reader = buffer.new_reader(); + decltype(buffer.new_writer()) writer = buffer.new_writer(); + gr::circular_buffer tag_buffer = gr::circular_buffer(1024); + decltype(tag_buffer.new_reader()) tag_reader = tag_buffer.new_reader(); + decltype(tag_buffer.new_writer()) tag_writer = tag_buffer.new_writer(); + std::size_t samples_read = 0; // reader thread + std::atomic finished = false; + std::atomic drop_count = 0; + + template + [[nodiscard]] bool + process(Handler fnc) { + const auto available = reader.available(); + if (available == 0) { + return false; + } + + const auto read_data = reader.get(available); + if constexpr (requires { fnc(std::span(), std::span()); }) { + const auto tags = tag_reader.get(); + const auto it = std::find_if_not(tags.begin(), tags.end(), [until = static_cast(samples_read + available)](const auto &tag) { return tag.index < until; }); + auto relevant_tags = std::vector(tags.begin(), it); + for (auto &t : relevant_tags) { + t.index -= static_cast(samples_read); + } + fnc(read_data, std::span(relevant_tags)); + std::ignore = tag_reader.consume(relevant_tags.size()); + } else { + std::ignore = tag_reader.consume(tag_reader.available()); + fnc(read_data); + } + + std::ignore = reader.consume(available); + samples_read += available; + return true; + } + }; + + struct dataset_poller { + gr::circular_buffer> buffer = gr::circular_buffer>(_listener_buffer_size); + decltype(buffer.new_reader()) reader = buffer.new_reader(); + decltype(buffer.new_writer()) writer = buffer.new_writer(); + + std::atomic finished = false; + std::atomic drop_count = 0; + + [[nodiscard]] bool + process(std::invocable>> auto fnc) { + const auto available = reader.available(); + if (available == 0) { + return false; + } + + const auto read_data = reader.get(available); + fnc(read_data); + std::ignore = reader.consume(available); + return true; + } + }; + + data_sink() { data_sink_registry::instance().register_sink(this); } + + ~data_sink() { + stop(); + data_sink_registry::instance().unregister_sink(this); + } + + void + init(const property_map & /*old_settings*/, const property_map &new_settings) { + if (apply_signal_info(new_settings)) { + _has_signal_info_from_settings = true; + } + } + + std::shared_ptr + get_streaming_poller(blocking_mode block_mode = blocking_mode::Blocking) { + std::lock_guard lg(_listener_mutex); + const auto block = block_mode == blocking_mode::Blocking; + auto handler = std::make_shared(); + add_listener(std::make_unique>(handler, block, *this), block); + return handler; + } + + template + std::shared_ptr + get_trigger_poller(M matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) { + const auto block = block_mode == blocking_mode::Blocking; + auto handler = std::make_shared(); + std::lock_guard lg(_listener_mutex); + add_listener(std::make_unique>(std::move(matcher), handler, pre_samples, post_samples, block), block); + ensure_history_size(pre_samples); + return handler; + } + + template + std::shared_ptr + get_multiplexed_poller(M matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) { + std::lock_guard lg(_listener_mutex); + const auto block = block_mode == blocking_mode::Blocking; + auto handler = std::make_shared(); + add_listener(std::make_unique>(std::move(matcher), maximum_window_size, handler, block), block); + return handler; + } + + template + std::shared_ptr + get_snapshot_poller(M matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) { + const auto block = block_mode == blocking_mode::Blocking; + auto handler = std::make_shared(); + std::lock_guard lg(_listener_mutex); + add_listener(std::make_unique>(std::move(matcher), delay, handler, block), block); + return handler; + } + + template Callback> + void + register_streaming_callback(std::size_t max_chunk_size, Callback callback) { + add_listener(std::make_unique>(max_chunk_size, std::move(callback), *this), false); + } + + template Callback> + void + register_trigger_callback(M matcher, std::size_t pre_samples, std::size_t post_samples, Callback callback) { + add_listener(std::make_unique>(std::move(matcher), pre_samples, post_samples, std::move(callback)), false); + ensure_history_size(pre_samples); + } + + template Callback> + void + register_multiplexed_callback(M matcher, std::size_t maximum_window_size, Callback callback) { + std::lock_guard lg(_listener_mutex); + add_listener(std::make_unique>(std::move(matcher), maximum_window_size, std::move(callback)), false); + } + + template Callback> + void + register_snapshot_callback(M matcher, std::chrono::nanoseconds delay, Callback callback) { + std::lock_guard lg(_listener_mutex); + add_listener(std::make_unique>(std::move(matcher), delay, std::move(callback)), false); + } + + // TODO this code should be called at the end of graph processing + void + stop() noexcept { + std::lock_guard lg(_listener_mutex); + for (auto &listener : _listeners) { + listener->stop(); + } + } + + [[nodiscard]] work_return_t + process_bulk(std::span in_data) noexcept { + std::optional tagData; + if (this->input_tags_present()) { + assert(this->input_tags()[0].index == 0); + tagData = this->input_tags()[0].map; + // signal info from settings overrides info from tags + if (!_has_signal_info_from_settings) { + apply_signal_info(this->input_tags()[0].map); + } + } + + { + std::lock_guard lg(_listener_mutex); // TODO review/profile if a lock-free data structure should be used here + const auto history_view = _history ? _history->get_span(0) : std::span(); + std::erase_if(_listeners, [](const auto &l) { return l->expired; }); + for (auto &listener : _listeners) { + listener->process(history_view, in_data, tagData); + } + if (_history) { + // store potential pre-samples for triggers at the beginning of the next chunk + const auto to_write = std::min(in_data.size(), _history->capacity()); + _history->push_back_bulk(in_data.last(to_write)); + } + } + + return work_return_t::OK; + } + +private: + bool + apply_signal_info(const property_map &properties) { + try { + const auto srate = detail::get(properties, tag::SAMPLE_RATE.key()); + const auto name = detail::get(properties, tag::SIGNAL_NAME.key()); + const auto unit = detail::get(properties, tag::SIGNAL_UNIT.key()); + const auto min = detail::get(properties, tag::SIGNAL_MIN.key()); + const auto max = detail::get(properties, tag::SIGNAL_MAX.key()); + + // commit + if (srate) { + sample_rate = *srate; + } + if (name) { + signal_name = *name; + } + if (unit) { + signal_unit = *unit; + } + if (min) { + signal_min = *min; + } + if (max) { + signal_max = *max; + } + + // forward to listeners + if (srate || name || unit || min || max) { + const auto dstempl = make_dataset_template(); + + std::lock_guard lg{ _listener_mutex }; + for (auto &l : _listeners) { + if (srate) { + l->apply_sample_rate(sample_rate); + } + if (name || unit || min || max) { + l->set_dataset_template(dstempl); + } + } + } + return name || unit || min || max; + } catch (const std::bad_variant_access &) { + // TODO log? + return false; + } + } + + DataSet + make_dataset_template() const { + DataSet dstempl; + dstempl.signal_names = { signal_name }; + dstempl.signal_units = { signal_unit }; + dstempl.signal_ranges = { { signal_min, signal_max } }; + return dstempl; + } + + void + ensure_history_size(std::size_t new_size) { + const auto old_size = _history ? _history->capacity() : std::size_t{0}; + if (new_size <= old_size) { + return; + } + // TODO Important! + // - History size must be limited to avoid users causing OOM + // - History should shrink again + + // transitional, do not reallocate/copy, but create a shared buffer with size N, + // and a per-listener history buffer where more than N samples is needed. + auto new_history = gr::history_buffer(new_size); + if (_history) { + new_history.push_back_bulk(_history->begin(), _history->end()); + } + _history = new_history; + } + + void + add_listener(std::unique_ptr &&l, bool block) { + l->set_dataset_template(make_dataset_template()); + l->apply_sample_rate(sample_rate); + if (block) { + _listeners.push_back(std::move(l)); + } else { + _listeners.push_front(std::move(l)); + } + } + + struct abstract_listener { + bool expired = false; + + virtual ~abstract_listener() = default; + + void + set_expired() { + expired = true; + } + + virtual void + apply_sample_rate(float /*sample_rate*/) {} + + virtual void + set_dataset_template(DataSet) {} + + virtual void + process(std::span history, std::span data, std::optional tag_data0) + = 0; + virtual void + stop() = 0; + }; + + template + struct continuous_listener : public abstract_listener { + static constexpr auto has_callback = !std::is_same_v; + static constexpr auto callback_takes_tags = std::is_invocable_v, std::span> + || std::is_invocable_v, std::span, const data_sink &>; + + const data_sink &parent_sink; + bool block = false; + std::size_t samples_written = 0; + + // callback-only + std::size_t buffer_fill = 0; + std::vector buffer; + std::vector tag_buffer; + + // polling-only + std::weak_ptr polling_handler = {}; + + Callback callback; + + explicit continuous_listener(std::size_t max_chunk_size, Callback c, const data_sink &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward(c) } {} + + explicit continuous_listener(std::shared_ptr poller, bool do_block, const data_sink &parent) : parent_sink(parent), block(do_block), polling_handler{ std::move(poller) } {} + + inline void + call_callback(std::span data, std::span tags) { + if constexpr (std::is_invocable_v, std::span, const data_sink &>) { + callback(std::move(data), std::move(tags), parent_sink); + } else if constexpr (std::is_invocable_v, std::span>) { + callback(std::move(data), std::move(tags)); + } else { + callback(std::move(data)); + } + } + + void + process(std::span, std::span data, std::optional tag_data0) override { + using namespace fair::graph::detail; + + if constexpr (has_callback) { + // if there's pending data, fill buffer and send out + if (buffer_fill > 0) { + const auto n = std::min(data.size(), buffer.size() - buffer_fill); + detail::copy_span(data.first(n), std::span(buffer).subspan(buffer_fill, n)); + if constexpr (callback_takes_tags) { + if (tag_data0) { + tag_buffer.push_back({ static_cast(buffer_fill), *tag_data0 }); + tag_data0.reset(); + } + } + buffer_fill += n; + if (buffer_fill == buffer.size()) { + call_callback(std::span(buffer), std::span(tag_buffer)); + samples_written += buffer.size(); + buffer_fill = 0; + tag_buffer.clear(); + } + + data = data.last(data.size() - n); + } + + // send out complete chunks directly + while (data.size() >= buffer.size()) { + if constexpr (callback_takes_tags) { + std::vector tags; + if (tag_data0) { + tags.push_back({ 0, std::move(*tag_data0) }); + tag_data0.reset(); + } + call_callback(data.first(buffer.size()), std::span(tags)); + } else { + callback(data.first(buffer.size())); + } + samples_written += buffer.size(); + data = data.last(data.size() - buffer.size()); + } + + // write remaining data to the buffer + if (!data.empty()) { + detail::copy_span(data, std::span(buffer).first(data.size())); + buffer_fill = data.size(); + if constexpr (callback_takes_tags) { + if (tag_data0) { + tag_buffer.push_back({ 0, std::move(*tag_data0) }); + } + } + } + } else { + auto poller = polling_handler.lock(); + if (!poller) { + this->set_expired(); + return; + } + + const auto to_write = block ? data.size() : std::min(data.size(), poller->writer.available()); + + if (to_write > 0) { + if (tag_data0) { + auto tw = poller->tag_writer.reserve_output_range(1); + tw[0] = { static_cast(samples_written), std::move(*tag_data0) }; + tw.publish(1); + } + auto write_data = poller->writer.reserve_output_range(to_write); + detail::copy_span(data.first(to_write), std::span(write_data)); + write_data.publish(write_data.size()); + } + poller->drop_count += data.size() - to_write; + samples_written += to_write; + } + } + + void + stop() override { + if constexpr (has_callback) { + if (buffer_fill > 0) { + call_callback(std::span(buffer).first(buffer_fill), std::span(tag_buffer)); + tag_buffer.clear(); + buffer_fill = 0; + } + } else { + if (auto p = polling_handler.lock()) { + p->finished = true; + } + } + } + }; + + struct pending_window { + DataSet dataset; + std::size_t pending_post_samples = 0; + }; + + template + struct trigger_listener : public abstract_listener { + bool block = false; + std::size_t pre_samples = 0; + std::size_t post_samples = 0; + + DataSet dataset_template; + M trigger_matcher = {}; + std::deque pending_trigger_windows; // triggers that still didn't receive all their data + std::weak_ptr polling_handler = {}; + + Callback callback; + + explicit trigger_listener(M matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool do_block) + : block(do_block), pre_samples(pre), post_samples(post), trigger_matcher(std::move(matcher)), polling_handler{ std::move(handler) } {} + + explicit trigger_listener(M matcher, std::size_t pre, std::size_t post, Callback cb) : pre_samples(pre), post_samples(post), trigger_matcher(std::move(matcher)), callback{ std::move(cb) } {} + + void + set_dataset_template(DataSet dst) override { + dataset_template = std::move(dst); + } + + inline void + publish_dataset(DataSet &&data) { + if constexpr (!std::is_same_v) { + callback(std::move(data)); + } else { + auto poller = polling_handler.lock(); + if (!poller) { + this->set_expired(); + return; + } + + auto write_data = poller->writer.reserve_output_range(1); + if (block) { + write_data[0] = std::move(data); + write_data.publish(1); + } else { + if (poller->writer.available() > 0) { + write_data[0] = std::move(data); + write_data.publish(1); + } else { + poller->drop_count++; + } + } + } + } + + void + process(std::span history, std::span in_data, std::optional tag_data0) override { + if (tag_data0 && trigger_matcher(tag_t{ 0, *tag_data0 }) == trigger_match_result::Matching) { + DataSet dataset = dataset_template; + dataset.signal_values.reserve(pre_samples + post_samples); // TODO maybe make the circ. buffer smaller but preallocate these + + const auto pre_sample_view = history.last(std::min(pre_samples, history.size())); + dataset.signal_values.insert(dataset.signal_values.end(), pre_sample_view.begin(), pre_sample_view.end()); + + dataset.timing_events = { { { static_cast(pre_sample_view.size()), *tag_data0 } } }; + pending_trigger_windows.push_back({ .dataset = std::move(dataset), .pending_post_samples = post_samples }); + } + + auto window = pending_trigger_windows.begin(); + while (window != pending_trigger_windows.end()) { + const auto post_sample_view = in_data.first(std::min(window->pending_post_samples, in_data.size())); + window->dataset.signal_values.insert(window->dataset.signal_values.end(), post_sample_view.begin(), post_sample_view.end()); + window->pending_post_samples -= post_sample_view.size(); + + if (window->pending_post_samples == 0) { + this->publish_dataset(std::move(window->dataset)); + window = pending_trigger_windows.erase(window); + } else { + ++window; + } + } + } + + void + stop() override { + for (auto &window : pending_trigger_windows) { + if (!window.dataset.signal_values.empty()) { + this->publish_dataset(std::move(window.dataset)); + } + } + pending_trigger_windows.clear(); + if (auto p = polling_handler.lock()) { + p->finished = true; + } + } + }; + + template + struct multiplexed_listener : public abstract_listener { + bool block = false; + M matcher; + DataSet dataset_template; + std::optional> pending_dataset; + std::size_t maximum_window_size; + std::weak_ptr polling_handler = {}; + Callback callback; + + explicit multiplexed_listener(M matcher_, std::size_t max_window_size, Callback cb) : matcher(std::move(matcher_)), maximum_window_size(max_window_size), callback(cb) {} + + explicit multiplexed_listener(M matcher_, std::size_t max_window_size, std::shared_ptr handler, bool do_block) + : block(do_block), matcher(std::move(matcher_)), maximum_window_size(max_window_size), polling_handler{ std::move(handler) } {} + + void + set_dataset_template(DataSet dst) override { + dataset_template = std::move(dst); + } + + inline void + publish_dataset(DataSet &&data) { + if constexpr (!std::is_same_v) { + callback(std::move(data)); + } else { + auto poller = polling_handler.lock(); + if (!poller) { + this->set_expired(); + return; + } + + auto write_data = poller->writer.reserve_output_range(1); + if (block) { + write_data[0] = std::move(data); + write_data.publish(1); + } else { + if (poller->writer.available() > 0) { + write_data[0] = std::move(data); + write_data.publish(1); + } else { + poller->drop_count++; + } + } + } + } + + void + process(std::span, std::span in_data, std::optional tag_data0) override { + if (tag_data0) { + const auto obsr = matcher(tag_t{ 0, *tag_data0 }); + if (obsr == trigger_match_result::NotMatching || obsr == trigger_match_result::Matching) { + if (pending_dataset) { + if (obsr == trigger_match_result::NotMatching) { + pending_dataset->timing_events[0].push_back({ static_cast(pending_dataset->signal_values.size()), *tag_data0 }); + } + this->publish_dataset(std::move(*pending_dataset)); + pending_dataset.reset(); + } + } + if (obsr == trigger_match_result::Matching) { + pending_dataset = dataset_template; + pending_dataset->signal_values.reserve(maximum_window_size); // TODO might be too much? + pending_dataset->timing_events = { { { 0, *tag_data0 } } }; + } + } + if (pending_dataset) { + const auto to_write = std::min(in_data.size(), maximum_window_size - pending_dataset->signal_values.size()); + const auto view = in_data.first(to_write); + pending_dataset->signal_values.insert(pending_dataset->signal_values.end(), view.begin(), view.end()); + + if (pending_dataset->signal_values.size() == maximum_window_size) { + this->publish_dataset(std::move(*pending_dataset)); + pending_dataset.reset(); + } + } + } + + void + stop() override { + if (pending_dataset) { + this->publish_dataset(std::move(*pending_dataset)); + pending_dataset.reset(); + } + if (auto p = polling_handler.lock()) { + p->finished = true; + } + } + }; + + struct pending_snapshot { + property_map tag_data; + std::size_t delay = 0; + std::size_t pending_samples = 0; + }; + + template + struct snapshot_listener : public abstract_listener { + bool block = false; + std::chrono::nanoseconds time_delay; + std::size_t sample_delay = 0; + DataSet dataset_template; + M trigger_matcher = {}; + std::deque pending; + std::weak_ptr polling_handler = {}; + Callback callback; + + explicit snapshot_listener(M matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool do_block) + : block(do_block), time_delay(delay), trigger_matcher(std::move(matcher)), polling_handler{ std::move(poller) } {} + + explicit snapshot_listener(M matcher, std::chrono::nanoseconds delay, Callback cb) : time_delay(delay), trigger_matcher(std::move(matcher)), callback(std::move(cb)) {} + + void + set_dataset_template(DataSet dst) override { + dataset_template = std::move(dst); + } + + void + apply_sample_rate(float rateHz) override { + sample_delay = std::round(std::chrono::duration_cast>(time_delay).count() * rateHz); + // TODO do we need to update the requested_samples of pending here? (considering both old and new time_delay) + } + + inline void + publish_dataset(DataSet &&data) { + if constexpr (!std::is_same_v) { + callback(std::move(data)); + } else { + auto poller = polling_handler.lock(); + if (!poller) { + this->set_expired(); + return; + } + + auto write_data = poller->writer.reserve_output_range(1); + if (block) { + write_data[0] = std::move(data); + write_data.publish(1); + } else { + if (poller->writer.available() > 0) { + write_data[0] = std::move(data); + write_data.publish(1); + } else { + poller->drop_count++; + } + } + } + } + + void + process(std::span, std::span in_data, std::optional tag_data0) override { + if (tag_data0 && trigger_matcher({ 0, *tag_data0 }) == trigger_match_result::Matching) { + auto new_pending = pending_snapshot{ *tag_data0, sample_delay, sample_delay }; + // make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in apply_sample_rate, see there) + auto rit = std::find_if(pending.rbegin(), pending.rend(), [delay = sample_delay](const auto &other) { return other.pending_samples < delay; }); + pending.insert(rit.base(), std::move(new_pending)); + } + + auto it = pending.begin(); + while (it != pending.end()) { + if (it->pending_samples >= in_data.size()) { + it->pending_samples -= in_data.size(); + break; + } + + DataSet dataset = dataset_template; + dataset.timing_events = { { { -static_cast(it->delay), std::move(it->tag_data) } } }; + dataset.signal_values = { in_data[it->pending_samples] }; + this->publish_dataset(std::move(dataset)); + + it = pending.erase(it); + } + } + + void + stop() override { + pending.clear(); + if (auto p = polling_handler.lock()) { + p->finished = true; + } + } + }; +}; + +} // namespace fair::graph + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fair::graph::data_sink), in, sample_rate, signal_name, signal_unit, signal_min, signal_max); + +#endif diff --git a/include/utils.hpp b/include/utils.hpp index 41a17ce9..7d8e0bbd 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -33,6 +33,8 @@ namespace fair::meta { using namespace fair::literals; +struct null_type {}; + template struct print_types; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5baae9d6..d12e5820 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -28,6 +28,7 @@ function(add_app_test TEST_NAME) endfunction() add_ut_test(qa_buffer) +add_ut_test(qa_data_sink) add_ut_test(qa_dynamic_port) add_ut_test(qa_dynamic_node) add_ut_test(qa_hier_node) diff --git a/test/qa_data_sink.cpp b/test/qa_data_sink.cpp new file mode 100644 index 00000000..2bfc9182 --- /dev/null +++ b/test/qa_data_sink.cpp @@ -0,0 +1,743 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#if defined(__clang__) && __clang_major__ >= 16 +// clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection +template<> +auto boost::ut::cfg = boost::ut::runner>{}; +#endif + +template<> +struct fmt::formatter { + template + constexpr auto + parse(ParseContext &ctx) { + return ctx.begin(); + } + + template + constexpr auto + format(const fair::graph::tag_t &tag, FormatContext &ctx) const { + return fmt::format_to(ctx.out(), "{}", tag.index); + } +}; + +namespace fair::graph::data_sink_test { + +static constexpr std::int32_t n_samples = 200000; + +template +struct Source : public node> { + OUT out; + std::int32_t n_samples_produced = 0; + std::int32_t n_samples_max = 1024; + std::size_t n_tag_offset = 0; + float sample_rate = 1000.0f; + T next_value = {}; + std::size_t next_tag = 0; + std::vector tags; // must be sorted by index, only one tag per sample + + void + init(const property_map &, const property_map &) { + // optional init function that is called after construction and whenever settings change + fair::graph::publish_tag(out, { { "n_samples_max", n_samples_max } }, n_tag_offset); + } + + constexpr std::make_signed_t + available_samples(const Source &) noexcept { + // TODO unify with other test sources + // split into chunks so that we have a single tag at index 0 (or none) + auto ret = static_cast>(n_samples_max - n_samples_produced); + if (next_tag < tags.size()) { + if (n_samples_produced < tags[next_tag].index) { + ret = tags[next_tag].index - n_samples_produced; + } else if (next_tag + 1 < tags.size()) { + // tag at first sample? then read up until before next tag + ret = tags[next_tag+1].index - n_samples_produced; + } + } + + return ret > 0 ? ret : -1; // '-1' -> DONE, produced enough samples + } + + T + process_one() noexcept { + if (next_tag < tags.size() && tags[next_tag].index <= static_cast>(n_samples_produced)) { + tag_t &out_tag = this->output_tags()[0]; + // TODO when not enforcing single samples in available_samples, one would have to do: + // const auto base = std::max(out.streamWriter().position() + 1, tag_t::signed_index_type{0}); + // out_tag = tag_t{ tags[next_tag].index - base, tags[next_tag].map }; + // Still think there could be nicer API to set a tag from process_one() + out_tag = tag_t{ 0, tags[next_tag].map }; + this->forward_tags(); + next_tag++; + } + + n_samples_produced++; + return next_value++; + } +}; + +/** + * Example tag matcher (TriggerMatcher implementation) for the multiplexed listener case (interleaved data). As a toy example, we use + * data tagged as Year/Month/Day. + * + * For each of year, month, day, the user can specify whether: + * + * - option not set: The field is to be ignored + * - -1: Whenever a change between the previous and the current tag is observed, start a new data set (StopAndStart) + * - other values >= 0: A new dataset is started when the tag matches, and stopped, when a tag doesn't match + * + * (Note that this Matcher is stateful and remembers the last tag seen) + */ +struct Matcher { + std::optional year; + std::optional month; + std::optional day; + std::optional> last_seen; + bool last_matched = false; + + explicit Matcher(std::optional year_, std::optional month_, std::optional day_) : year(year_), month(month_), day(day_) {} + + static inline bool + same(int x, std::optional other) { + return other && x == *other; + } + + static inline bool + changed(int x, std::optional other) { + return !same(x, other); + } + + trigger_match_result + operator()(const tag_t &tag) { + const auto ty = tag.get("YEAR"); + const auto tm = tag.get("MONTH"); + const auto td = tag.get("DAY"); + if (!ty || !tm || !td) { + return trigger_match_result::Ignore; + } + + const auto tup = std::make_tuple(std::get(ty->get()), std::get(tm->get()), std::get(td->get())); + const auto &[y, m, d] = tup; + const auto ly = last_seen ? std::optional(std::get<0>(*last_seen)) : std::nullopt; + const auto lm = last_seen ? std::optional(std::get<1>(*last_seen)) : std::nullopt; + const auto ld = last_seen ? std::optional(std::get<2>(*last_seen)) : std::nullopt; + + const auto year_restart = year && *year == -1 && changed(y, ly); + const auto year_matches = !year || *year == -1 || same(y, year); + const auto month_restart = month && *month == -1 && changed(m, lm); + const auto month_matches = !month || *month == -1 || same(m, month); + const auto day_restart = day && *day == -1 && changed(d, ld); + const auto day_matches = !day || *day == -1 || same(d, day); + const auto matches = year_matches && month_matches && day_matches; + const auto restart = year_restart || month_restart || day_restart; + + trigger_match_result r = trigger_match_result::Ignore; + + if (!matches) { + r = trigger_match_result::NotMatching; + } else if (!last_matched || restart) { + r = trigger_match_result::Matching; + } + + last_seen = tup; + last_matched = matches; + return r; + } +}; + +static tag_t +make_tag(tag_t::signed_index_type index, int year, int month, int day) { + return tag_t{ index, { { "YEAR", year }, { "MONTH", month }, { "DAY", day } } }; +} + +static std::vector +make_test_tags(tag_t::signed_index_type first_index, tag_t::signed_index_type interval) { + std::vector tags; + for (int y = 1; y <= 3; ++y) { + for (int m = 1; m <= 2; ++m) { + for (int d = 1; d <= 3; ++d) { + tags.push_back(make_tag(first_index, y, m, d)); + first_index += interval; + } + } + } + return tags; +} + +static std::string +to_ascii_art(std::span states) { + bool started = false; + std::string r; + for (auto s : states) { + switch (s) { + case trigger_match_result::Matching: + r += started ? "||#" : "|#"; + started = true; + break; + case trigger_match_result::NotMatching: + r += started ? "|_" : "_"; + started = false; + break; + case trigger_match_result::Ignore: r += started ? "#" : "_"; break; + } + }; + return r; +} + +template +std::string +run_matcher_test(std::span tags, M o) { + std::vector r; + r.reserve(tags.size()); + for (const auto &tag : tags) { + r.push_back(o(tag)); + } + return to_ascii_art(r); +} + +} // namespace fair::graph::data_sink_test + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fair::graph::data_sink_test::Source), out, n_samples_produced, n_samples_max, n_tag_offset, sample_rate); + +template +std::string +format_list(const T &l) { + return fmt::format("[{}]", fmt::join(l, ", ")); +} + +template +bool +indexes_match(const T &lhs, const U &rhs) { + auto index_match = [](const auto &l, const auto &r) { return l.index == r.index; }; + + return std::equal(std::begin(lhs), std::end(lhs), std::begin(rhs), std::end(rhs), index_match); +} + +const boost::ut::suite DataSinkTests = [] { + using namespace boost::ut; + using namespace fair::graph; + using namespace fair::graph::data_sink_test; + using namespace std::string_literals; + + "callback continuous mode"_test = [] { + static constexpr std::int32_t n_samples = 200005; + static constexpr std::size_t chunk_size = 1000; + + const auto src_tags = make_test_tags(0, 1000); + + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + auto &sink = flow_graph.make_node>(); + src.tags = src_tags; + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + std::atomic samples_seen1 = 0; + std::atomic chunks_seen1 = 0; + auto callback = [&samples_seen1, &chunks_seen1](std::span buffer) { + for (std::size_t i = 0; i < buffer.size(); ++i) { + expect(eq(buffer[i], static_cast(samples_seen1 + i))); + } + + samples_seen1 += buffer.size(); + chunks_seen1++; + if (chunks_seen1 < 201) { + expect(eq(buffer.size(), chunk_size)); + } else { + expect(eq(buffer.size(), 5)); + } + }; + + std::mutex m2; + std::size_t samples_seen2 = 0; + std::size_t chunks_seen2 = 0; + std::vector received_tags; + auto callback_with_tags = [&samples_seen2, &chunks_seen2, &m2, &received_tags](std::span buffer, std::span tags) { + for (std::size_t i = 0; i < buffer.size(); ++i) { + expect(eq(buffer[i], static_cast(samples_seen2 + i))); + } + + for (const auto &tag : tags) { + expect(ge(tag.index, 0)); + expect(lt(tag.index, buffer.size())); + } + + auto lg = std::lock_guard{ m2 }; + std::vector adjusted; + std::transform(tags.begin(), tags.end(), std::back_inserter(adjusted), [samples_seen2](const auto &tag) { + return tag_t{ static_cast(samples_seen2) + tag.index, tag.map }; + }); + received_tags.insert(received_tags.end(), adjusted.begin(), adjusted.end()); + samples_seen2 += buffer.size(); + chunks_seen2++; + if (chunks_seen2 < 201) { + expect(eq(buffer.size(), chunk_size)); + } else { + expect(eq(buffer.size(), 5)); + } + }; + + auto callback_with_tags_and_sink = [&sink](std::span, std::span, const data_sink &passed_sink) { + expect(eq(passed_sink.name(), "test_sink"s)); + expect(eq(sink.unique_name, passed_sink.unique_name)); + }; + + expect(data_sink_registry::instance().register_streaming_callback(data_sink_query::sink_name("test_sink"), chunk_size, callback)); + expect(data_sink_registry::instance().register_streaming_callback(data_sink_query::sink_name("test_sink"), chunk_size, callback_with_tags)); + expect(data_sink_registry::instance().register_streaming_callback(data_sink_query::sink_name("test_sink"), chunk_size, callback_with_tags_and_sink)); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + auto lg = std::lock_guard{ m2 }; + expect(eq(chunks_seen1.load(), 201)); + expect(eq(chunks_seen2, 201)); + expect(eq(samples_seen1.load(), n_samples)); + expect(eq(samples_seen2, n_samples)); + expect(eq(indexes_match(received_tags, src_tags), true)) << fmt::format("{} != {}", format_list(received_tags), format_list(src_tags)); + }; + + "blocking polling continuous mode"_test = [] { + constexpr std::int32_t n_samples = 200000; + + graph flow_graph; + const auto tags = make_test_tags(0, 1000); + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + src.tags = tags; + auto &sink = flow_graph.make_node>(); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + std::atomic samples_seen = 0; + + auto poller_data_only = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); + expect(neq(poller_data_only, nullptr)); + + auto poller_with_tags = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); + expect(neq(poller_with_tags, nullptr)); + + auto runner1 = std::async([poller = poller_data_only] { + std::vector received; + bool seen_finished = false; + while (!seen_finished) { + seen_finished = poller->finished; + while (poller->process([&received](const auto &data) { received.insert(received.end(), data.begin(), data.end()); })) { + } + } + + return received; + }); + + auto runner2 = std::async([poller = poller_with_tags] { + std::vector received; + std::vector received_tags; + bool seen_finished = false; + while (!seen_finished) { + seen_finished = poller->finished; + while (poller->process([&received, &received_tags](const auto &data, const auto &tags_) { + auto rtags = std::vector(tags_.begin(), tags_.end()); + for (auto &t : rtags) { + t.index += static_cast(received.size()); + } + received_tags.insert(received_tags.end(), rtags.begin(), rtags.end()); + received.insert(received.end(), data.begin(), data.end()); + })) { + } + } + + return std::make_tuple(received, received_tags); + }); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + std::vector expected(n_samples); + std::iota(expected.begin(), expected.end(), 0.0); + + const auto received1 = runner1.get(); + const auto &[received2, received_tags] = runner2.get(); + expect(eq(received1.size(), expected.size())); + expect(eq(received1, expected)); + expect(eq(poller_data_only->drop_count.load(), 0)); + expect(eq(received2.size(), expected.size())); + expect(eq(received2, expected)); + expect(eq(received_tags.size(), tags.size())); + expect(eq(indexes_match(received_tags, tags), true)) << fmt::format("{} != {}", format_list(received_tags), format_list(tags)); + expect(eq(poller_with_tags->drop_count.load(), 0)); + }; + + "blocking polling trigger mode non-overlapping"_test = [] { + constexpr std::int32_t n_samples = 200000; + + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + const auto tags = std::vector{ { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } }; + src.tags = tags; + auto &sink = flow_graph.make_node>( + { { "signal_name", "test signal" }, { "signal_unit", "none" }, { "signal_min", int32_t{ 0 } }, { "signal_max", int32_t{ n_samples - 1 } } }); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + auto is_trigger = [](const tag_t &tag) { + const auto v = tag.get("TYPE"); + return v && std::get(v->get()) == "TRIGGER" ? trigger_match_result::Matching : trigger_match_result::Ignore; + }; + + // lookup by signal name + auto poller = data_sink_registry::instance().get_trigger_poller(data_sink_query::signal_name("test signal"), is_trigger, 3, 2, blocking_mode::Blocking); + expect(neq(poller, nullptr)); + + auto polling = std::async([poller] { + std::vector received_data; + std::vector received_tags; + bool seen_finished = false; + while (!seen_finished) { + seen_finished = poller->finished; + [[maybe_unused]] auto r = poller->process([&received_data, &received_tags](const auto &datasets) { + for (const auto &dataset : datasets) { + received_data.insert(received_data.end(), dataset.signal_values.begin(), dataset.signal_values.end()); + // signal info from sink settings + expect(eq(dataset.signal_names.size(), 1u)); + expect(eq(dataset.signal_units.size(), 1u)); + expect(eq(dataset.signal_ranges.size(), 1u)); + expect(eq(dataset.timing_events.size(), 1u)); + expect(eq(dataset.signal_names[0], "test signal"s)); + expect(eq(dataset.signal_units[0], "none"s)); + expect(eq(dataset.signal_ranges[0], std::vector{ 0, n_samples - 1 })); + expect(eq(dataset.timing_events[0].size(), 1u)); + expect(eq(dataset.timing_events[0][0].index, 3)); + received_tags.insert(received_tags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); + } + }); + } + return std::make_tuple(received_data, received_tags); + }); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + const auto &[received_data, received_tags] = polling.get(); + const auto expected_tags = { tags[0], tags[2] }; // triggers-only + + expect(eq(received_data.size(), 10)); + expect(eq(received_data, std::vector{ 2997, 2998, 2999, 3000, 3001, 179997, 179998, 179999, 180000, 180001 })); + expect(eq(received_tags.size(), expected_tags.size())); + + expect(eq(poller->drop_count.load(), 0)); + }; + + "blocking snapshot mode"_test = [] { + constexpr std::int32_t n_samples = 200000; + + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + src.tags = { { 0, + { { std::string(tag::SIGNAL_NAME.key()), "test signal" }, + { std::string(tag::SIGNAL_UNIT.key()), "none" }, + { std::string(tag::SIGNAL_MIN.key()), int32_t{ 0 } }, + { std::string(tag::SIGNAL_MAX.key()), n_samples - 1 } } }, + { 3000, { { "TYPE", "TRIGGER" } } }, + { 8000, { { "TYPE", "NO_TRIGGER" } } }, + { 180000, { { "TYPE", "TRIGGER" } } } }; + auto &sink = flow_graph.make_node>({ { "sample_rate", 10000.f } }); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + auto is_trigger = [](const tag_t &tag) { + const auto v = tag.get("TYPE"); + return (v && std::get(v->get()) == "TRIGGER") ? trigger_match_result::Matching : trigger_match_result::Ignore; + }; + + const auto delay = std::chrono::milliseconds{ 500 }; // sample rate 10000 -> 5000 samples + auto poller = data_sink_registry::instance().get_snapshot_poller(data_sink_query::sink_name("test_sink"), is_trigger, delay, blocking_mode::Blocking); + expect(neq(poller, nullptr)); + + std::vector received_data_cb; + + auto callback = [&received_data_cb](const auto &dataset) { received_data_cb.insert(received_data_cb.end(), dataset.signal_values.begin(), dataset.signal_values.end()); }; + + expect(data_sink_registry::instance().register_snapshot_callback(data_sink_query::sink_name("test_sink"), is_trigger, delay, callback)); + + auto poller_result = std::async([poller] { + std::vector received_data; + + bool seen_finished = false; + while (!seen_finished) { + seen_finished = poller->finished; + [[maybe_unused]] auto r = poller->process([&received_data](const auto &datasets) { + for (const auto &dataset : datasets) { + // signal info from tags + expect(eq(dataset.signal_names.size(), 1u)); + expect(eq(dataset.signal_units.size(), 1u)); + expect(eq(dataset.signal_ranges.size(), 1u)); + expect(eq(dataset.timing_events.size(), 1u)); + expect(eq(dataset.signal_names[0], "test signal"s)); + expect(eq(dataset.signal_units[0], "none"s)); + expect(eq(dataset.signal_ranges[0], std::vector{ 0, n_samples - 1 })); + expect(eq(dataset.timing_events[0].size(), 1u)); + expect(eq(dataset.timing_events[0][0].index, -5000)); + received_data.insert(received_data.end(), dataset.signal_values.begin(), dataset.signal_values.end()); + } + }); + } + + return received_data; + }); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + const auto received_data = poller_result.get(); + expect(eq(received_data_cb, received_data)); + expect(eq(received_data, std::vector{ 8000, 185000 })); + expect(eq(poller->drop_count.load(), 0)); + }; + + "blocking multiplexed mode"_test = [] { + const auto tags = make_test_tags(0, 10000); + + const std::int32_t n_samples = tags.size() * 10000 + 100000; + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + src.tags = tags; + auto &sink = flow_graph.make_node>(); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + { + const auto t = std::span(tags); + + // Test the test matcher + expect(eq(run_matcher_test(t, Matcher({}, -1, {})), "|###||###||###||###||###||###"s)); + expect(eq(run_matcher_test(t, Matcher(-1, {}, {})), "|######||######||######"s)); + expect(eq(run_matcher_test(t, Matcher(1, {}, {})), "|######|____________"s)); + expect(eq(run_matcher_test(t, Matcher(1, {}, 2)), "_|#|__|#|_____________"s)); + expect(eq(run_matcher_test(t, Matcher({}, {}, 1)), "|#|__|#|__|#|__|#|__|#|__|#|__"s)); + } + + const auto matchers = std::array{ Matcher({}, -1, {}), Matcher(-1, {}, {}), Matcher(1, {}, {}), Matcher(1, {}, 2), Matcher({}, {}, 1) }; + + // Following the patterns above, where each #/_ is 10000 samples + const auto expected = std::array, matchers.size()>{ { { 0, 29999, 30000, 59999, 60000, 89999, 90000, 119999, 120000, 149999, 150000, 249999 }, + { 0, 59999, 60000, 119999, 120000, 219999 }, + { 0, 59999 }, + { 10000, 19999, 40000, 49999 }, + { 0, 9999, 30000, 39999, 60000, 69999, 90000, 99999, 120000, 129999, 150000, 159999 } } }; + std::array::dataset_poller>, matchers.size()> pollers; + + std::vector>> results; + std::array, matchers.size()> results_cb; + + for (std::size_t i = 0; i < results_cb.size(); ++i) { + auto callback = [&r = results_cb[i]](const auto &dataset) { + r.push_back(dataset.signal_values.front()); + r.push_back(dataset.signal_values.back()); + }; + expect(eq(data_sink_registry::instance().register_multiplexed_callback(data_sink_query::sink_name("test_sink"), Matcher(matchers[i]), 100000, callback), true)); + + pollers[i] = data_sink_registry::instance().get_multiplexed_poller(data_sink_query::sink_name("test_sink"), Matcher(matchers[i]), 100000, blocking_mode::Blocking); + expect(neq(pollers[i], nullptr)); + } + + for (std::size_t i = 0; i < pollers.size(); ++i) { + auto f = std::async([poller = pollers[i]] { + std::vector ranges; + bool seen_finished = false; + while (!seen_finished) { + seen_finished = poller->finished.load(); + while (poller->process([&ranges](const auto &datasets) { + for (const auto &dataset : datasets) { + // default signal info, we didn't set anything + expect(eq(dataset.signal_names.size(), 1u)); + expect(eq(dataset.signal_units.size(), 1u)); + expect(eq(dataset.timing_events.size(), 1u)); + expect(eq(dataset.signal_names[0], "unknown signal"s)); + expect(eq(dataset.signal_units[0], "a.u."s)); + ranges.push_back(dataset.signal_values.front()); + ranges.push_back(dataset.signal_values.back()); + } + })) { + } + } + return ranges; + }); + results.push_back(std::move(f)); + } + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + for (std::size_t i = 0; i < results.size(); ++i) { + expect(eq(results[i].get(), expected[i])); + expect(eq(results_cb[i], expected[i])); + } + }; + + "blocking polling trigger mode overlapping"_test = [] { + constexpr std::int32_t n_samples = 150000; + constexpr std::size_t n_triggers = 300; + + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + + for (std::size_t i = 0; i < n_triggers; ++i) { + src.tags.push_back(tag_t{ static_cast(60000 + i), { { "TYPE", "TRIGGER" } } }); + } + + auto &sink = flow_graph.make_node>(); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + auto is_trigger = [](const tag_t &tag) { return trigger_match_result::Matching; }; + + auto poller = data_sink_registry::instance().get_trigger_poller(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, blocking_mode::Blocking); + expect(neq(poller, nullptr)); + + auto polling = std::async([poller] { + std::vector received_data; + std::vector received_tags; + bool seen_finished = false; + while (!seen_finished) { + seen_finished = poller->finished.load(); + while (poller->process([&received_data, &received_tags](const auto &datasets) { + for (const auto &dataset : datasets) { + expect(eq(dataset.signal_values.size(), 5000u) >> fatal); + received_data.push_back(dataset.signal_values.front()); + received_data.push_back(dataset.signal_values.back()); + expect(eq(dataset.timing_events.size(), 1u)) >> fatal; + expect(eq(dataset.timing_events[0].size(), 1u)); + expect(eq(dataset.timing_events[0][0].index, 3000)); + received_tags.insert(received_tags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); + } + })) { + } + } + return std::make_tuple(received_data, received_tags); + }); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + const auto &[received_data, received_tags] = polling.get(); + auto expected_start = std::vector{ 57000, 61999, 57001, 62000, 57002 }; + expect(eq(poller->drop_count.load(), 0u)); + expect(eq(received_data.size(), 2 * n_triggers) >> fatal); + expect(eq(std::vector(received_data.begin(), received_data.begin() + 5), expected_start)); + expect(eq(received_tags.size(), n_triggers)); + }; + + "callback trigger mode overlapping"_test = [] { + constexpr std::int32_t n_samples = 150000; + constexpr std::size_t n_triggers = 300; + + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + + for (std::size_t i = 0; i < n_triggers; ++i) { + src.tags.push_back(tag_t{ static_cast(60000 + i), { { "TYPE", "TRIGGER" } } }); + } + + auto &sink = flow_graph.make_node>(); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + auto is_trigger = [](const tag_t &) { return trigger_match_result::Matching; }; + + std::mutex m; + std::vector received_data; + + auto callback = [&received_data, &m](auto &&dataset) { + std::lock_guard lg{ m }; + expect(eq(dataset.signal_values.size(), 5000u)); + received_data.push_back(dataset.signal_values.front()); + received_data.push_back(dataset.signal_values.back()); + }; + + data_sink_registry::instance().register_trigger_callback(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, callback); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + std::lock_guard lg{ m }; + auto expected_start = std::vector{ 57000, 61999, 57001, 62000, 57002 }; + expect(eq(received_data.size(), 2 * n_triggers)); + expect(eq(std::vector(received_data.begin(), received_data.begin() + 5), expected_start)); + }; + + "non-blocking polling continuous mode"_test = [] { + graph flow_graph; + auto &src = flow_graph.make_node>({ { "n_samples_max", n_samples } }); + auto &sink = flow_graph.make_node>(); + sink.set_name("test_sink"); + + expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); + + auto invalid_type_poller = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink")); + expect(eq(invalid_type_poller, nullptr)); + + auto poller = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink")); + expect(neq(poller, nullptr)); + + auto polling = std::async([poller] { + expect(neq(poller, nullptr)); + std::size_t samples_seen = 0; + bool seen_finished = false; + while (!seen_finished) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(20ms); + + seen_finished = poller->finished.load(); + while (poller->process([&samples_seen](const auto &data) { samples_seen += data.size(); })) { + } + } + + return samples_seen; + }); + + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; + sched.run_and_wait(); + + sink.stop(); // TODO the scheduler should call this + + const auto samples_seen = polling.get(); + expect(eq(samples_seen + poller->drop_count, n_samples)); + }; +}; + +int +main() { /* tests are statically executed */ +}