Skip to content

Commit

Permalink
data_sink with preliminary streaming callback/polling
Browse files Browse the repository at this point in the history
  • Loading branch information
frankosterfeld committed Jun 6, 2023
1 parent 5619855 commit b8b93ca
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 0 deletions.
155 changes: 155 additions & 0 deletions include/data_sink.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#ifndef GNURADIO_DATA_SINK_HPP
#define GNURADIO_DATA_SINK_HPP

#include "circular_buffer.hpp"
#include "node.hpp"
#include "tag.hpp"

namespace fair::graph {

/// How a registered listener receives data from the sink.
enum class acquisition_mode {
Continuous, ///< stream every sample as it arrives
Triggered, ///< deliver a window of samples around a matching trigger tag (see listener::window)
PostMortem ///< presumably delivers buffered history on a trigger — TODO confirm once implemented
};

/// Controls what happens when a poller's buffer cannot take more samples.
enum class blocking_mode {
NonBlocking, ///< samples that do not fit are dropped (counted in poller::drop_count)
Blocking ///< reserve the full range, waiting for buffer space; no samples dropped
};

template<typename T>
template<typename T>
class data_sink : public node<data_sink<T>> {
public:
    IN<T>       in;
    std::size_t n_samples_consumed = 0;
    std::size_t n_samples_max      = -1; ///< wraps to SIZE_MAX, i.e. "no limit"
    int64_t     last_tag_position  = -1;
    float       sample_rate        = -1.0f;

    static constexpr std::size_t listener_buffer_size = 65536;

    /**
     * Client-side handle for polled streaming access. The sink writes into
     * `buffer` from its work() thread; the client drains it via process().
     */
    struct poller {
        std::atomic<bool>        finished   = false;
        std::atomic<std::size_t> drop_count = 0; ///< samples dropped in non-blocking mode
        gr::circular_buffer<T>   buffer     = gr::circular_buffer<T>(listener_buffer_size);
        decltype(buffer.new_reader()) reader = buffer.new_reader();

        /**
         * Invokes @p fnc with all currently available samples and consumes them.
         * @return true if any data was handed to @p fnc, false if none was available.
         */
        template<typename Handler>
        [[nodiscard]] bool process(Handler fnc) {
            const auto available = reader.available();
            if (available == 0) {
                return false;
            }

            const auto read_data = reader.get(available);
            fnc(read_data);
            reader.consume(available);
            return true;
        }
    };

private:
    /// Internal registration record; one per callback or poller client.
    struct listener {
        acquisition_mode mode = acquisition_mode::Triggered;
        std::pair<int64_t, int64_t> window; ///< window of data to return in relation to the matching tag position, e.g. [-2000, 3000] to obtain 2000 presamples and 3000 postsamples
        std::size_t history_size = 0;
        bool block = false; ///< Blocking vs. NonBlocking semantics for the poller path
        int64_t drop_count = 0;
        std::function<bool(fair::graph::tag_t::map_type)> trigger_predicate;
        gr::circular_buffer<T> buffer;
        std::optional<std::size_t> pending; ///< number of samples expected due to a previous trigger
        std::function<void(std::span<const T>)> callback; // TODO we might want to pass back stats here like drop_count
        std::weak_ptr<poller> polling_handler; ///< non-owning: listener dies with its client
    };

    // Registrations are staged here under `mutex` and copied into `listeners`
    // by work() when `dirty` is set, so work() normally runs lock-free.
    struct {
        std::atomic<bool>     dirty = false;
        std::mutex            mutex;
        std::vector<listener> list;
    } pending_listeners;

    std::vector<listener> listeners; ///< snapshot used by work(); only touched there

public:
    /**
     * Registers a poller for continuous streaming.
     * @param block Blocking => work() waits for buffer space; NonBlocking => overflow is dropped.
     * @return shared handle; releasing it deactivates the registration.
     */
    std::shared_ptr<poller> get_streaming_poller(blocking_mode block = blocking_mode::NonBlocking) {
        auto handler = std::make_shared<poller>();
        // BUGFIX: must hold the mutex — work() copies pending_listeners.list concurrently
        // (register_streaming_callback already locked; this path did not).
        std::lock_guard lg(pending_listeners.mutex);
        pending_listeners.list.push_back({
            .mode = acquisition_mode::Continuous,
            .block = block == blocking_mode::Blocking,
            .buffer = gr::circular_buffer<T>(0), // unused for continuous mode
            .polling_handler = handler
        });
        pending_listeners.dirty = true;
        return handler;
    }

