diff --git a/blocks/basic/test/qa_DataSink.cpp b/blocks/basic/test/qa_DataSink.cpp index cb078dab..f18c31ef 100644 --- a/blocks/basic/test/qa_DataSink.cpp +++ b/blocks/basic/test/qa_DataSink.cpp @@ -9,8 +9,8 @@ #include #include #include -#include #include +#include #include #include @@ -21,14 +21,12 @@ using namespace std::chrono_literals; template<> struct fmt::formatter { template - constexpr auto - parse(ParseContext &ctx) { + constexpr auto parse(ParseContext& ctx) { return ctx.begin(); } template - constexpr auto - format(const gr::Tag &tag, FormatContext &ctx) const { + constexpr auto format(const gr::Tag& tag, FormatContext& ctx) const { return fmt::format_to(ctx.out(), "{}", tag.index); } }; @@ -58,18 +56,11 @@ struct Matcher { explicit Matcher(std::optional year_, std::optional month_, std::optional day_) : year(year_), month(month_), day(day_) {} - static inline bool - same(int x, std::optional other) { - return other && x == *other; - } + static inline bool same(int x, std::optional other) { return other && x == *other; } - static inline bool - changed(int x, std::optional other) { - return !same(x, other); - } + static inline bool changed(int x, std::optional other) { return !same(x, other); } - TriggerMatchResult - operator()(const Tag &tag) { + TriggerMatchResult operator()(const Tag& tag) { const auto ty = tag.get("YEAR"); const auto tm = tag.get("MONTH"); const auto td = tag.get("DAY"); @@ -78,7 +69,7 @@ struct Matcher { } const auto tup = std::make_tuple(std::get(ty->get()), std::get(tm->get()), std::get(td->get())); - const auto &[y, m, d] = tup; + const auto& [y, m, d] = tup; const auto ly = last_seen ? std::optional(std::get<0>(*last_seen)) : std::nullopt; const auto lm = last_seen ? std::optional(std::get<1>(*last_seen)) : std::nullopt; const auto ld = last_seen ? std::optional(std::get<2>(*last_seen)) : std::nullopt; @@ -106,13 +97,9 @@ struct Matcher { } }; -static Tag -makeTag(Tag::signed_index_type index, int year, int month, int day) { - return Tag{ index, { { "YEAR", year }, { "MONTH", month }, { "DAY", day } } }; -} +static Tag makeTag(Tag::signed_index_type index, int year, int month, int day) { return Tag{index, {{"YEAR", year}, {"MONTH", month}, {"DAY", day}}}; } -static std::vector -makeTestTags(Tag::signed_index_type firstIndex, Tag::signed_index_type interval) { +static std::vector makeTestTags(Tag::signed_index_type firstIndex, Tag::signed_index_type interval) { std::vector tags; for (int y = 1; y <= 3; ++y) { for (int m = 1; m <= 2; ++m) { @@ -125,8 +112,7 @@ makeTestTags(Tag::signed_index_type firstIndex, Tag::signed_index_type interval) return tags; } -static std::string -toAsciiArt(std::span states) { +static std::string toAsciiArt(std::span states) { bool started = false; std::string r; for (auto s : states) { @@ -146,28 +132,25 @@ toAsciiArt(std::span states) { } template -std::string -runMatcherTest(std::span tags, M o) { +std::string runMatcherTest(std::span tags, M o) { std::vector r; r.reserve(tags.size()); - for (const auto &tag : tags) { + for (const auto& tag : tags) { r.push_back(o(tag)); } return toAsciiArt(r); } -std::pair, std::vector> -extractMetadataTags(const std::vector &tags) { - constexpr auto tagsToExtract = std::array{ gr::tag::SAMPLE_RATE.shortKey(), gr::tag::SIGNAL_NAME.shortKey(), gr::tag::SIGNAL_UNIT.shortKey(), gr::tag::SIGNAL_MIN.shortKey(), - gr::tag::SIGNAL_MAX.shortKey() }; +std::pair, std::vector> extractMetadataTags(const std::vector& tags) { + constexpr auto tagsToExtract = std::array{gr::tag::SAMPLE_RATE.shortKey(), gr::tag::SIGNAL_NAME.shortKey(), gr::tag::SIGNAL_UNIT.shortKey(), gr::tag::SIGNAL_MIN.shortKey(), gr::tag::SIGNAL_MAX.shortKey()}; std::vector metadataTags; std::vector nonMetadataTags; - for (const auto &tag : tags) { + for (const auto& tag : tags) { Tag metadata; Tag nonMetadata; metadata.index = tag.index; nonMetadata.index = tag.index; - for (const auto &[key, value] : tag.map) { + for (const auto& [key, value] : tag.map) { if (std::find(tagsToExtract.begin(), tagsToExtract.end(), key) != tagsToExtract.end()) { metadata.map[key] = value; } else { @@ -181,7 +164,7 @@ extractMetadataTags(const std::vector &tags) { nonMetadataTags.push_back(nonMetadata); } } - return { metadataTags, nonMetadataTags }; + return {metadataTags, nonMetadataTags}; } struct Metadata { @@ -192,10 +175,9 @@ struct Metadata { std::optional sample_rate; }; -Metadata -metadataFromTag(const Tag &tag) { +Metadata metadataFromTag(const Tag& tag) { Metadata m; - for (const auto &[key, value] : tag.map) { + for (const auto& [key, value] : tag.map) { if (key == gr::tag::SIGNAL_NAME.shortKey()) { m.signal_name = std::get(value); } else if (key == gr::tag::SIGNAL_UNIT.shortKey()) { @@ -211,10 +193,9 @@ metadataFromTag(const Tag &tag) { return m; } -Metadata -latestMetadata(const std::vector &tags) { +Metadata latestMetadata(const std::vector& tags) { Metadata metadata; - for (const auto &tag : tags | std::views::reverse) { + for (const auto& tag : tags | std::views::reverse) { const auto m = metadataFromTag(tag); if (!metadata.signal_name) { metadata.signal_name = m.signal_name; @@ -235,8 +216,7 @@ latestMetadata(const std::vector &tags) { return metadata; } -bool -spinUntil(std::chrono::milliseconds timeout, auto fnc) { +bool spinUntil(std::chrono::milliseconds timeout, auto fnc) { const auto start = std::chrono::steady_clock::now(); while (std::chrono::steady_clock::now() - start < timeout) { @@ -251,15 +231,13 @@ spinUntil(std::chrono::milliseconds timeout, auto fnc) { } // namespace gr::basic::data_sink_test template -std::string -formatList(const T &l) { +std::string formatList(const T& l) { return fmt::format("[{}]", fmt::join(l, ", ")); } template -bool -indexesMatch(const T &lhs, const U &rhs) { - auto index_match = [](const auto &l, const auto &r) { return l.index == r.index; }; +bool indexesMatch(const T& lhs, const U& rhs) { + auto index_match = [](const auto& l, const auto& r) { return l.index == r.index; }; return std::equal(std::begin(lhs), std::end(lhs), std::begin(rhs), std::end(rhs), index_match); } @@ -280,11 +258,10 @@ const boost::ut::suite DataSinkTests = [] { const auto srcTags = makeTestTags(0, 1000); gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>( - { { "n_samples_max", kSamples }, { "mark_tag", false }, { "signal_name", "test source" }, { "signal_unit", "test unit" }, { "signal_min", -42.f }, { "signal_max", 42.f } }); - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); - src.tags = srcTags; + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}, {"signal_name", "test source"}, {"signal_unit", "test unit"}, {"signal_min", -42.f}, {"signal_max", 42.f}}); + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}}); + src._tags = srcTags; expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); @@ -314,15 +291,14 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(buffer[i], static_cast(samplesSeen2 + i))); } - for (const auto &tag : tags) { + for (const auto& tag : tags) { expect(ge(tag.index, 0)); expect(lt(tag.index, static_cast(buffer.size()))); } - auto lg = std::lock_guard{ m2 }; + auto lg = std::lock_guard{m2}; std::vector adjusted; - std::transform(tags.begin(), tags.end(), std::back_inserter(adjusted), - [samplesSeen2](const auto &tag) { return Tag{ static_cast(samplesSeen2) + tag.index, tag.map }; }); + std::transform(tags.begin(), tags.end(), std::back_inserter(adjusted), [samplesSeen2](const auto& tag) { return Tag{static_cast(samplesSeen2) + tag.index, tag.map}; }); receivedTags.insert(receivedTags.end(), adjusted.begin(), adjusted.end()); samplesSeen2 += buffer.size(); chunksSeen2++; @@ -333,7 +309,7 @@ const boost::ut::suite DataSinkTests = [] { } }; - auto callbackWithTagsAndSink = [&sink](std::span, std::span, const DataSink &passedSink) { + auto callbackWithTagsAndSink = [&sink](std::span, std::span, const DataSink& passedSink) { expect(eq(passedSink.name.value, "test_sink"s)); expect(eq(sink.unique_name, passedSink.unique_name)); }; @@ -357,16 +333,16 @@ const boost::ut::suite DataSinkTests = [] { })) << boost::ut::fatal; }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); registerThread.join(); - auto lg = std::lock_guard{ m2 }; + auto lg = std::lock_guard{m2}; expect(eq(chunksSeen1.load(), 201UZ)); expect(eq(chunksSeen2, 201UZ)); expect(eq(samplesSeen1.load(), static_cast(kSamples))); expect(eq(samplesSeen2, static_cast(kSamples))); - const auto &[metadataTags, nonMetadataTags] = extractMetadataTags(receivedTags); + const auto& [metadataTags, nonMetadataTags] = extractMetadataTags(receivedTags); expect(eq(nonMetadataTags.size(), srcTags.size())); expect(eq(indexesMatch(nonMetadataTags, srcTags), true)) << fmt::format("{} != {}", formatList(receivedTags), formatList(srcTags)); const auto metadata = latestMetadata(metadataTags); @@ -381,11 +357,10 @@ const boost::ut::suite DataSinkTests = [] { gr::Graph testGraph; const auto tags = makeTestTags(0, 1000); - auto &src = testGraph.emplaceBlock>( - { { "n_samples_max", kSamples }, { "mark_tag", false }, { "signal_name", "test source" }, { "signal_unit", "test unit" }, { "signal_min", -42.f }, { "signal_max", 42.f } }); - src.tags = tags; - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" }, { "signal_name", "test signal" } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}, {"signal_name", "test source"}, {"signal_unit", "test unit"}, {"signal_min", -42.f}, {"signal_max", 42.f}}); + src._tags = tags; + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}, {"signal_name", "test signal"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); @@ -401,7 +376,7 @@ const boost::ut::suite DataSinkTests = [] { bool seenFinished = false; while (!seenFinished) { seenFinished = poller->finished; - while (poller->process([&received](const auto &data) { received.insert(received.end(), data.begin(), data.end()); })) { + while (poller->process([&received](const auto& data) { received.insert(received.end(), data.begin(), data.end()); })) { } } @@ -419,9 +394,9 @@ const boost::ut::suite DataSinkTests = [] { bool seenFinished = false; while (!seenFinished) { seenFinished = poller->finished; - while (poller->process([&received, &receivedTags](const auto &data, const auto &tags_) { + while (poller->process([&received, &receivedTags](const auto& data, const auto& tags_) { auto rtags = std::vector(tags_.begin(), tags_.end()); - for (auto &t : rtags) { + for (auto& t : rtags) { t.index += static_cast(received.size()); } receivedTags.insert(receivedTags.end(), rtags.begin(), rtags.end()); @@ -434,7 +409,7 @@ const boost::ut::suite DataSinkTests = [] { }); { - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); const auto pollerAfterStop = DataSinkRegistry::instance().getStreamingPoller(DataSinkQuery::sinkName("test_sink")); @@ -447,9 +422,9 @@ const boost::ut::suite DataSinkTests = [] { std::vector expected(kSamples); std::iota(expected.begin(), expected.end(), 0.0); - const auto &[pollerDataOnly, received1] = runner1.get(); - const auto &[pollerWithTags, received2, receivedTags] = runner2.get(); - const auto &[metadataTags, nonMetadataTags] = extractMetadataTags(receivedTags); + const auto& [pollerDataOnly, received1] = runner1.get(); + const auto& [pollerWithTags, received2, receivedTags] = runner2.get(); + const auto& [metadataTags, nonMetadataTags] = extractMetadataTags(receivedTags); expect(eq(received1.size(), expected.size())); expect(eq(received1, expected)); expect(eq(pollerDataOnly->drop_count.load(), 0UZ)); @@ -471,19 +446,18 @@ const boost::ut::suite DataSinkTests = [] { constexpr gr::Size_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, { "mark_tag", false } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}}); - const auto tags = std::vector{ { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } }; - src.tags = tags; - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>( - { { "name", "test_sink" }, { "signal_name", "test signal" }, { "signal_unit", "none" }, { "signal_min", -2.0f }, { "signal_max", 2.0f } }); + const auto tags = std::vector{{3000, {{"TYPE", "TRIGGER"}}}, {8000, {{"TYPE", "NO_TRIGGER"}}}, {180000, {{"TYPE", "TRIGGER"}}}}; + src._tags = tags; + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}, {"signal_name", "test signal"}, {"signal_unit", "none"}, {"signal_min", -2.0f}, {"signal_max", 2.0f}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); auto polling = std::async([] { - auto isTrigger = [](const Tag &tag) { + auto isTrigger = [](const Tag& tag) { const auto v = tag.get("TYPE"); return v && std::get(v->get()) == "TRIGGER" ? TriggerMatchResult::Matching : TriggerMatchResult::Ignore; }; @@ -499,8 +473,8 @@ const boost::ut::suite DataSinkTests = [] { bool seenFinished = false; while (!seenFinished) { seenFinished = poller->finished; - [[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto &datasets) { - for (const auto &dataset : datasets) { + [[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto& datasets) { + for (const auto& dataset : datasets) { receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); // signal info from sink settings expect(eq(dataset.signal_names.size(), 1u)); @@ -509,7 +483,7 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.signal_names[0], "test signal"s)); expect(eq(dataset.signal_units[0], "none"s)); - expect(eq(dataset.signal_ranges[0], std::vector{ -2, +2 })); + expect(eq(dataset.signal_ranges[0], std::vector{-2, +2})); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, 3)); receivedTags.insert(receivedTags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); @@ -519,14 +493,14 @@ const boost::ut::suite DataSinkTests = [] { return std::make_tuple(poller, receivedData, receivedTags); }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); - const auto &[poller, receivedData, receivedTags] = polling.get(); - const auto expected_tags = { tags[0], tags[2] }; // triggers-only + const auto& [poller, receivedData, receivedTags] = polling.get(); + const auto expected_tags = {tags[0], tags[2]}; // triggers-only expect(eq(receivedData.size(), 10UZ)); - expect(eq(receivedData, std::vector{ 2997, 2998, 2999, 3000, 3001, 179997, 179998, 179999, 180000, 180001 })); + expect(eq(receivedData, std::vector{2997, 2998, 2999, 3000, 3001, 179997, 179998, 179999, 180000, 180001})); expect(eq(receivedTags.size(), expected_tags.size())); expect(eq(poller->drop_count.load(), 0UZ)); @@ -535,13 +509,12 @@ const boost::ut::suite DataSinkTests = [] { "propagation of signal metadata per data set"_test = [] { constexpr gr::Size_t kSamples = 40000000; - gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>( - { { "n_samples_max", kSamples }, { "mark_tag", false }, { "signal_name", "test signal" }, { "signal_unit", "no unit" }, { "signal_min", -2.f }, { "signal_max", 2.f } }); - const auto tags = std::vector{ { 39000000, { { "TYPE", "TRIGGER" } } } }; - src.tags = tags; - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>(); + gr::Graph testGraph; + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}, {"signal_name", "test signal"}, {"signal_unit", "no unit"}, {"signal_min", -2.f}, {"signal_max", 2.f}}); + const auto tags = std::vector{{39000000, {{"TYPE", "TRIGGER"}}}}; + src._tags = tags; + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>(); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); @@ -550,7 +523,7 @@ const boost::ut::suite DataSinkTests = [] { std::vector receivedData; std::vector receivedTags; bool seenFinished = false; - auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; }; + auto isTrigger = [](const Tag&) { return TriggerMatchResult::Matching; }; std::shared_ptr::DataSetPoller> poller; expect(spinUntil(4s, [&] { poller = DataSinkRegistry::instance().getTriggerPoller(DataSinkQuery::signalName("test signal"), isTrigger, 0UZ, 2UZ, BlockingMode::Blocking); @@ -563,8 +536,8 @@ const boost::ut::suite DataSinkTests = [] { while (!seenFinished) { seenFinished = poller->finished; - [[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto &datasets) { - for (const auto &dataset : datasets) { + [[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto& datasets) { + for (const auto& dataset : datasets) { receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); // signal info from sink settings expect(eq(dataset.signal_names.size(), 1u)); @@ -573,7 +546,7 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.signal_names[0], "test signal"s)); expect(eq(dataset.signal_units[0], "no unit"s)); - expect(eq(dataset.signal_ranges[0], std::vector{ -2, +2 })); + expect(eq(dataset.signal_ranges[0], std::vector{-2, +2})); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, 0)); receivedTags.insert(receivedTags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); @@ -583,46 +556,37 @@ const boost::ut::suite DataSinkTests = [] { return std::make_tuple(poller, receivedData, receivedTags); }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); - const auto &[_, receivedData, receivedTags] = polling.get(); - expect(eq(receivedData, std::vector{ 39000000, 39000001 })); + const auto& [_, receivedData, receivedTags] = polling.get(); + expect(eq(receivedData, std::vector{39000000, 39000001})); }; "blocking snapshot mode"_test = [] { constexpr gr::Size_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, - { "mark_tag", false }, - { "sample_rate", 10000.f }, - { "signal_name", "test signal" }, - { "signal_unit", "none" }, - { "signal_min", 0.f }, - { "signal_max", static_cast(kSamples - 1) } }); - src.tags = { { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } }; - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}, {"sample_rate", 10000.f}, {"signal_name", "test signal"}, {"signal_unit", "none"}, {"signal_min", 0.f}, {"signal_max", static_cast(kSamples - 1)}}); + src._tags = {{3000, {{"TYPE", "TRIGGER"}}}, {8000, {{"TYPE", "NO_TRIGGER"}}}, {180000, {{"TYPE", "TRIGGER"}}}}; + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); - constexpr auto kDelay = std::chrono::milliseconds{ 500 }; // sample rate 10000 -> 5000 samples + constexpr auto kDelay = std::chrono::milliseconds{500}; // sample rate 10000 -> 5000 samples std::vector receivedDataCb; - auto callback = [&receivedDataCb](const auto &dataset) { receivedDataCb.insert(receivedDataCb.end(), dataset.signal_values.begin(), dataset.signal_values.end()); }; + auto callback = [&receivedDataCb](const auto& dataset) { receivedDataCb.insert(receivedDataCb.end(), dataset.signal_values.begin(), dataset.signal_values.end()); }; - auto isTrigger = [](const Tag &tag) { + auto isTrigger = [](const Tag& tag) { const auto v = tag.get("TYPE"); return (v && std::get(v->get()) == "TRIGGER") ? TriggerMatchResult::Matching : TriggerMatchResult::Ignore; }; - auto registerThread = std::thread([&] { - expect(spinUntil(4s, [&] { return DataSinkRegistry::instance().registerSnapshotCallback(DataSinkQuery::sinkName("test_sink"), isTrigger, kDelay, callback); })) - << boost::ut::fatal; - }); + auto registerThread = std::thread([&] { expect(spinUntil(4s, [&] { return DataSinkRegistry::instance().registerSnapshotCallback(DataSinkQuery::sinkName("test_sink"), isTrigger, kDelay, callback); })) << boost::ut::fatal; }); auto poller_result = std::async([isTrigger, kDelay] { std::shared_ptr::DataSetPoller> poller; @@ -636,8 +600,8 @@ const boost::ut::suite DataSinkTests = [] { bool seenFinished = false; while (!seenFinished) { seenFinished = poller->finished; - [[maybe_unused]] auto r = poller->process([&receivedData](const auto &datasets) { - for (const auto &dataset : datasets) { + [[maybe_unused]] auto r = poller->process([&receivedData](const auto& datasets) { + for (const auto& dataset : datasets) { // signal info propagated from source to sink expect(eq(dataset.signal_names.size(), 1u)); expect(eq(dataset.signal_units.size(), 1u)); @@ -645,7 +609,7 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.signal_names[0], "test signal"s)); expect(eq(dataset.signal_units[0], "none"s)); - expect(eq(dataset.signal_ranges[0], std::vector{ 0, kSamples - 1 })); + expect(eq(dataset.signal_ranges[0], std::vector{0, kSamples - 1})); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, -5000)); receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); @@ -656,14 +620,14 @@ const boost::ut::suite DataSinkTests = [] { return std::make_tuple(poller, receivedData); }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); registerThread.join(); - const auto &[poller, receivedData] = poller_result.get(); + const auto& [poller, receivedData] = poller_result.get(); expect(eq(receivedDataCb, receivedData)); - expect(eq(receivedData, std::vector{ 8000, 185000 })); + expect(eq(receivedData, std::vector{8000, 185000})); expect(eq(poller->drop_count.load(), 0UZ)); }; @@ -673,10 +637,10 @@ const boost::ut::suite DataSinkTests = [] { const gr::Size_t n_samples = static_cast(tags.size() * 10000 + 100000); gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples }, { "mark_tag", false } }); - src.tags = tags; - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", 2500u } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", n_samples}, {"mark_tag", false}}); + src._tags = tags; + auto& delay = testGraph.emplaceBlock>({{"delay_ms", 2500u}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); @@ -692,14 +656,10 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(runMatcherTest(t, Matcher({}, {}, 1)), "|#|__|#|__|#|__|#|__|#|__|#|__"s)); } - const auto matchers = std::array{ Matcher({}, -1, {}), Matcher(-1, {}, {}), Matcher(1, {}, {}), Matcher(1, {}, 2), Matcher({}, {}, 1) }; + const auto matchers = std::array{Matcher({}, -1, {}), Matcher(-1, {}, {}), Matcher(1, {}, {}), Matcher(1, {}, 2), Matcher({}, {}, 1)}; // Following the patterns above, where each #/_ is 10000 samples - const auto expected = std::array, matchers.size()>{ { { 0, 29999, 30000, 59999, 60000, 89999, 90000, 119999, 120000, 149999, 150000, 249999 }, - { 0, 59999, 60000, 119999, 120000, 219999 }, - { 0, 59999 }, - { 10000, 19999, 40000, 49999 }, - { 0, 9999, 30000, 39999, 60000, 69999, 90000, 99999, 120000, 129999, 150000, 159999 } } }; + const auto expected = std::array, matchers.size()>{{{0, 29999, 30000, 59999, 60000, 89999, 90000, 119999, 120000, 149999, 150000, 249999}, {0, 59999, 60000, 119999, 120000, 219999}, {0, 59999}, {10000, 19999, 40000, 49999}, {0, 9999, 30000, 39999, 60000, 69999, 90000, 99999, 120000, 129999, 150000, 159999}}}; std::vector>> results; std::array, matchers.size()> resultsCb; @@ -709,7 +669,7 @@ const boost::ut::suite DataSinkTests = [] { expect(spinUntil(3s, [&] { for (auto i = 0UZ; i < registered.size(); ++i) { if (!registered[i]) { - auto callback = [&r = resultsCb[i]](const auto &dataset) { + auto callback = [&r = resultsCb[i]](const auto& dataset) { r.push_back(dataset.signal_values.front()); r.push_back(dataset.signal_values.back()); }; @@ -731,8 +691,8 @@ const boost::ut::suite DataSinkTests = [] { bool seenFinished = false; while (!seenFinished) { seenFinished = poller->finished.load(); - while (poller->process([&ranges](const auto &datasets) { - for (const auto &dataset : datasets) { + while (poller->process([&ranges](const auto& datasets) { + for (const auto& dataset : datasets) { // default signal info, we didn't set anything expect(eq(dataset.signal_names.size(), 1u)); expect(eq(dataset.signal_units.size(), 1u)); @@ -750,7 +710,7 @@ const boost::ut::suite DataSinkTests = [] { results.push_back(std::move(f)); } - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); registerThread.join(); @@ -765,20 +725,20 @@ const boost::ut::suite DataSinkTests = [] { constexpr std::size_t kTriggers = 300; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, { "mark_tag", false } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}}); for (std::size_t i = 0; i < kTriggers; ++i) { - src.tags.push_back(Tag{ static_cast(60000 + i), { { "TYPE", "TRIGGER" } } }); + src._tags.push_back(Tag{static_cast(60000 + i), {{"TYPE", "TRIGGER"}}}); } - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); auto polling = std::async([] { - auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; }; + auto isTrigger = [](const Tag&) { return TriggerMatchResult::Matching; }; std::shared_ptr::DataSetPoller> poller; expect(spinUntil(4s, [&] { @@ -790,8 +750,8 @@ const boost::ut::suite DataSinkTests = [] { bool seenFinished = false; while (!seenFinished) { seenFinished = poller->finished.load(); - while (poller->process([&receivedData, &receivedTags](const auto &datasets) { - for (const auto &dataset : datasets) { + while (poller->process([&receivedData, &receivedTags](const auto& datasets) { + for (const auto& dataset : datasets) { expect(eq(dataset.signal_values.size(), 5000u) >> fatal); receivedData.push_back(dataset.signal_values.front()); receivedData.push_back(dataset.signal_values.back()); @@ -806,11 +766,11 @@ const boost::ut::suite DataSinkTests = [] { return std::make_tuple(poller, receivedData, receivedTags); }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); - const auto &[poller, receivedData, receivedTags] = polling.get(); - auto expectedStart = std::vector{ 57000, 61999, 57001, 62000, 57002 }; + const auto& [poller, receivedData, receivedTags] = polling.get(); + auto expectedStart = std::vector{57000, 61999, 57001, 62000, 57002}; expect(eq(poller->drop_count.load(), 0u)); expect(eq(receivedData.size(), 2 * kTriggers) >> fatal); expect(eq(std::vector(receivedData.begin(), receivedData.begin() + 5), expectedStart)); @@ -822,41 +782,40 @@ const boost::ut::suite DataSinkTests = [] { constexpr std::size_t kTriggers = 300; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, { "mark_tag", false } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}}); for (std::size_t i = 0; i < kTriggers; ++i) { - src.tags.push_back(Tag{ static_cast(60000 + i), { { "TYPE", "TRIGGER" } } }); + src._tags.push_back(Tag{static_cast(60000 + i), {{"TYPE", "TRIGGER"}}}); } - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); - auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; }; + auto isTrigger = [](const Tag&) { return TriggerMatchResult::Matching; }; std::mutex m; std::vector receivedData; - auto callback = [&receivedData, &m](auto &&dataset) { - std::lock_guard lg{ m }; + auto callback = [&receivedData, &m](auto&& dataset) { + std::lock_guard lg{m}; expect(eq(dataset.signal_values.size(), 5000u)); receivedData.push_back(dataset.signal_values.front()); receivedData.push_back(dataset.signal_values.back()); }; - auto registerThread = std::thread([&] { - expect(spinUntil(4s, [&] { return DataSinkRegistry::instance().registerTriggerCallback(DataSinkQuery::sinkName("test_sink"), isTrigger, 3000, 2000, callback); })) - << boost::ut::fatal; + auto registerThread = std::thread([&] { // + expect(spinUntil(4s, [&] { return DataSinkRegistry::instance().registerTriggerCallback(DataSinkQuery::sinkName("test_sink"), isTrigger, 3000, 2000, callback); })) << boost::ut::fatal; }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); registerThread.join(); - std::lock_guard lg{ m }; - auto expectedStart = std::vector{ 57000, 61999, 57001, 62000, 57002 }; + std::lock_guard lg{m}; + auto expectedStart = std::vector{57000, 61999, 57001, 62000, 57002}; expect(eq(receivedData.size(), 2 * kTriggers)); expect(eq(std::vector(receivedData.begin(), receivedData.begin() + 5), expectedStart)); }; @@ -865,9 +824,9 @@ const boost::ut::suite DataSinkTests = [] { constexpr std::uint32_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, { "mark_tag", false } }); - auto &delay = testGraph.emplaceBlock>({ { "delay_ms", kProcessingDelayMs } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto& src = testGraph.emplaceBlock>({{"n_samples_max", kSamples}, {"mark_tag", false}}); + auto& delay = testGraph.emplaceBlock>({{"delay_ms", kProcessingDelayMs}}); + auto& sink = testGraph.emplaceBlock>({{"name", "test_sink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(delay))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(delay).to<"in">(sink))); @@ -889,20 +848,19 @@ const boost::ut::suite DataSinkTests = [] { std::this_thread::sleep_for(20ms); seenFinished = poller->finished.load(); - while (poller->process([&samplesSeen](const auto &data) { samplesSeen += data.size(); })) { + while (poller->process([&samplesSeen](const auto& data) { samplesSeen += data.size(); })) { } } return std::make_tuple(poller, samplesSeen); }); - Scheduler sched{ std::move(testGraph) }; + Scheduler sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); - const auto &[poller, samplesSeen] = polling.get(); + const auto& [poller, samplesSeen] = polling.get(); expect(eq(samplesSeen + poller->drop_count, static_cast(kSamples))); }; }; -int -main() { /* tests are statically executed */ } +int main() { /* tests are statically executed */ } diff --git a/blocks/basic/test/qa_sources.cpp b/blocks/basic/test/qa_sources.cpp index e8385cfe..152ef39f 100644 --- a/blocks/basic/test/qa_sources.cpp +++ b/blocks/basic/test/qa_sources.cpp @@ -6,14 +6,13 @@ #include #include -#include #include #include +#include #include template -constexpr void -addTimeTagEntry(gr::basic::ClockSource &clockSource, std::uint64_t timeInNanoseconds, const std::string &value) { +constexpr void addTimeTagEntry(gr::basic::ClockSource& clockSource, std::uint64_t timeInNanoseconds, const std::string& value) { clockSource.tag_times.value.push_back(timeInNanoseconds); clockSource.tag_values.value.push_back(value); } @@ -24,9 +23,9 @@ const boost::ut::suite TagTests = [] { using namespace gr::basic; using namespace gr::testing; - static const auto mismatchedKey = [](const property_map &map) { + static const auto mismatchedKey = [](const property_map& map) { std::vector keys; - for (const auto &pair : map) { + for (const auto& pair : map) { keys.push_back(pair.first); } return keys; @@ -40,24 +39,23 @@ const boost::ut::suite TagTests = [] { constexpr gr::Size_t n_samples = 1900; constexpr float sample_rate = 2000.f; Graph testGraph; - auto &src = testGraph.emplaceBlock>( - { { "sample_rate", sample_rate }, { "n_samples_max", n_samples }, { "name", "ClockSource" }, { "verbose_console", verbose } }); - src.tags = { - { 0, { { "key", "value@0" } } }, // - { 1, { { "key", "value@1" } } }, // - { 100, { { "key", "value@100" } } }, // - { 150, { { "key", "value@150" } } }, // - { 1000, { { "key", "value@1000" } } }, // - { 1001, { { "key", "value@1001" } } }, // - { 1002, { { "key", "value@1002" } } }, // - { 1023, { { "key", "value@1023" } } } // + auto& src = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"n_samples_max", n_samples}, {"name", "ClockSource"}, {"verbose_console", verbose}}); + src.tags = { + {0, {{"key", "value@0"}}}, // + {1, {{"key", "value@1"}}}, // + {100, {{"key", "value@100"}}}, // + {150, {{"key", "value@150"}}}, // + {1000, {{"key", "value@1000"}}}, // + {1001, {{"key", "value@1001"}}}, // + {1002, {{"key", "value@1002"}}}, // + {1023, {{"key", "value@1023"}}} // }; - auto &sink1 = testGraph.emplaceBlock>({ { "name", "TagSink1" } }); - auto &sink2 = testGraph.emplaceBlock>({ { "name", "TagSink2" } }); + auto& sink1 = testGraph.emplaceBlock>({{"name", "TagSink1"}}); + auto& sink2 = testGraph.emplaceBlock>({{"name", "TagSink2"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).template to<"in">(sink1))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).template to<"in">(sink2))); - scheduler::Simple sched{ std::move(testGraph) }; + scheduler::Simple sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); if (verbose) { fmt::println("finished ClockSource sched.runAndWait() w/ {}", useIoThreadPool ? "Graph/Block provided-thread" : "user-provided thread"); @@ -68,16 +66,6 @@ const boost::ut::suite TagTests = [] { expect(eq(static_cast(sink1.n_samples_produced), n_samples)) << fmt::format("sink1 did not consume enough input samples ({} vs. {})", sink1.n_samples_produced, n_samples); expect(eq(static_cast(sink2.n_samples_produced), n_samples)) << fmt::format("sink2 did not consume enough input samples ({} vs. {})", sink2.n_samples_produced, n_samples); - if (std::getenv("DISABLE_SENSITIVE_TESTS") == nullptr) { - expect(approx(sink1.effective_sample_rate(), sample_rate, 500.f)) - << fmt::format("sink1: effective sample rate {} vs {} +- {} does not match", sink1.effective_sample_rate(), sample_rate, 500.f); - expect(approx(sink2.effective_sample_rate(), sample_rate, 500.f)) - << fmt::format("sink2: effective sample rate {} vs {} +- {} does not match", sink1.effective_sample_rate(), sample_rate, 500.f); - } - - fmt::print("sink1 (USE_PROCESS_ONE): effective {} vs. expected {} sample rate [Hz]\n", sink1.effective_sample_rate(), sample_rate); - fmt::print("sink2 (USE_PROCESS_BULK): effective {} vs. expected {} sample rate [Hz]\n", sink2.effective_sample_rate(), sample_rate); - // TODO: last decimator/interpolator + stride addition seems to break the limiting the input samples to the min of available vs. n_samples-until next tags // expect(equal_tag_lists(src.tags, sink1.tags)) << "sink1 (USE_PROCESS_ONE) did not receive the required tags"; // expect(equal_tag_lists(src.tags, sink2.tags)) << "sink2 (USE_PROCESS_BULK) did not receive the required tags"; @@ -92,30 +80,18 @@ const boost::ut::suite TagTests = [] { "SignalGenerator test"_test = [] { const std::size_t N = 16; // test points const double offset = 2.; - std::vector signals{ "Const", "Sin", "Cos", "Square", "Saw", "Triangle" }; + std::vector signals{"Const", "Sin", "Cos", "Square", "Saw", "Triangle"}; - for (const auto &sig : signals) { + for (const auto& sig : signals) { SignalGenerator signalGen{}; - auto failed = signalGen.settings().set({ { "signal_type", sig }, // - { "sample_rate", 2048.f }, - { "frequency", 256. }, - { "amplitude", 1. }, - { "offset", offset }, - { "phase", std::numbers::pi / 4 } }); + auto failed = signalGen.settings().set({{"signal_type", sig}, // + {"sample_rate", 2048.f}, {"frequency", 256.}, {"amplitude", 1.}, {"offset", offset}, {"phase", std::numbers::pi / 4}}); expect(failed.empty()) << fmt::format("settings have mismatching keys or value types. offending keys: {}\n", fmt::join(mismatchedKey(failed), ", ")); const auto applyResult = signalGen.settings().applyStagedParameters(); - expect(eq(applyResult.forwardParameters.size(), 1UZ)) << fmt::format("incorrect number of to be forwarded settings. forward keys: {}\n", - fmt::join(mismatchedKey(applyResult.forwardParameters), ", ")); + expect(eq(applyResult.forwardParameters.size(), 1UZ)) << fmt::format("incorrect number of to be forwarded settings. forward keys: {}\n", fmt::join(mismatchedKey(applyResult.forwardParameters), ", ")); // expected values corresponds to sample_rate = 1024., frequency = 128., amplitude = 1., offset = 0., phase = pi/4. - std::map> expResults = { - { "Const", { 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1. } }, - { "Sin", { 0.707106, 1., 0.707106, 0., -0.707106, -1., -0.707106, 0., 0.707106, 1., 0.707106, 0., -0.707106, -1., -0.707106, 0. } }, - { "Cos", { 0.707106, 0., -0.707106, -1., -0.7071067, 0., 0.707106, 1., 0.707106, 0., -0.707106, -1., -0.707106, 0., 0.707106, 1. } }, - { "Square", { 1., 1., 1., -1., -1., -1., -1., 1., 1., 1., 1., -1., -1., -1., -1., 1. } }, - { "Saw", { 0.25, 0.5, 0.75, -1., -0.75, -0.5, -0.25, 0., 0.25, 0.5, 0.75, -1., -0.75, -0.5, -0.25, 0. } }, - { "Triangle", { 0.5, 1., 0.5, 0., -0.5, -1., -0.5, 0., 0.5, 1., 0.5, 0., -0.5, -1., -0.5, 0. } } - }; + std::map> expResults = {{"Const", {1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.}}, {"Sin", {0.707106, 1., 0.707106, 0., -0.707106, -1., -0.707106, 0., 0.707106, 1., 0.707106, 0., -0.707106, -1., -0.707106, 0.}}, {"Cos", {0.707106, 0., -0.707106, -1., -0.7071067, 0., 0.707106, 1., 0.707106, 0., -0.707106, -1., -0.707106, 0., 0.707106, 1.}}, {"Square", {1., 1., 1., -1., -1., -1., -1., 1., 1., 1., 1., -1., -1., -1., -1., 1.}}, {"Saw", {0.25, 0.5, 0.75, -1., -0.75, -0.5, -0.25, 0., 0.25, 0.5, 0.75, -1., -0.75, -0.5, -0.25, 0.}}, {"Triangle", {0.5, 1., 0.5, 0., -0.5, -1., -0.5, 0., 0.5, 1., 0.5, 0., -0.5, -1., -0.5, 0.}}}; for (std::size_t i = 0; i < N; i++) { const auto val = signalGen.processOne(0); @@ -127,26 +103,21 @@ const boost::ut::suite TagTests = [] { "SignalGenerator ImChart test"_test = [] { const std::size_t N = 512; // test points - std::vector signals{ "Const", "Sin", "Cos", "Square", "Saw", "Triangle" }; - for (const auto &sig : signals) { + std::vector signals{"Const", "Sin", "Cos", "Square", "Saw", "Triangle"}; + for (const auto& sig : signals) { SignalGenerator signalGen{}; - const auto failed = signalGen.settings().set({ { "signal_type", sig }, // - { "sample_rate", 8192.f }, - { "frequency", 32. }, - { "amplitude", 2. }, - { "offset", 0. }, - { "phase", std::numbers::pi / 4. } }); + const auto failed = signalGen.settings().set({{"signal_type", sig}, // + {"sample_rate", 8192.f}, {"frequency", 32.}, {"amplitude", 2.}, {"offset", 0.}, {"phase", std::numbers::pi / 4.}}); expect(failed.empty()) << fmt::format("settings have mismatching keys or value types. offending keys: {}\n", fmt::join(mismatchedKey(failed), ", ")); const auto applyResult = signalGen.settings().applyStagedParameters(); - expect(eq(applyResult.forwardParameters.size(), 1UZ)) << fmt::format("incorrect number of to be forwarded settings. forward keys: {}\n", - fmt::join(mismatchedKey(applyResult.forwardParameters), ", ")); + expect(eq(applyResult.forwardParameters.size(), 1UZ)) << fmt::format("incorrect number of to be forwarded settings. forward keys: {}\n", fmt::join(mismatchedKey(applyResult.forwardParameters), ", ")); std::vector xValues(N), yValues(N); std::iota(xValues.begin(), xValues.end(), 0); std::ranges::generate(yValues, [&signalGen]() { return signalGen.processOne(0); }); fmt::println("Chart {}\n\n", sig); - auto chart = gr::graphs::ImChart<128, 16>({ { 0., static_cast(N) }, { -2.6, 2.6 } }); + auto chart = gr::graphs::ImChart<128, 16>({{0., static_cast(N)}, {-2.6, 2.6}}); chart.draw(xValues, yValues, sig); chart.draw(); } @@ -156,14 +127,14 @@ const boost::ut::suite TagTests = [] { constexpr gr::Size_t n_samples = 200; constexpr float sample_rate = 1000.f; Graph testGraph; - auto &clockSrc = testGraph.emplaceBlock>({ { "sample_rate", sample_rate }, { "n_samples_max", n_samples }, { "name", "ClockSource" } }); - auto &signalGen = testGraph.emplaceBlock>({ { "sample_rate", sample_rate }, { "name", "SignalGenerator" } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "TagSink" }, { "verbose_console", true } }); + auto& clockSrc = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"n_samples_max", n_samples}, {"name", "ClockSource"}}); + auto& signalGen = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"name", "SignalGenerator"}}); + auto& sink = testGraph.emplaceBlock>({{"name", "TagSink"}, {"verbose_console", true}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(clockSrc).to<"in">(signalGen))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(signalGen).to<"in">(sink))); - scheduler::Simple sched{ std::move(testGraph) }; + scheduler::Simple sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); expect(eq(n_samples, static_cast(sink.n_samples_produced))) << "Number of samples does not match"; @@ -174,25 +145,17 @@ const boost::ut::suite TagTests = [] { const std::size_t N = 128; // test points double startValue = 10.; double finalValue = 20.; - std::vector signals{ Const, LinearRamp, ParabolicRamp, CubicSpline, ImpulseResponse }; - for (const auto &sig : signals) { + std::vector signals{Const, LinearRamp, ParabolicRamp, CubicSpline, ImpulseResponse}; + for (const auto& sig : signals) { FunctionGenerator funcGen{}; - std::ignore = funcGen.settings().set({ createPropertyMapEntry(signal_type, sig), - { "n_samples_max", static_cast(N) }, - { "sample_rate", 128.f }, - createPropertyMapEntry(start_value, startValue), - createPropertyMapEntry(final_value, finalValue), - createPropertyMapEntry(duration, 1.), - createPropertyMapEntry(round_off_time, .15), - createPropertyMapEntry(impulse_time0, .2), - createPropertyMapEntry(impulse_time1, .15) }); + std::ignore = funcGen.settings().set({createPropertyMapEntry(signal_type, sig), {"n_samples_max", static_cast(N)}, {"sample_rate", 128.f}, createPropertyMapEntry(start_value, startValue), createPropertyMapEntry(final_value, finalValue), createPropertyMapEntry(duration, 1.), createPropertyMapEntry(round_off_time, .15), createPropertyMapEntry(impulse_time0, .2), createPropertyMapEntry(impulse_time1, .15)}); std::ignore = funcGen.settings().applyStagedParameters(); std::vector xValues(N), yValues(N); std::iota(xValues.begin(), xValues.end(), 0); std::ranges::generate(yValues, [&funcGen]() { return funcGen.processOne(0.); }); fmt::println("Chart {}\n\n", toString(sig)); - auto chart = gr::graphs::ImChart<128, 32>({ { 0., static_cast(N) }, { 7., 22. } }); + auto chart = gr::graphs::ImChart<128, 32>({{0., static_cast(N)}, {7., 22.}}); chart.draw(xValues, yValues, toString(sig)); chart.draw(); } @@ -203,31 +166,30 @@ const boost::ut::suite TagTests = [] { constexpr std::uint32_t N = 1000; constexpr float sample_rate = 1000.f; Graph testGraph; - auto &clockSrc = testGraph.emplaceBlock>({ { "sample_rate", sample_rate }, { "n_samples_max", N }, { "name", "ClockSource" } }); + auto& clockSrc = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"n_samples_max", N}, {"name", "ClockSource"}}); - clockSrc.tags = { Tag(0, createConstPropertyMap(5.f)), // - Tag(100, createLinearRampPropertyMap(5.f, 30.f, .2f)), // - Tag(300, createConstPropertyMap(30.f)), // - Tag(350, createParabolicRampPropertyMap(30.f, 20.f, .1f, 0.02f)), // - Tag(550, createConstPropertyMap(20.f)), // - Tag(650, createCubicSplinePropertyMap(20.f, 10.f, .1f)), // - Tag(800, createConstPropertyMap(10.f)), - Tag(850, createImpulseResponsePropertyMap(10.f, 20.f, .02f, .06f)) }; + clockSrc.tags = {Tag(0, createConstPropertyMap(5.f)), // + Tag(100, createLinearRampPropertyMap(5.f, 30.f, .2f)), // + Tag(300, createConstPropertyMap(30.f)), // + Tag(350, createParabolicRampPropertyMap(30.f, 20.f, .1f, 0.02f)), // + Tag(550, createConstPropertyMap(20.f)), // + Tag(650, createCubicSplinePropertyMap(20.f, 10.f, .1f)), // + Tag(800, createConstPropertyMap(10.f)), Tag(850, createImpulseResponsePropertyMap(10.f, 20.f, .02f, .06f))}; - auto &funcGen = testGraph.emplaceBlock>({ { "sample_rate", sample_rate }, { "name", "FunctionGenerator" } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "TagSink" } }); + auto& funcGen = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"name", "FunctionGenerator"}}); + auto& sink = testGraph.emplaceBlock>({{"name", "TagSink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(clockSrc).to<"in">(funcGen))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(funcGen).to<"in">(sink))); - scheduler::Simple sched{ std::move(testGraph) }; + scheduler::Simple sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); expect(eq(N, static_cast(sink.samples.size()))) << "Number of samples does not match"; std::vector xValues(N); std::vector yValues(sink.samples.begin(), sink.samples.end()); std::iota(xValues.begin(), xValues.end(), 0); - auto chart = gr::graphs::ImChart<128, 32>({ { 0., static_cast(N) }, { 0., 35. } }); + auto chart = gr::graphs::ImChart<128, 32>({{0., static_cast(N)}, {0., 35.}}); chart.draw(xValues, yValues, "Signal"); chart.draw(); }; @@ -237,7 +199,7 @@ const boost::ut::suite TagTests = [] { constexpr std::uint32_t N = 1000; constexpr float sample_rate = 10000.f; Graph testGraph; - auto &clockSrc = testGraph.emplaceBlock>({ { "sample_rate", sample_rate }, { "n_samples_max", N }, { "name", "ClockSource" } }); + auto& clockSrc = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"n_samples_max", N}, {"name", "ClockSource"}}); // all times are in nanoseconds addTimeTagEntry(clockSrc, 1'000'000, "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=1"); @@ -250,8 +212,8 @@ const boost::ut::suite TagTests = [] { addTimeTagEntry(clockSrc, 85'000'000, "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=8"); clockSrc.repeat_period = 5'000'000; clockSrc.do_zero_order_hold = true; - auto &funcGen = testGraph.emplaceBlock>({ { "sample_rate", sample_rate }, { "name", "FunctionGenerator" } }); - funcGen.match_pred = [](const auto &tableEntry, const auto &searchEntry, const auto attempt) -> std::optional { + auto& funcGen = testGraph.emplaceBlock>({{"sample_rate", sample_rate}, {"name", "FunctionGenerator"}}); + funcGen.match_pred = [](const auto& tableEntry, const auto& searchEntry, const auto attempt) -> std::optional { if (searchEntry.find("context") == searchEntry.end()) { return std::nullopt; } @@ -281,33 +243,31 @@ const boost::ut::suite TagTests = [] { }; // Time duration is in seconds - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=1" } }, createConstPropertyMap(5.f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=2" } }, createLinearRampPropertyMap(5.f, 30.f, 0.02f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=3" } }, createConstPropertyMap(30.f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=4" } }, createParabolicRampPropertyMap(30.f, 20.f, .01f, 0.002f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=5" } }, createConstPropertyMap(20.f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=6" } }, createCubicSplinePropertyMap(20.f, 10.f, 0.01f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=7" } }, createConstPropertyMap(10.f)); - funcGen.addFunctionTableEntry({ { "context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=8" } }, createImpulseResponsePropertyMap(10.f, 20.f, 0.002f, 0.006f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=1"}}, createConstPropertyMap(5.f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=2"}}, createLinearRampPropertyMap(5.f, 30.f, 0.02f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=3"}}, createConstPropertyMap(30.f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=4"}}, createParabolicRampPropertyMap(30.f, 20.f, .01f, 0.002f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=5"}}, createConstPropertyMap(20.f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=6"}}, createCubicSplinePropertyMap(20.f, 10.f, 0.01f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=7"}}, createConstPropertyMap(10.f)); + funcGen.addFunctionTableEntry({{"context", "CMD_BP_START:FAIR.SELECTOR.C=1:S=1:P=8"}}, createImpulseResponsePropertyMap(10.f, 20.f, 0.002f, 0.006f)); - auto &sink = testGraph.emplaceBlock>({ { "name", "TagSink" } }); + auto& sink = testGraph.emplaceBlock>({{"name", "TagSink"}}); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(clockSrc).to<"in">(funcGen))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(funcGen).to<"in">(sink))); - scheduler::Simple sched{ std::move(testGraph) }; + scheduler::Simple sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); expect(eq(N, static_cast(sink.samples.size()))) << "Number of samples does not match"; std::vector xValues(N); std::vector yValues(sink.samples.begin(), sink.samples.end()); std::iota(xValues.begin(), xValues.end(), 0); - auto chart = gr::graphs::ImChart<128, 32>({ { 0., static_cast(N) }, { 0., 35. } }); + auto chart = gr::graphs::ImChart<128, 32>({{0., static_cast(N)}, {0., 35.}}); chart.draw(xValues, yValues, "Signal"); chart.draw(); }; }; -int -main() { /* not needed for UT */ -} +int main() { /* not needed for UT */ } diff --git a/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp b/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp index b3421d4b..f5c4a1d5 100644 --- a/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp +++ b/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp @@ -1,10 +1,9 @@ #ifndef BASICFILEIO_HPP #define BASICFILEIO_HPP -#include -#include #include #include +#include #include #include @@ -17,12 +16,6 @@ namespace gr::blocks::fileio { namespace detail { -[[nodiscard]] inline std::string getIsoTime() noexcept { - std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); - return fmt::format("{:%Y-%m-%dT%H:%M:%S}.{:06}", // ms-precision ISO time-format - fmt::localtime(std::chrono::system_clock::to_time_t(now)), // - std::chrono::duration_cast(now.time_since_epoch()).count() % 1'000); -} inline void ensureDirectoryExists(const std::filesystem::path& filePath) { std::filesystem::create_directories(filePath.parent_path()); } @@ -33,7 +26,8 @@ inline std::vector getSortedFilesContaining(const std::st } std::vector matchingFiles; - std::copy_if(std::filesystem::directory_iterator(filePath.parent_path()), std::filesystem::directory_iterator{}, std::back_inserter(matchingFiles), [&](const auto& entry) { return entry.is_regular_file() && entry.path().string().find(filePath.filename().string()) != std::string::npos; }); + std::copy_if(std::filesystem::directory_iterator(filePath.parent_path()), std::filesystem::directory_iterator{}, std::back_inserter(matchingFiles), // + [&](const auto& entry) { return entry.is_regular_file() && entry.path().string().find(filePath.filename().string()) != std::string::npos; }); std::sort(matchingFiles.begin(), matchingFiles.end()); return matchingFiles; @@ -158,7 +152,7 @@ Important: this implementation assumes a host-order, CPU architecture specific b } break; case Mode::multi: { // _fileCounter ensures that the filenames are unique and still sortable by date-time, with an additional counter to handle rapid successive file creation. - _actualFileName = filePath.parent_path() / (detail::getIsoTime() + "_" + std::to_string(_fileCounter++) + "_" + filePath.filename().string()); + _actualFileName = filePath.parent_path() / (gr::time::getIsoTime() + "_" + std::to_string(_fileCounter++) + "_" + filePath.filename().string()); _file.open(_actualFileName, std::ios::binary); break; } diff --git a/blocks/testing/include/gnuradio-4.0/testing/PerformanceMonitor.hpp b/blocks/testing/include/gnuradio-4.0/testing/PerformanceMonitor.hpp new file mode 100644 index 00000000..e3aa8d26 --- /dev/null +++ b/blocks/testing/include/gnuradio-4.0/testing/PerformanceMonitor.hpp @@ -0,0 +1,187 @@ +#ifndef GNURADIO_PERFORMANCEMONITOR_HPP +#define GNURADIO_PERFORMANCEMONITOR_HPP + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace gr::testing { + +namespace details { +template +concept Numeric = std::integral || std::floating_point; + +template +std::string to_si_prefix(T value_base, std::string_view unit = "s", std::size_t significant_digits = 0) { + static constexpr std::array si_prefixes{'q', 'r', 'y', 'z', 'a', 'f', 'p', 'n', 'u', 'm', ' ', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y', 'R', 'Q'}; + static constexpr long double base = 1000.0l; + long double value = static_cast(value_base); + + std::size_t exponent = 10u; + if (value == 0.0l) { + return fmt::format("{:.{}f}{}{}{}", value, significant_digits, unit.empty() ? "" : " ", si_prefixes[exponent], unit); + } + while (value >= base && exponent < si_prefixes.size()) { + value /= base; + ++exponent; + } + while (value < 1.0l && exponent > 0u) { + value *= base; + --exponent; + } + if (significant_digits == 0 && exponent > 10) { + if (value < 10.0l) { + significant_digits = 2u; + } else if (value < 100.0l) { + significant_digits = 1u; + } + } else if (significant_digits == 1 && value >= 100.0l) { + --significant_digits; + } else if (significant_digits >= 2u) { + if (value >= 100.0l) { + significant_digits -= 2u; + } else if (value >= 10.0l) { + --significant_digits; + } + } + + return fmt::format("{:.{}f}{}{}{}", value, significant_digits, unit.empty() ? "" : " ", si_prefixes[exponent], unit); +} +} // namespace details + +template +struct PerformanceMonitor : public Block, ResamplingRatio<1UL, 1UL, false>> { + using Description = Doc; + + using ClockSourceType = std::chrono::system_clock; + PortIn in; + PortOut outRes; // Resident Set Size: number of bytes the process has in real memory, estimated and smoothed + PortOut outRate; // Effective sample rate in Hz + + gr::Annotated, Visible> evaluate_perf_rate{1'000'000}; + // Note: `publish_rate` is approximate and depends on `evaluate_perf_rate`. + // If it takes more time to collect `evaluate_perf_rate` samples than the actual update rate can be much higher than `publish_rate`. + gr::Annotated, Visible> publish_rate{1.f}; + gr::Annotated print to console">, Visible> output_csv_file_path = ""; + + // statistics of updates + gr::Size_t n_writes{0U}; + gr::Size_t n_updates_res{0U}; + gr::Size_t n_updates_rate{0U}; + + gr::Size_t _nSamplesCounter{0}; + std::ofstream _file; + std::chrono::time_point _lastTimePoint = ClockSourceType::now(); + float _timeFromLastUpdate{0.f}; // in sec + bool _addCsvHeader = true; + + void start() { + _lastTimePoint = ClockSourceType::now(); + _nSamplesCounter = 0; + n_writes = 0U; + n_updates_res = 0U; + n_updates_rate = 0U; + openFile(); + } + + void stop() { closeFile(); } + + gr::work::Status processBulk(gr::ConsumableSpan auto& inSpan, gr::PublishableSpan auto& outResSpan, gr::PublishableSpan auto& outRateSpan) { + const std::size_t nSamples = std::min(inSpan.size(), static_cast(evaluate_perf_rate - _nSamplesCounter)); + std::ignore = inSpan.consume(nSamples); + _nSamplesCounter += nSamples; + + if (_nSamplesCounter >= evaluate_perf_rate) { + addNewMetrics(outResSpan, outRateSpan); + } else { + if (outResSpan.size() > 0) { + outResSpan.publish(0); + } + if (outRateSpan.size() > 0) { + outRateSpan.publish(0); + } + }; + + return gr::work::Status::OK; + } + +private: + void closeFile() { + if (_file.is_open()) { + _file.close(); + } + } + + void openFile() { + if (output_csv_file_path != "") { + _file.open(output_csv_file_path, std::ios::out); + _addCsvHeader = true; + if (!_file) { + throw gr::exception(fmt::format("failed to open file '{}'.", output_csv_file_path)); + } + } + } + + void addNewMetrics(gr::PublishableSpan auto& outResSpan, gr::PublishableSpan auto& outRateSpan) { + const std::chrono::time_point timeNow = ClockSourceType::now(); + const auto dTime = std::chrono::duration_cast(timeNow - _lastTimePoint).count(); + const double rate = (dTime == 0) ? 0. : static_cast(_nSamplesCounter) * 1.e6 / static_cast(dTime); + + _timeFromLastUpdate += static_cast(dTime) / 1.e6f; // microseconds to seconds + + const auto memoryStat = gr::memory::getUsage(); + const auto residentSize = static_cast(memoryStat.residentSize); + + // write to the output ports + if (outResSpan.size() >= 1) { + outResSpan[0] = residentSize; + outResSpan.publish(1); + n_updates_res++; + } + if (outRateSpan.size() >= 1) { + outRateSpan[0] = rate; + outRateSpan.publish(1); + n_updates_rate++; + } + + if (_timeFromLastUpdate >= publish_rate) { + if (output_csv_file_path == "") { + fmt::println("Performance at {}, #{} dT:{} s, rate:{}, memory_resident:{}", // + gr::time::getIsoTime(), n_writes, _timeFromLastUpdate, details::to_si_prefix(rate, "S/s"), details::to_si_prefix(residentSize, "b")); + } else { + if (_file.is_open()) { + if (_addCsvHeader) { + _file << "Id,Time,Rate [Hz],Memory.Resident[bytes],Memory.Virtual[bytes]" << std::endl; + _addCsvHeader = false; + } + _file << n_writes << "," << gr::time::getIsoTime() << "," << rate << "," << residentSize << std::endl; + } + } + _timeFromLastUpdate = 0.f; + n_writes++; + } + + _nSamplesCounter = 0; + _lastTimePoint = timeNow; + } +}; + +} // namespace gr::testing + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (gr::testing::PerformanceMonitor), in, outRes, outRate, publish_rate, evaluate_perf_rate, output_csv_file_path); + +auto registerPerformanceMonitor = gr::registerBlock(gr::globalBlockRegistry()); + +#endif // GNURADIO_PERFORMANCEMONITOR_HPP diff --git a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp index 89da05f8..f65ebc5c 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp @@ -8,8 +8,8 @@ #include #include -#include #include +#include namespace gr::testing { @@ -19,8 +19,7 @@ enum class ProcessFunction { USE_PROCESS_ONE_SIMD = 2 /// }; -inline constexpr void -print_tag(const Tag &tag, std::string_view prefix = {}) noexcept { +inline constexpr void print_tag(const Tag& tag, std::string_view prefix = {}) noexcept { if (tag.map.empty()) { fmt::print("{} @index= {}: map: {{ }}\n", prefix, tag.index); return; @@ -29,9 +28,8 @@ print_tag(const Tag &tag, std::string_view prefix = {}) noexcept { } template -inline constexpr void -map_diff_report(const MapType &map1, const MapType &map2, const std::string &name1, const std::string &name2, const std::optional &ignoreKey = std::nullopt) { - for (const auto &[key, value] : map1) { +inline constexpr void map_diff_report(const MapType& map1, const MapType& map2, const std::string& name1, const std::string& name2, const std::optional& ignoreKey = std::nullopt) { + for (const auto& [key, value] : map1) { if (ignoreKey && key == *ignoreKey) { continue; // skip this key } @@ -45,8 +43,7 @@ map_diff_report(const MapType &map1, const MapType &map2, const std::string &nam } template -inline constexpr void -mismatch_report(const IterType &mismatchedTag1, const IterType &mismatchedTag2, const IterType &tags1_begin, const std::optional &ignoreKey = std::nullopt) { +inline constexpr void mismatch_report(const IterType& mismatchedTag1, const IterType& mismatchedTag2, const IterType& tags1_begin, const std::optional& ignoreKey = std::nullopt) { const auto index = static_cast(std::distance(tags1_begin, mismatchedTag1)); fmt::print("mismatch at index {}", index); if (mismatchedTag1->index != mismatchedTag2->index) { @@ -60,14 +57,13 @@ mismatch_report(const IterType &mismatchedTag1, const IterType &mismatchedTag2, } } -inline constexpr bool -equal_tag_lists(const std::vector &tags1, const std::vector &tags2, const std::optional &ignoreKey = std::nullopt) { +inline constexpr bool equal_tag_lists(const std::vector& tags1, const std::vector& tags2, const std::optional& ignoreKey = std::nullopt) { if (tags1.size() != tags2.size()) { fmt::println("vectors have different sizes ({} vs {})\n", tags1.size(), tags2.size()); return false; } - auto customComparator = [&ignoreKey](const Tag &tag1, const Tag &tag2) { + auto customComparator = [&ignoreKey](const Tag& tag1, const Tag& tag2) { if (ignoreKey) { // make a copy of the maps to compare without the ignored key auto map1 = tag1.map; @@ -89,67 +85,93 @@ equal_tag_lists(const std::vector &tags1, const std::vector &tags2, co template struct TagSource : public Block> { - PortOut out; - std::vector tags{}; - std::vector values{}; // if values are set it works like repeated source. Example: values = { 1, 2, 3 }; output: 1,2,3,1,2,3... `mark_tag` is ignored in this case. - std::size_t value_index{ 0 }; // current index in values array - std::size_t next_tag{ 0 }; - gr::Size_t n_samples_max{ 1024 }; - gr::Size_t n_samples_produced{ 0 }; - float sample_rate = 1000.0f; - std::string signal_name = "unknown signal"; - std::string signal_unit = "unknown unit"; - float signal_min = std::numeric_limits::lowest(); - float signal_max = std::numeric_limits::max(); - bool verbose_console = false; - bool mark_tag = true; // true: mark tagged samples with '1' or '0' otherwise. false: [0, 1, 2, ..., ], if values is not empty mark_tag is ignored - - void - start() { + PortOut out; + + bool repeat_tags = false; // if true tags are repeated from the beginning. Example: Given the tag indices {1, 3, 5}, the output tag indices would be: 1, 3, 5, 6, 8, 10, ... + std::vector values{}; // if values are set it works like repeated source. Example: values = { 1, 2, 3 }; output: 1,2,3,1,2,3... `mark_tag` is ignored in this case. + gr::Size_t n_samples_max{1024}; // if 0 -> infinite samples + gr::Size_t n_samples_produced{0ULL}; // for infinite samples the counter wraps around back to 0, _tagIndex = 0, _valueIndex = 0 + float sample_rate = 1000.0f; + std::string signal_name = "unknown signal"; + std::string signal_unit = "unknown unit"; + float signal_min = std::numeric_limits::lowest(); + float signal_max = std::numeric_limits::max(); + bool verbose_console = false; + bool mark_tag = true; // true: mark tagged samples with '1' or '0' otherwise. false: [0, 1, 2, ..., ], if values is not empty mark_tag is ignored + + std::vector _tags{}; // It is expected that Tag.index is in ascending order + std::size_t _tagIndex{0}; // current index in tags array + std::size_t _valueIndex{0}; // current index in values array + + void start() { n_samples_produced = 0U; - value_index = 0U; + _valueIndex = 0U; + _tagIndex = 0U; + if (_tags.size() > 1) { + bool isAscending = std::ranges::is_sorted(_tags, [](const Tag& lhs, const Tag& rhs) { return lhs.index < rhs.index; }); + if (!isAscending) { + using namespace gr::message; + this->emitErrorMessage("error()", Error("The input tags should be ascending by index.")); + } + } } - T - processOne(std::size_t offset) noexcept - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) + T processOne(std::size_t offset) noexcept + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) { - const bool generatedTag = generateTag("processOne(...)", offset); + const auto [tagGenerated, tagRepeatStarted] = generateTag("processOne(...)", offset); n_samples_produced++; - if (n_samples_produced >= n_samples_max) { + if (!isInfinite() && n_samples_produced >= n_samples_max) { this->requestStop(); } + + if (isInfinite() && tagRepeatStarted) { + n_samples_produced = 0U; + } + if (!values.empty()) { - if (value_index == values.size()) { - value_index = 0; + if (_valueIndex == values.size()) { + _valueIndex = 0; } - T currentValue = values[value_index]; - value_index++; + T currentValue = values[_valueIndex]; + _valueIndex++; return currentValue; } - return mark_tag ? (generatedTag ? static_cast(1) : static_cast(0)) : static_cast(n_samples_produced); + return mark_tag ? (tagGenerated ? static_cast(1) : static_cast(0)) : static_cast(n_samples_produced); } - work::Status - processBulk(PublishableSpan auto &output) noexcept - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) + work::Status processBulk(PublishableSpan auto& output) noexcept + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) { - const bool generatedTag = generateTag("processBulk(...)"); - Tag::signed_index_type nextTagIn = next_tag < tags.size() ? std::max(1L, tags[next_tag].index - static_cast(n_samples_produced)) - : static_cast(n_samples_max - n_samples_produced); - const std::size_t nSamples = n_samples_produced < n_samples_max ? std::min(static_cast(std::max(1L, nextTagIn)), output.size()) : 0UZ; // '0UZ' -> DONE, produced enough samples + const auto [tagGenerated, tagRepeatStarted] = generateTag("processBulk(...)"); + const auto nSamplesRemainder = getNProducedSamplesRemainder(); + + gr::Size_t nextTagIn = 1U; + if (isInfinite() && tagRepeatStarted) { + nextTagIn = 1; // just publish last tag and then start from the beginning + } else { + if (_tagIndex < _tags.size()) { + if (static_cast(_tags[_tagIndex].index) > nSamplesRemainder) { + nextTagIn = static_cast(_tags[_tagIndex].index) - nSamplesRemainder; + } + } else { + nextTagIn = isInfinite() ? static_cast(output.size()) : n_samples_max - n_samples_produced; + } + } + + const std::size_t nSamples = isInfinite() || n_samples_produced < n_samples_max ? std::min(static_cast(std::max(1U, nextTagIn)), output.size()) : 0UZ; // '0UZ' -> DONE, produced enough samples if (!values.empty()) { for (std::size_t i = 0; i < nSamples; ++i) { - if (value_index == values.size()) { - value_index = 0; + if (_valueIndex == values.size()) { + _valueIndex = 0; } - output[i] = values[value_index]; - value_index++; + output[i] = values[_valueIndex]; + _valueIndex++; } } else { if (mark_tag) { - output[0] = generatedTag ? static_cast(1) : static_cast(0); + output[0] = tagGenerated ? static_cast(1) : static_cast(0); } else { for (std::size_t i = 0; i < nSamples; ++i) { output[i] = static_cast(n_samples_produced + i); @@ -157,67 +179,85 @@ struct TagSource : public Block> { } } - n_samples_produced += static_cast(nSamples); + if (isInfinite() && tagRepeatStarted) { + n_samples_produced = 0U; + } else { + n_samples_produced += static_cast(nSamples); + } output.publish(nSamples); - return n_samples_produced < n_samples_max ? work::Status::OK : work::Status::DONE; + return !isInfinite() && n_samples_produced >= n_samples_max ? work::Status::DONE : work::Status::OK; } private: - bool - generateTag(std::string_view processFunctionName, std::size_t offset = 0) { - if (next_tag < tags.size() && tags[next_tag].index <= static_cast(n_samples_produced)) { + [[nodiscard]] auto generateTag(std::string_view processFunctionName, std::size_t offset = 0) { + struct { + bool tagGenerated = false; + bool tagRepeatStarted = false; + } result; + + const auto nSamplesRemainder = getNProducedSamplesRemainder(); + if (_tagIndex < _tags.size() && static_cast(_tags[_tagIndex].index) <= nSamplesRemainder) { if (verbose_console) { - print_tag(tags[next_tag], fmt::format("{}::{}\t publish tag at {:6}", this->name.value, processFunctionName, n_samples_produced)); + print_tag(_tags[_tagIndex], fmt::format("{}::{}\t publish tag at {:6}", this->name.value, processFunctionName, n_samples_produced)); } - out.publishTag(tags[next_tag].map, static_cast(offset)); // indices > 0 write tags in the future ... handle with care + out.publishTag(_tags[_tagIndex].map, static_cast(offset)); // indices > 0 write tags in the future ... handle with care this->_outputTagsChanged = true; - next_tag++; - return true; + _tagIndex++; + if (repeat_tags && _tagIndex == _tags.size()) { + _tagIndex = 0; + result.tagRepeatStarted = true; + } + result.tagGenerated = true; + return result; } - return false; + return result; + } + + [[nodiscard]] gr::Size_t getNProducedSamplesRemainder() const { // + return repeat_tags && !_tags.empty() && !isInfinite() ? n_samples_produced % static_cast(_tags.back().index + 1) : n_samples_produced; } + + [[nodiscard]] bool isInfinite() const { return n_samples_max == 0U; } }; template struct TagMonitor : public Block> { - using ClockSourceType = std::chrono::system_clock; - PortIn in; - PortOut out; - std::vector samples{}; - std::vector tags{}; - gr::Size_t n_samples_expected{ 0 }; - gr::Size_t n_samples_produced{ 0 }; - float sample_rate = 1000.0f; - std::string signal_name; - bool log_tags = true; - bool log_samples = true; - bool verbose_console = false; - std::chrono::time_point _timeFirstSample = ClockSourceType::now(); - std::chrono::time_point _timeLastSample = ClockSourceType::now(); - - void - start() { + PortIn in; + PortOut out; + + std::vector samples{}; + gr::Size_t n_samples_expected{0}; + gr::Size_t n_samples_produced{0}; // for infinite samples the counter wraps around back to 0 + float sample_rate = 1000.0f; + std::string signal_name; + bool log_tags = true; + bool log_samples = true; + bool verbose_console = false; + + std::vector _tags{}; + + void start() { if (verbose_console) { fmt::println("started TagMonitor {} aka. '{}'", this->unique_name, this->name); } - _timeFirstSample = ClockSourceType::now(); samples.clear(); if (log_samples) { samples.reserve(std::max(0UZ, static_cast(n_samples_expected))); } - tags.clear(); + _tags.clear(); } - constexpr T - processOne(const T &input) noexcept - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) + constexpr T processOne(const T& input) noexcept + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) { if (this->input_tags_present()) { - const Tag &tag = this->mergedInputTag(); + const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processOne(...)\t received tag at {:6}", this->name, n_samples_produced)); } - tags.emplace_back(n_samples_produced, tag.map); + if (log_tags) { + _tags.emplace_back(n_samples_produced, tag.map); + } } if (log_samples) { samples.emplace_back(input); @@ -227,16 +267,17 @@ struct TagMonitor : public Block> { } template V> - [[nodiscard]] constexpr V - processOne(const V &input) noexcept // to note: the SIMD-version does not support adding tags mid-way since this is chunked at V::size() - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE_SIMD) + [[nodiscard]] constexpr V processOne(const V& input) noexcept // to note: the SIMD-version does not support adding tags mid-way since this is chunked at V::size() + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE_SIMD) { if (this->input_tags_present()) { - const Tag &tag = this->mergedInputTag(); + const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processOne(...)\t received tag at {:6}", this->name, n_samples_produced)); } - tags.emplace_back(n_samples_produced, tag.map); + if (log_tags) { + _tags.emplace_back(n_samples_produced, tag.map); + } } if (log_samples) { if constexpr (gr::meta::any_simd) { @@ -255,16 +296,17 @@ struct TagMonitor : public Block> { return input; } - constexpr work::Status - processBulk(std::span input, std::span output) noexcept - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) + constexpr work::Status processBulk(std::span input, std::span output) noexcept + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) { - if (log_tags && this->input_tags_present()) { - const Tag &tag = this->mergedInputTag(); + if (this->input_tags_present()) { + const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processBulk(...{}, ...{})\t received tag at {:6}", this->name, input.size(), output.size(), n_samples_produced)); } - tags.emplace_back(n_samples_produced, tag.map); + if (log_tags) { + _tags.emplace_back(n_samples_produced, tag.map); + } } if (log_samples) { @@ -281,50 +323,46 @@ struct TagMonitor : public Block> { template struct TagSink : public Block> { using ClockSourceType = std::chrono::system_clock; - PortIn in; - std::vector samples{}; - std::vector tags{}; - gr::Size_t n_samples_expected{ 0 }; - std::uint32_t n_samples_produced{ 0 }; - float sample_rate = 1000.0f; - std::string signal_name; - bool log_tags = true; - bool log_samples = true; - bool verbose_console = false; - std::chrono::time_point _timeFirstSample = ClockSourceType::now(); - std::chrono::time_point _timeLastSample = ClockSourceType::now(); - - void - start() { + PortIn in; + + std::vector samples{}; + gr::Size_t n_samples_expected{0}; + gr::Size_t n_samples_produced{0}; // for infinite samples the counter wraps around back to 0 + float sample_rate = 1000.0f; + std::string signal_name; + bool log_tags = true; + bool log_samples = true; + bool verbose_console = false; + + std::vector _tags{}; + + void start() { if (verbose_console) { fmt::println("started sink {} aka. '{}'", this->unique_name, this->name); } - _timeFirstSample = ClockSourceType::now(); samples.clear(); if (log_samples) { samples.reserve(std::max(0UZ, static_cast(n_samples_expected))); } - tags.clear(); + _tags.clear(); } - void - stop() { + void stop() { if (verbose_console) { fmt::println("stopped sink {} aka. '{}'", this->unique_name, this->name); } } - constexpr void - processOne(const T &input) noexcept // N.B. non-SIMD since we need a sample-by-sample accurate tag detection - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) + constexpr void processOne(const T& input) noexcept // N.B. non-SIMD since we need a sample-by-sample accurate tag detection + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) { if (this->input_tags_present()) { - const Tag &tag = this->mergedInputTag(); + const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processOne(...1) \t received tag at {:6}", this->name, n_samples_produced)); } if (log_tags) { - tags.emplace_back(n_samples_produced, tag.map); + _tags.emplace_back(n_samples_produced, tag.map); } } if (log_samples) { @@ -334,51 +372,37 @@ struct TagSink : public Block> { if (n_samples_expected > 0 && n_samples_produced >= n_samples_expected) { this->requestStop(); } - _timeLastSample = ClockSourceType::now(); } // template V> - constexpr work::Status - processBulk(std::span input) noexcept - requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) + constexpr work::Status processBulk(std::span input) noexcept + requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) { if (this->input_tags_present()) { - const Tag &tag = this->mergedInputTag(); + const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processBulk(...{})\t received tag at {:6}", this->name, input.size(), n_samples_produced)); } if (log_tags) { - tags.emplace_back(n_samples_produced, tag.map); + _tags.emplace_back(n_samples_produced, tag.map); } } if (log_samples) { samples.insert(samples.cend(), input.begin(), input.end()); } - n_samples_produced += static_cast(input.size()); - _timeLastSample = ClockSourceType::now(); + n_samples_produced += static_cast(input.size()); return n_samples_expected > 0 && n_samples_produced >= n_samples_expected ? work::Status::DONE : work::Status::OK; } - - float - effective_sample_rate() const { - const auto total_elapsed_time = std::chrono::duration_cast(_timeLastSample - _timeFirstSample).count(); - return total_elapsed_time == 0 ? std::numeric_limits::quiet_NaN() : static_cast(n_samples_produced) * 1e6f / static_cast(total_elapsed_time); - } }; } // namespace gr::testing -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSource), out, n_samples_max, sample_rate, signal_name, signal_unit, signal_min, signal_max, verbose_console, mark_tag, values); -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagMonitor), in, out, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags, - log_samples, verbose_console, samples); -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSink), in, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags, - log_samples, verbose_console, samples); - -auto registerTagSource = gr::registerBlock(gr::globalBlockRegistry()) - | gr::registerBlock(gr::globalBlockRegistry()); -auto registerTagMonitor = gr::registerBlock(gr::globalBlockRegistry()) - | gr::registerBlock(gr::globalBlockRegistry()); -auto registerTagSink = gr::registerBlock(gr::globalBlockRegistry()) - | gr::registerBlock(gr::globalBlockRegistry()); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSource), out, n_samples_max, sample_rate, signal_name, signal_unit, signal_min, signal_max, verbose_console, mark_tag, values, repeat_tags); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagMonitor), in, out, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags, log_samples, verbose_console, samples); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSink), in, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags, log_samples, verbose_console, samples); + +auto registerTagSource = gr::registerBlock(gr::globalBlockRegistry()) | gr::registerBlock(gr::globalBlockRegistry()); +auto registerTagMonitor = gr::registerBlock(gr::globalBlockRegistry()) | gr::registerBlock(gr::globalBlockRegistry()); +auto registerTagSink = gr::registerBlock(gr::globalBlockRegistry()) | gr::registerBlock(gr::globalBlockRegistry()); #endif // GNURADIO_TAGMONITORS_HPP diff --git a/core/include/gnuradio-4.0/Settings.hpp b/core/include/gnuradio-4.0/Settings.hpp index 0307ce8c..1338484d 100644 --- a/core/include/gnuradio-4.0/Settings.hpp +++ b/core/include/gnuradio-4.0/Settings.hpp @@ -533,7 +533,7 @@ class BasicSettings : public SettingsBase { } } - [[nodiscard]] gr::Size_t getNStoredParameters() const noexcept override { return 0; } // Implemented only for compatibility + [[nodiscard]] gr::Size_t getNStoredParameters() const noexcept override { return 1; } // Implemented only for compatibility [[nodiscard]] std::map>, settings::PMTCompare> getStoredAll() const noexcept override { return {}; } // Implemented only for compatibility diff --git a/core/include/gnuradio-4.0/Transactions.hpp b/core/include/gnuradio-4.0/Transactions.hpp index ce11c6f0..022a7645 100644 --- a/core/include/gnuradio-4.0/Transactions.hpp +++ b/core/include/gnuradio-4.0/Transactions.hpp @@ -241,7 +241,7 @@ class CtxSettings : public SettingsBase { const property_map& parameters = tag.map; bool wasChanged = false; for (const auto& [key, value] : parameters) { - auto processOneMember = [&](auto member) { + auto processOneMember = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; if constexpr (settings::isWritableMember(member)) { if (autoUpdateParameters.contains(key) && std::string(get_display_name(member)) == key && std::holds_alternative(value)) { @@ -317,7 +317,7 @@ class CtxSettings : public SettingsBase { std::lock_guard lg(_lock); gr::Size_t nParameters{0}; for (const auto& stored : _storedParameters) { - nParameters += stored.second.size(); + nParameters += static_cast(stored.second.size()); } return nParameters; } diff --git a/core/include/gnuradio-4.0/thread/MemoryMonitor.hpp b/core/include/gnuradio-4.0/thread/MemoryMonitor.hpp new file mode 100644 index 00000000..eeabad22 --- /dev/null +++ b/core/include/gnuradio-4.0/thread/MemoryMonitor.hpp @@ -0,0 +1,93 @@ +#ifndef GNURADIO_MEMORYMONITOR_HPP +#define GNURADIO_MEMORYMONITOR_HPP + +#include +#include +#include + +#if defined(_WIN32) || defined(_WIN64) +#include +#include +#elif defined(__linux__) +#include +#include +#elif defined(__APPLE__) +#include +#endif + +namespace gr { +namespace memory { + +/* +For Linux `/proc/pid/statm` is used. It provides information about memory usage, measured in pages. +The following information is available: +size (1) total program size (same as VmSize in /proc/pid/status) +resident (2) resident set size +shared (3) number of resident shared pages +text (4) text (code) +lib (5) library (unused since Linux 2.6; always 0) +data (6) data + stack +dt (7) dirty pages (unused since Linux 2.6; always 0) + +For the moment only resident memory is provided. But it can be extended further with the information available from `proc/self/statm` or `/proc/self/stat`. +*/ +struct Stat { + std::size_t residentSize; // Resident Set Size: number of bytes the process has in real memory +}; + +inline Stat getUsage() { +#if defined(_WIN32) || defined(_WIN64) + PROCESS_MEMORY_COUNTERS pmc; + if (!GetProcessMemoryInfo(GetCurrentProcess(), &pmc, sizeof(pmc))) { + return {.residentSize = 0UZ}; + } + return {.residentSize = pmc.WorkingSetSize}; + +#elif defined(__EMSCRIPTEN__) + return {.residentSize = 0UZ}; + +#elif defined(__linux__) + if (!std::filesystem::exists("/proc/self/statm")) { + return {.residentSize = 0UZ}; + } + std::ifstream stat("/proc/self/statm"); + if (!stat.is_open()) { + return {.residentSize = 0UZ}; + } + std::string ignore; + std::size_t rss; + stat >> ignore >> rss; + rss = rss * static_cast(sysconf(_SC_PAGESIZE)); + + return {.residentSize = rss}; + +#elif defined(__APPLE__) + struct mach_task_basic_info info; + mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT; + if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS) { + return {.residentSize = 0UZ}; + } + return {.residentSize = info.resident_size}; + +#else + return {.residentSize = 0UZ}; +#endif +} + +inline std::string getPlatformName() { +#if defined(_WIN32) || defined(_WIN64) + return "Windows"; +#elif defined(__linux__) + return "Linux"; +#elif defined(__APPLE__) + return "macOS"; +#else + return ""; +#endif +} + +} // namespace memory + +} // namespace gr + +#endif // GNURADIO_MEMORYMONITOR_HPP diff --git a/core/test/CMakeLists.txt b/core/test/CMakeLists.txt index 4c1c156a..50c7a58e 100644 --- a/core/test/CMakeLists.txt +++ b/core/test/CMakeLists.txt @@ -10,10 +10,10 @@ function(setup_test_no_asan TEST_NAME) endfunction() function(setup_test TEST_NAME) - if(PYTHON_AVAILABLE) + if (PYTHON_AVAILABLE) target_include_directories(${TEST_NAME} PRIVATE ${Python3_INCLUDE_DIRS} ${NUMPY_INCLUDE_DIR}) target_link_libraries(${TEST_NAME} PRIVATE ${Python3_LIBRARIES}) - endif() + endif () setup_test_no_asan(${TEST_NAME}) endfunction() @@ -45,6 +45,7 @@ add_ut_test(qa_Messages) add_ut_test(qa_GraphMessages) add_ut_test(qa_thread_affinity) add_ut_test(qa_thread_pool) +add_ut_test(qa_PerformanceMonitor) if (ENABLE_BLOCK_REGISTRY AND ENABLE_BLOCK_PLUGINS) add_app_test(qa_grc) diff --git a/core/test/qa_PerformanceMonitor.cpp b/core/test/qa_PerformanceMonitor.cpp new file mode 100644 index 00000000..368e9996 --- /dev/null +++ b/core/test/qa_PerformanceMonitor.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { +using namespace std::chrono_literals; +template +auto createWatchdog(Scheduler& sched, std::chrono::seconds timeOut = 2s, std::chrono::milliseconds pollingPeriod = 40ms) { + auto externalInterventionNeeded = std::make_shared(false); + + std::thread watchdogThread([&sched, externalInterventionNeeded, timeOut, pollingPeriod]() { + auto timeout = std::chrono::steady_clock::now() + timeOut; + while (std::chrono::steady_clock::now() < timeout) { + if (sched.state() == gr::lifecycle::State::STOPPED) { + return; + } + std::this_thread::sleep_for(pollingPeriod); + } + fmt::println("watchdog kicked in"); + externalInterventionNeeded->store(true, std::memory_order_relaxed); + sched.requestStop(); + fmt::println("requested scheduler to stop"); + }); + + return std::make_pair(std::move(watchdogThread), externalInterventionNeeded); +} +} // namespace + +int main(int argc, char* argv[]) { + using namespace boost::ut; + using namespace gr; + using namespace gr::testing; + + int runTime = -1; // in seconds + int testCaseId = -1; + std::string outFilePath = ""; + + if (argc >= 2) { + runTime = std::atoi(argv[1]); + } + if (argc >= 3) { + testCaseId = std::atoi(argv[2]); + } + if (argc >= 4) { + outFilePath = std::string(argv[3]); + } + fmt::println("3 optional settings are available: qa_PerformanceMonitor [in sec] [1:no tags,2:moderate,3:1-to-1] "); + fmt::println(":{} s, :{}, :{}", runTime, testCaseId, outFilePath); + + auto threadPool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); + + gr::Size_t nSamples = 0U; + gr::Size_t evaluatePerfRate = 100'000; + Graph testGraph; + const property_map srcParameter = {{"n_samples_max", nSamples}, {"name", "TagSource"}, {"verbose_console", false}, {"repeat_tags", true}}; + auto& src = testGraph.emplaceBlock>(srcParameter); + + // parameters of generated Tags + gr::Tag::signed_index_type nSamplesPerTag = 10000; + bool tagWithAutoUpdate = false; + std::string tagName = tagWithAutoUpdate ? "sample_rate" : "some_random_name_1234"; + std::string outputCsvFilePath = ""; + + if (outFilePath != "") { + outputCsvFilePath = outFilePath; + } + if (testCaseId == 1) { + nSamplesPerTag = 0; + } else if (testCaseId == 2) { + nSamplesPerTag = 10000; + } else if (testCaseId == 3) { + nSamplesPerTag = 1; + } + + if (nSamplesPerTag > 0) { + src._tags = {gr::Tag(static_cast(nSamplesPerTag - 1), {{tagName, 2000.f}})}; + }; + + auto& monitorBulk = testGraph.emplaceBlock>({{"name", "TagMonitorBulk"}, {"log_samples", false}, {"log_tags", false}}); + auto& monitorOne = testGraph.emplaceBlock>({{"name", "TagMonitorOne"}, {"log_samples", false}, {"log_tags", false}}); + auto& monitorPerformance = testGraph.emplaceBlock>( // + {{"name", "PerformanceMonitor"}, {"evaluate_perf_rate", evaluatePerfRate}, {"output_csv_file_path", outputCsvFilePath}}); + + // performance statistics outputs + auto& sinkRes = testGraph.emplaceBlock>({{"name", "TagSinkRes"}, {"log_samples", false}, {"log_tags", false}}); + auto& sinkRate = testGraph.emplaceBlock>({{"name", "TagSinkRate"}, {"log_samples", false}, {"log_tags", false}}); + + // src -> monitorBulk -> monitorOne -> monitorPerformance + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(monitorBulk))); + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(monitorBulk).to<"in">(monitorOne))); + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(monitorOne).to<"in">(monitorPerformance))); + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"outRes">(monitorPerformance).to<"in">(sinkRes))); + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"outRate">(monitorPerformance).to<"in">(sinkRate))); + + auto sched = scheduler::Simple{std::move(testGraph), threadPool}; + auto [watchdogThread, externalInterventionNeeded] = createWatchdog(sched, runTime > 0 ? std::chrono::seconds(runTime) : 2s); + expect(sched.runAndWait().has_value()); + + if (watchdogThread.joinable()) { + watchdogThread.join(); + } + + expect(approx(monitorPerformance.n_updates_res, sinkRes.n_samples_produced, 2)); + expect(approx(monitorPerformance.n_updates_rate, sinkRate.n_samples_produced, 2)); + // expect(!externalInterventionNeeded->load(std::memory_order_relaxed)); +} diff --git a/core/test/qa_Settings.cpp b/core/test/qa_Settings.cpp index 9ebc090a..e711c1f9 100644 --- a/core/test/qa_Settings.cpp +++ b/core/test/qa_Settings.cpp @@ -685,14 +685,18 @@ const boost::ut::suite TransactionTests = [] { auto& src = testGraph.emplaceBlock>(srcParameter); const auto timeNow = std::chrono::system_clock::now(); - src.tags.push_back({50, {{"sample_rate", 50.f}, {std::string(gr::tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(timeNow + std::chrono::seconds(30))}}}); + src._tags.push_back({50, {{"sample_rate", 50.f}, {std::string(gr::tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(timeNow + std::chrono::seconds(30))}, // + {std::string(gr::tag::TRIGGER_NAME.shortKey()), "name30"}, {std::string(gr::tag::TRIGGER_OFFSET.shortKey()), 0.f}}}); for (std::size_t i = 100; i < 500; i++) { - src.tags.push_back(gr::Tag(static_cast(i), {{"sample_rate", static_cast(i)}})); + src._tags.push_back(gr::Tag(static_cast(i), {{"sample_rate", static_cast(i)}})); } - src.tags.push_back({550, {{"sample_rate", 550.f}, {std::string(gr::tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(timeNow + std::chrono::seconds(20))}}}); - src.tags.push_back({560, {{"sample_rate", 560.f}, {std::string(gr::tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(timeNow + std::chrono::seconds(10))}}}); + src._tags.push_back({550, {{"sample_rate", 550.f}, {std::string(gr::tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(timeNow + std::chrono::seconds(20))}, // + {std::string(gr::tag::TRIGGER_NAME.shortKey()), "name20"}, {std::string(gr::tag::TRIGGER_OFFSET.shortKey()), 0.f}}}); + src._tags.push_back({560, {{"sample_rate", 560.f}, {std::string(gr::tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(timeNow + std::chrono::seconds(10))}, // + {std::string(gr::tag::TRIGGER_NAME.shortKey()), "name10"}, {std::string(gr::tag::TRIGGER_OFFSET.shortKey()), 0.f}}}); + for (std::size_t i = 600; i < n_samples; i++) { - src.tags.push_back(gr::Tag(static_cast(i), {{"sample_rate", static_cast(i)}})); + src._tags.push_back(gr::Tag(static_cast(i), {{"sample_rate", static_cast(i)}})); } auto& monitorBulk = testGraph.emplaceBlock>({{"name", "TagMonitorBulk"}, {"n_samples_expected", n_samples}}); diff --git a/core/test/qa_Tags.cpp b/core/test/qa_Tags.cpp index 8fea5b69..11b70042 100644 --- a/core/test/qa_Tags.cpp +++ b/core/test/qa_Tags.cpp @@ -5,9 +5,9 @@ #include #include #include -#include #include #include +#include #include @@ -52,7 +52,7 @@ const boost::ut::suite TagTests = [] { // testTag.insert_or_assign(tag::SAMPLE_RATE(5.0)); // type-mismatch -> won't compile expect(testTag.at(tag::SAMPLE_RATE) == 4.0f); expect(tag::SAMPLE_RATE.shortKey() == "sample_rate"); - expect(tag::SAMPLE_RATE.key() == std::string{ GR_TAG_PREFIX }.append("sample_rate")); + expect(tag::SAMPLE_RATE.key() == std::string{GR_TAG_PREFIX}.append("sample_rate")); expect(testTag.get(tag::SAMPLE_RATE).has_value()); static_assert(!std::is_const_v); @@ -102,31 +102,26 @@ const boost::ut::suite TagPropagation = [] { auto runTest = [](bool verbose = true) { gr::Size_t n_samples = 1024; Graph testGraph; - const property_map srcParameter = { { "n_samples_max", n_samples }, { "name", "TagSource" }, { "signal_name", "tagStream" }, { "verbose_console", true && verbose } }; - auto &src = testGraph.emplaceBlock>(srcParameter); - src.tags = { + const property_map srcParameter = {{"n_samples_max", n_samples}, {"name", "TagSource"}, {"signal_name", "tagStream"}, {"verbose_console", true && verbose}}; + auto& src = testGraph.emplaceBlock>(srcParameter); + src._tags = { // TODO: allow parameter settings to include maps?!? - { 0, { { "key", "value@0" } } }, // - { 1, { { "key", "value@1" } } }, // - { 100, { { "key", "value@100" } } }, // - { 150, { { "key", "value@150" } } }, // - { 1000, { { "key", "value@1000" } } }, // - { 1001, { { "key", "value@1001" } } }, // - { 1002, { { "key", "value@1002" } } }, // - { 1023, { { "key", "value@1023" } } } // + {0, {{"key", "value@0"}}}, // + {1, {{"key", "value@1"}}}, // + {100, {{"key", "value@100"}}}, // + {150, {{"key", "value@150"}}}, // + {1000, {{"key", "value@1000"}}}, // + {1001, {{"key", "value@1001"}}}, // + {1002, {{"key", "value@1002"}}}, // + {1023, {{"key", "value@1023"}}} // }; expect(eq("tagStream"s, src.signal_name)) << "src signal_name -> needed for setting-via-tag forwarding"; - auto &monitorBulk = testGraph.emplaceBlock>( - { { "name", "TagMonitorBulk" }, { "n_samples_expected", n_samples }, { "verbose_console", true && verbose } }); - auto &monitorOne = testGraph.emplaceBlock>( - { { "name", "TagMonitorOne" }, { "n_samples_expected", n_samples }, { "verbose_console", false && verbose } }); - auto &monitorOneSIMD = testGraph.emplaceBlock>( - { { "name", "TagMonitorOneSIMD" }, { "n_samples_expected", n_samples }, { "verbose_console", false && verbose } }); - auto &sinkBulk = testGraph.emplaceBlock>( - { { "name", "TagSinkN" }, { "n_samples_expected", n_samples }, { "verbose_console", true && verbose } }); - auto &sinkOne = testGraph.emplaceBlock>( - { { "name", "TagSinkOne" }, { "n_samples_expected", n_samples }, { "verbose_console", true && verbose } }); + auto& monitorBulk = testGraph.emplaceBlock>({{"name", "TagMonitorBulk"}, {"n_samples_expected", n_samples}, {"verbose_console", true && verbose}}); + auto& monitorOne = testGraph.emplaceBlock>({{"name", "TagMonitorOne"}, {"n_samples_expected", n_samples}, {"verbose_console", false && verbose}}); + auto& monitorOneSIMD = testGraph.emplaceBlock>({{"name", "TagMonitorOneSIMD"}, {"n_samples_expected", n_samples}, {"verbose_console", false && verbose}}); + auto& sinkBulk = testGraph.emplaceBlock>({{"name", "TagSinkN"}, {"n_samples_expected", n_samples}, {"verbose_console", true && verbose}}); + auto& sinkOne = testGraph.emplaceBlock>({{"name", "TagSinkOne"}, {"n_samples_expected", n_samples}, {"verbose_console", true && verbose}}); // src ─> monitorBulk ─> monitorOne ─> monitorOneSIMD ┬─> sinkBulk // └─> sinkOne @@ -136,7 +131,7 @@ const boost::ut::suite TagPropagation = [] { expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(monitorOneSIMD).to<"in">(sinkBulk))); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(monitorOneSIMD).to<"in">(sinkOne))); - scheduler::Simple sched{ std::move(testGraph) }; + scheduler::Simple sched{std::move(testGraph)}; expect(sched.runAndWait().has_value()); // settings forwarding @@ -160,11 +155,47 @@ const boost::ut::suite TagPropagation = [] { expect(!sinkBulk.log_samples || eq(sinkBulk.samples.size(), n_samples)) << "sinkBulk did not log enough input samples"; expect(!sinkOne.log_samples || eq(sinkOne.samples.size(), n_samples)) << "sinkOne did not log enough input samples"; - expect(equal_tag_lists(src.tags, monitorBulk.tags, "signal_name"s)) << "monitorBulk did not receive the required tags"; - expect(equal_tag_lists(src.tags, monitorOne.tags, "signal_name"s)) << "monitorOne did not receive the required tags"; - expect(equal_tag_lists(src.tags, monitorOneSIMD.tags, "signal_name"s)) << "monitorOneSIMD did not receive the required tags"; - expect(equal_tag_lists(src.tags, sinkBulk.tags, "signal_name"s)) << "sinkBulk did not receive the required tags"; - expect(equal_tag_lists(src.tags, sinkOne.tags, "signal_name"s)) << "sinkOne did not receive the required tags"; + expect(equal_tag_lists(src._tags, monitorBulk._tags, "signal_name"s)) << "monitorBulk did not receive the required tags"; + expect(equal_tag_lists(src._tags, monitorOne._tags, "signal_name"s)) << "monitorOne did not receive the required tags"; + expect(equal_tag_lists(src._tags, monitorOneSIMD._tags, "signal_name"s)) << "monitorOneSIMD did not receive the required tags"; + expect(equal_tag_lists(src._tags, sinkBulk._tags, "signal_name"s)) << "sinkBulk did not receive the required tags"; + expect(equal_tag_lists(src._tags, sinkOne._tags, "signal_name"s)) << "sinkOne did not receive the required tags"; + }; + + "TagSource"_test = [&runTest] { runTest.template operator()(true); }; + + "TagSource"_test = [&runTest] { runTest.template operator()(true); }; +}; + +const boost::ut::suite RepeatedTags = [] { + using namespace boost::ut; + using namespace gr; + using namespace gr::testing; + + auto runTest = [](bool verbose = true) { + gr::Size_t n_samples = 30; + Graph testGraph; + const property_map srcParameter = {{"n_samples_max", n_samples}, {"name", "TagSource"}, {"verbose_console", true && verbose}, {"repeat_tags", true}}; + auto& src = testGraph.emplaceBlock>(srcParameter); + src._tags = {{2, {{"key", "value@2"}}}, {3, {{"key", "value@3"}}}, {5, {{"key", "value@5"}}}, {8, {{"key", "value@8"}}}}; + + auto& monitorOne = testGraph.emplaceBlock>({{"name", "TagMonitorOne"}, {"n_samples_expected", n_samples}, {"verbose_console", false && verbose}}); + auto& sinkOne = testGraph.emplaceBlock>({{"name", "TagSinkOne"}, {"n_samples_expected", n_samples}, {"verbose_console", false && verbose}}); + + // src -> monitorOne -> sinkOne + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).template to<"in">(monitorOne))); + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(monitorOne).to<"in">(sinkOne))); + + scheduler::Simple sched{std::move(testGraph)}; + expect(sched.runAndWait().has_value()); + + expect(eq(src._tags.size(), 4UZ)); + expect(eq(monitorOne._tags.size(), 13UZ)); + expect(eq(sinkOne._tags.size(), 13UZ)); + for (std::size_t i = 0; i < monitorOne._tags.size(); i++) { + expect(monitorOne._tags[i].map.at("key") == src._tags[i % src._tags.size()].map.at("key")); + expect(sinkOne._tags[i].map.at("key") == src._tags[i % src._tags.size()].map.at("key")); + } }; "TagSource"_test = [&runTest] { runTest.template operator()(true); }; @@ -172,6 +203,4 @@ const boost::ut::suite TagPropagation = [] { "TagSource"_test = [&runTest] { runTest.template operator()(true); }; }; -int -main() { /* tests are statically executed */ -} +int main() { /* tests are statically executed */ } diff --git a/meta/include/gnuradio-4.0/meta/formatter.hpp b/meta/include/gnuradio-4.0/meta/formatter.hpp index 578ca940..abbe1edc 100644 --- a/meta/include/gnuradio-4.0/meta/formatter.hpp +++ b/meta/include/gnuradio-4.0/meta/formatter.hpp @@ -3,20 +3,31 @@ #include #include +#include #include #include -#include #include +#include #include #include +namespace gr { +namespace time { +[[nodiscard]] inline std::string getIsoTime() noexcept { + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + return fmt::format("{:%Y-%m-%dT%H:%M:%S}.{:06}", // ms-precision ISO time-format + fmt::localtime(std::chrono::system_clock::to_time_t(now)), // + std::chrono::duration_cast(now.time_since_epoch()).count() % 1'000); +} +} // namespace time +} // namespace gr + template struct fmt::formatter> { char presentation = 'g'; // default format template - constexpr auto - parse(ParseContext &ctx) { + constexpr auto parse(ParseContext& ctx) { auto it = ctx.begin(), end = ctx.end(); if (it != end && (*it == 'f' || *it == 'F' || *it == 'e' || *it == 'E' || *it == 'g' || *it == 'G')) { presentation = *it++; @@ -28,8 +39,7 @@ struct fmt::formatter> { } template - constexpr auto - format(const std::complex &value, FormatContext &ctx) const { + constexpr auto format(const std::complex& value, FormatContext& ctx) const { // format according to: https://fmt.dev/papers/p2197r0.html#examples const auto imag = value.imag(); switch (presentation) { @@ -72,14 +82,12 @@ struct fmt::formatter> { template struct fmt::formatter> { template - constexpr auto - parse(ParseContext &ctx) { + constexpr auto parse(ParseContext& ctx) { return ctx.begin(); } template - constexpr auto - format(const gr::UncertainValue &value, FormatContext &ctx) const { + constexpr auto format(const gr::UncertainValue& value, FormatContext& ctx) const { if constexpr (gr::meta::complex_like) { return fmt::format_to(ctx.out(), "({} ± {})", value.value, value.uncertainty); } else { @@ -91,14 +99,12 @@ struct fmt::formatter> { template<> struct fmt::formatter { template - constexpr auto - parse(ParseContext &ctx) { + constexpr auto parse(ParseContext& ctx) { return ctx.begin(); } template - constexpr auto - format(const gr::property_map &value, FormatContext &ctx) const { + constexpr auto format(const gr::property_map& value, FormatContext& ctx) const { return fmt::format_to(ctx.out(), "{{ {} }}", fmt::join(value, ", ")); } }; @@ -107,17 +113,19 @@ template<> struct fmt::formatter> { char presentation = 'c'; - constexpr auto - parse(format_parse_context &ctx) -> decltype(ctx.begin()) { + constexpr auto parse(format_parse_context& ctx) -> decltype(ctx.begin()) { auto it = ctx.begin(), end = ctx.end(); - if (it != end && (*it == 's' || *it == 'c')) presentation = *it++; - if (it != end && *it != '}') throw fmt::format_error("invalid format"); + if (it != end && (*it == 's' || *it == 'c')) { + presentation = *it++; + } + if (it != end && *it != '}') { + throw fmt::format_error("invalid format"); + } return it; } template - auto - format(const std::vector &v, FormatContext &ctx) const -> decltype(ctx.out()) { + auto format(const std::vector& v, FormatContext& ctx) const -> decltype(ctx.out()) { auto sep = (presentation == 'c' ? ", " : " "); size_t len = v.size(); fmt::format_to(ctx.out(), "["); @@ -134,15 +142,11 @@ struct fmt::formatter> { template struct fmt::formatter> { - constexpr auto - parse(format_parse_context &ctx) -> decltype(ctx.begin()) { - return ctx.begin(); - } + constexpr auto parse(format_parse_context& ctx) -> decltype(ctx.begin()) { return ctx.begin(); } // Formats the source_location, using 'f' for file and 'l' for line template - auto - format(const std::expected &ret, FormatContext &ctx) const -> decltype(ctx.out()) { + auto format(const std::expected& ret, FormatContext& ctx) const -> decltype(ctx.out()) { if (ret.has_value()) { return fmt::format_to(ctx.out(), "", ret.value()); } else {