Skip to content

Commit

Permalink
upgrade TagSource to user-level production code examples
Browse files Browse the repository at this point in the history
Signed-off-by: rstein <[email protected]>
  • Loading branch information
RalphSteinhagen authored and wirew0rm committed Jun 22, 2023
1 parent ecf5fe5 commit 8be6734
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 55 deletions.
140 changes: 87 additions & 53 deletions test/blocklib/core/unit-test/tag_monitors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,97 @@

namespace fair::graph::tag_test {

void
print_tag(const tag_t &tag, std::string_view prefix = {}) {
fmt::print("{} @index= {}: {{", prefix, tag.index);
if (tag.map.empty()) {
fmt::print("}}\n");
return;
}
for (const auto &[key, value] : tag.map) {
fmt::print(" {:>5}: {} ", key, value);

enum class ProcessFunction {
USE_PROCESS_ONE = 0, ///
USE_PROCESS_BULK = 1 ///
};

void
print_tag(const tag_t &tag, std::string_view prefix = {}) {
fmt::print("{} @index= {}: {{", prefix, tag.index);
if (tag.map.empty()) {
fmt::print("}}\n");
return;
}
for (const auto &[key, value]: tag.map) {
fmt::print(" {:>5}: {} ", key, value);
}
fmt::print("}}\n");
}

template<typename T>
struct TagSource : public node<TagSource<T>> {
OUT<T> out;
std::vector<tag_t> tags{};
std::size_t next_tag{ 0 };
std::uint64_t n_samples_max = 1024;
std::uint64_t n_samples_produced{ 0 };
template<typename T, ProcessFunction UseProcessOne>
struct TagSource : public node<TagSource<T, UseProcessOne>> {
OUT<T> out;
std::vector<tag_t> tags{};
std::size_t next_tag{0};
std::uint64_t n_samples_max = 1024;
std::uint64_t n_samples_produced{0};

constexpr std::make_signed_t<std::size_t>
available_samples(const TagSource &) noexcept {
if constexpr (UseProcessOne == ProcessFunction::USE_PROCESS_ONE) {
return n_samples_produced < n_samples_max ? 1 : -1; // '-1' -> DONE, produced enough samples
} else if constexpr (UseProcessOne == ProcessFunction::USE_PROCESS_BULK) {
std::make_signed_t<std::size_t> nextTagIn =
next_tag < tags.size() ? (tags[next_tag].index - n_samples_produced) : n_samples_max -
n_samples_produced;
return n_samples_produced < n_samples_max ? std::max(1L, nextTagIn)
: -1; // '-1' -> DONE, produced enough samples
} else {
static_assert(fair::meta::always_false<T>, "ProcessFunction-type not handled");
}
}

constexpr std::make_signed_t<std::size_t>
available_samples(const TagSource &) noexcept {
const auto ret = static_cast<std::make_signed_t<std::size_t>>(n_samples_max - n_samples_produced);
return ret > 0 ? ret : -1; // '-1' -> DONE, produced enough samples
}
T
process_one() noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_ONE) {
if (next_tag < tags.size() &&
tags[next_tag].index <= static_cast<std::make_signed_t<std::size_t>>(n_samples_produced)) {
print_tag(tags[next_tag],
fmt::format("{}::process_one(...)\t publish tag at {:6}", this->name(), n_samples_produced));
tag_t &out_tag = this->output_tags()[0];
out_tag = tags[next_tag];
out_tag.index = 0; // indices > 0 write tags in the future ... handle with care
this->forward_tags();
next_tag++;
n_samples_produced++;
return static_cast<T>(1);
}

T
process_one() noexcept {
if (next_tag < tags.size() && tags[next_tag].index <= static_cast<std::make_signed_t<std::size_t>>(n_samples_produced)) {
print_tag(tags[next_tag], fmt::format("{}::process_one(...)\t publish tag at {:6}", this->name(), n_samples_produced));
tag_t &out_tag = this->output_tags()[0];
out_tag = tags[next_tag];
this->forward_tags();
next_tag++;
n_samples_produced++;
return static_cast<T>(1);
return static_cast<T>(0);
}

n_samples_produced++;
return static_cast<T>(0);
}
};

static_assert(HasRequiredProcessFunction<TagSource<int>>);
work_return_t
process_bulk(std::span<T> output) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) {
if (next_tag < tags.size() &&
tags[next_tag].index <= static_cast<std::make_signed_t<std::size_t>>(n_samples_produced)) {
print_tag(tags[next_tag],
fmt::format("{}::process_one(...)\t publish tag at {:6}", this->name(), n_samples_produced));
tag_t &out_tag = this->output_tags()[0];
out_tag = tags[next_tag];
out_tag.index = 0; // indices > 0 write tags in the future ... handle with care
this->forward_tags();
next_tag++;
}

n_samples_produced += output.size();
return n_samples_produced < n_samples_max ? work_return_t::OK : work_return_t::DONE;
}
};

enum class ProcessFunction {
USE_PROCESS_ONE = 0, ///
USE_PROCESS_BULK = 1 ///
};
static_assert(HasRequiredProcessFunction<TagSource<int, ProcessFunction::USE_PROCESS_ONE>>);
static_assert(HasRequiredProcessFunction<TagSource<int, ProcessFunction::USE_PROCESS_BULK>>);

template<typename T, ProcessFunction UseProcessOne>
struct TagMonitor : public node<TagMonitor<T, UseProcessOne>> {
IN<T> in;
OUT<T> out;
std::vector<tag_t> tags{};
std::uint64_t n_samples_produced{ 0 };
template<typename T, ProcessFunction UseProcessOne>
struct TagMonitor : public node<TagMonitor<T, UseProcessOne>> {
IN<T> in;
OUT<T> out;
std::vector<tag_t> tags{};
std::uint64_t n_samples_produced{0};

constexpr T
process_one(const T &input) noexcept
constexpr T
process_one(const T &input) noexcept
requires(UseProcessOne == ProcessFunction::USE_PROCESS_ONE)
{
if (this->input_tags_present()) {
Expand Down Expand Up @@ -145,13 +176,16 @@ struct TagSink : public node<TagSink<T, UseProcessOne>> {
}
};

static_assert(HasRequiredProcessFunction<TagSink<int, ProcessFunction::USE_PROCESS_ONE>>);
static_assert(HasRequiredProcessFunction<TagSink<int, ProcessFunction::USE_PROCESS_BULK>>);
static_assert(HasRequiredProcessFunction<TagSink<int, ProcessFunction::USE_PROCESS_ONE>>);
static_assert(HasRequiredProcessFunction<TagSink<int, ProcessFunction::USE_PROCESS_BULK>>);

} // namespace fair::graph::tag_test

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fair::graph::tag_test::TagSource<T>), out, n_samples_max);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, fair::graph::tag_test::ProcessFunction b), (fair::graph::tag_test::TagMonitor<T, b>), in, out);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, fair::graph::tag_test::ProcessFunction b), (fair::graph::tag_test::TagSink<T, b>), in);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, fair::graph::tag_test::ProcessFunction b),
(fair::graph::tag_test::TagSource<T, b>), out, n_samples_max);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, fair::graph::tag_test::ProcessFunction b),
(fair::graph::tag_test::TagMonitor<T, b>), in, out);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, fair::graph::tag_test::ProcessFunction b),
(fair::graph::tag_test::TagSink<T, b>), in);

#endif // GRAPH_PROTOTYPE_TAG_MONITORS_HPP
6 changes: 4 additions & 2 deletions test/qa_tags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ const boost::ut::suite TagPropagation = [] {

"tag_source"_test = [] {
std::uint64_t n_samples = 1024;
graph flow_graph;
auto &src = flow_graph.make_node<TagSource<float>>({ { "n_samples_max", n_samples }, { "name", "TagSource" } });
graph flow_graph;
auto &src = flow_graph.make_node<TagSource<float, ProcessFunction::USE_PROCESS_BULK>>(
{{"n_samples_max", n_samples},
{"name", "TagSource"}});
src.tags = { // TODO: allow parameter settings to include maps?!?
{0, {{"key", "value@0"}}},
{1, {{"key", "value@1"}}},
Expand Down

0 comments on commit 8be6734

Please sign in to comment.