    /**
     * Registers a callback invoked from work() with each chunk of streamed samples.
     * The callback must be fast; it runs on the scheduler thread.
     */
    template<typename Callback>
    void register_streaming_callback(Callback callback) {
        std::lock_guard lg(pending_listeners.mutex);
        pending_listeners.list.push_back({
            .mode = acquisition_mode::Continuous,
            .buffer = gr::circular_buffer<T>(0), // unused for continuous mode
            .callback = std::move(callback)
        });
        pending_listeners.dirty = true;
    }

    [[nodiscard]] work_return_t work() {
        auto &in_port = input_port<"in">(this);
        auto &reader  = in_port.streamReader();

        const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
        if (n_readable == 0) {
            return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS;
        }

        const auto noutput_items = std::min(listener_buffer_size, n_readable);
        const auto in_data       = reader.get(noutput_items);

        // Pick up new registrations; `dirty` is atomic, so the unlocked test is safe.
        if (pending_listeners.dirty) {
            std::lock_guard lg(pending_listeners.mutex);
            listeners = pending_listeners.list;
            pending_listeners.dirty = false;
        }

        for (auto &listener : listeners) {
            if (listener.mode == acquisition_mode::Continuous) {
                if (auto poller = listener.polling_handler.lock()) {
                    auto writer = poller->buffer.new_writer();
                    if (listener.block) {
                        // Blocking: reserve the full range (waits for space), never drop.
                        auto write_data = writer.reserve_output_range(noutput_items);
                        std::copy(in_data.begin(), in_data.end(), write_data.begin());
                        write_data.publish(write_data.size());
                    } else {
                        const auto can_write = writer.available();
                        const auto to_write  = std::min(in_data.size(), can_write);
                        // BUGFIX: was `size() - can_write`, which underflows the unsigned
                        // counter whenever the buffer has spare capacity.
                        poller->drop_count += in_data.size() - to_write;
                        if (to_write > 0) {
                            auto write_data = writer.reserve_output_range(to_write);
                            // BUGFIX: was `begin() + to_write - 1`, silently losing the
                            // last sample of every chunk while still publishing to_write.
                            std::copy(in_data.begin(), in_data.begin() + to_write, write_data.begin());
                            write_data.publish(write_data.size());
                        }
                    }
                } else if (listener.callback) {
                    listener.callback(in_data);
                }
            }
        }

        n_samples_consumed += noutput_items;

        if (!reader.consume(noutput_items)) {
            return work_return_t::ERROR;
        }

        return work_return_t::OK;
    }
};

}

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fair::graph::data_sink<T>), in, n_samples_consumed, n_samples_max, last_tag_position, sample_rate);

#endif
1 change: 1 addition & 0 deletions include/tag.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum class tag_propagation_policy_t {
*/
struct alignas(hardware_constructive_interference_size) tag_t {
using map_type = std::map<std::string, pmtv::pmt, std::less<>>;

int64_t index = 0;
map_type map;

Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ endif ()
endfunction()

add_ut_test(qa_buffer)
add_ut_test(qa_data_sink)
add_ut_test(qa_dynamic_port)
add_ut_test(qa_filter)
add_ut_test(qa_settings)
Expand Down
154 changes: 154 additions & 0 deletions test/qa_data_sink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include <boost/ut.hpp>

#include <buffer.hpp>
#include <data_sink.hpp>
#include <graph.hpp>
#include <node.hpp>
#include <reflection.hpp>
#include <scheduler.hpp>

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <future>

#if defined(__clang__) && __clang_major__ >= 16
// clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection
template<>
auto boost::ut::cfg<boost::ut::override> = boost::ut::runner<boost::ut::reporter<>>{};
#endif

namespace fair::graph::data_sink_test {

template<typename T>
struct Source : public node<Source<T>> {
    OUT<T>       out;
    std::int32_t n_samples_produced = 0;
    std::int32_t n_samples_max      = 1024;
    std::int32_t n_tag_offset       = 0;
    float        sample_rate        = 1000.0f;
    T            next_value         = {};

    /// Optional hook invoked after construction and whenever settings change;
    /// publishes the configured sample limit as a tag at n_tag_offset.
    void
    init(const tag_t::map_type &old_settings, const tag_t::map_type &new_settings) {
        fair::graph::publish_tag(out, { { "n_samples_max", n_samples_max } }, n_tag_offset);
    }

    /// Number of samples still to be produced; -1 signals DONE once the limit is hit.
    constexpr std::int64_t
    available_samples(const Source &self) noexcept {
        if (n_samples_produced >= n_samples_max) {
            return -1; // exhausted -> DONE
        }
        return static_cast<std::int64_t>(n_samples_max) - n_samples_produced;
    }

    /// Emits a monotonically increasing ramp, one sample per call.
    [[nodiscard]] constexpr T
    process_one() noexcept {
        ++n_samples_produced;
        return next_value++;
    }
};

} // namespace fair::graph::data_sink_test

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fair::graph::data_sink_test::Source<T>), out, n_samples_produced, n_samples_max, n_tag_offset, sample_rate);

const boost::ut::suite DataSinkTests = [] {
    using namespace boost::ut;
    using namespace fair::graph;
    using namespace fair::graph::data_sink_test;

    // Streams a fixed ramp through the sink and checks the callback sees every
    // sample exactly once, in order.
    "callback continuous mode"_test = [] {
        graph flow_graph;
        constexpr std::int32_t n_samples = gr::util::round_up(1'000'000, 1024);

        auto &src  = flow_graph.make_node<Source<float>>({ { "n_samples_max", n_samples } });
        auto &sink = flow_graph.make_node<data_sink<float>>();
        expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

        std::size_t samples_seen = 0;
        auto callback = [&samples_seen](std::span<const float> buffer) {
            for (std::size_t i = 0; i < buffer.size(); ++i) {
                expect(eq(buffer[i], static_cast<float>(samples_seen + i)));
            }
            samples_seen += buffer.size();
        };

        sink.register_streaming_callback(callback);

        fair::graph::scheduler::simple sched{std::move(flow_graph)};
        sched.work();

        expect(eq(sink.n_samples_consumed, n_samples));
        expect(eq(samples_seen, n_samples));
    };

    // A blocking poller must receive every sample with zero drops.
    "blocking polling continuous mode"_test = [] {
        constexpr std::int32_t n_samples = gr::util::round_up(1'000'000, 1024);

        graph flow_graph;
        auto &src  = flow_graph.make_node<Source<float>>({ { "n_samples_max", n_samples } });
        auto &sink = flow_graph.make_node<data_sink<float>>();
        expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

        std::atomic<std::size_t> samples_seen = 0;

        auto poller = sink.get_streaming_poller(blocking_mode::Blocking);

        auto polling = std::async([poller, &samples_seen] {
            while (!poller->finished) {
                // BUGFIX: [[maybe_unused]] cannot appertain to an expression statement;
                // bind the [[nodiscard]] result to a variable instead.
                [[maybe_unused]] const auto processed = poller->process([&samples_seen](const auto &data) {
                    samples_seen += data.size();
                });
            }
        });

        fair::graph::scheduler::simple sched{std::move(flow_graph)};
        sched.work();

        poller->finished = true; // TODO this should be done by the block

        polling.wait();

        expect(eq(sink.n_samples_consumed, n_samples));
        expect(eq(samples_seen.load(), n_samples));
        expect(eq(poller->drop_count.load(), 0));
    };

    // A slow non-blocking poller is expected to miss samples, and the drop
    // counter must account for them.
    "non-blocking polling continuous mode"_test = [] {
        constexpr std::int32_t n_samples = gr::util::round_up(1'000'000, 1024);

        graph flow_graph;
        auto &src  = flow_graph.make_node<Source<float>>({ { "n_samples_max", n_samples } });
        auto &sink = flow_graph.make_node<data_sink<float>>();
        expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

        std::atomic<std::size_t> samples_seen = 0;

        auto poller = sink.get_streaming_poller();

        auto polling = std::async([poller, &samples_seen] {
            while (!poller->finished) {
                using namespace std::chrono_literals;
                std::this_thread::sleep_for(20ms); // deliberately too slow to keep up
                // BUGFIX: see above — [[maybe_unused]] needs a declaration to attach to.
                [[maybe_unused]] const auto processed = poller->process([&samples_seen](const auto &data) {
                    samples_seen += data.size();
                });
            }
        });

        fair::graph::scheduler::simple sched{std::move(flow_graph)};
        sched.work();

        poller->finished = true; // TODO this should be done by the block

        polling.wait();

        expect(eq(sink.n_samples_consumed, n_samples));
        expect(lt(samples_seen.load(), n_samples));
        expect(gt(poller->drop_count.load(), 0));
    };
};

int
main() { /* empty on purpose: boost.ut runs the suites during static initialization */
}

0 comments on commit b8b93ca

Please sign in to comment.