diff --git a/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp index 0b6a4116..d6cbadda 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp @@ -482,7 +482,7 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. [[nodiscard]] work::Status processBulk(std::span inData) noexcept { std::optional tagData; - if (this->input_tags_present()) { + if (this->inputTagsPresent()) { assert(this->mergedInputTag().index == 0); tagData = this->mergedInputTag().map; } diff --git a/blocks/soapy/include/gnuradio-4.0/soapy/Soapy.hpp b/blocks/soapy/include/gnuradio-4.0/soapy/Soapy.hpp index 310b7c26..1f4d8655 100644 --- a/blocks/soapy/include/gnuradio-4.0/soapy/Soapy.hpp +++ b/blocks/soapy/include/gnuradio-4.0/soapy/Soapy.hpp @@ -70,7 +70,7 @@ This block supports multiple output ports and was tested against the 'rtlsdr' an if (newSettings.contains("rx_antennae")) { setAntennae(); } - if (newSettings.contains("rx_center_frequency ") || newSettings.contains("sample_rate")) { + if (newSettings.contains("rx_center_frequency") || newSettings.contains("sample_rate")) { setCenterFrequency(); } if (newSettings.contains("rx_gains")) { diff --git a/blocks/testing/include/gnuradio-4.0/testing/ImChartMonitor.hpp b/blocks/testing/include/gnuradio-4.0/testing/ImChartMonitor.hpp index 8a71a631..db97250a 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/ImChartMonitor.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/ImChartMonitor.hpp @@ -4,10 +4,10 @@ #include "gnuradio-4.0/BlockRegistry.hpp" #include -#include -#include #include #include +#include +#include #include #include @@ -15,30 +15,25 @@ namespace gr::testing { template - requires(std::is_arithmetic_v || gr::DataSetLike) +requires(std::is_arithmetic_v || gr::DataSetLike) struct ImChartMonitor : public Block, BlockingIO, Drawable> { using ClockSourceType = std::chrono::system_clock; PortIn in; float sample_rate = 1000.0f; std::string signal_name = "unknown signal"; - HistoryBuffer _historyBufferX{ 1000 }; - HistoryBuffer _historyBufferY{ 1000 }; - HistoryBuffer _historyBufferTags{ 1000 }; + HistoryBuffer _historyBufferX{1000}; + HistoryBuffer _historyBufferY{1000}; + HistoryBuffer _historyBufferTags{1000}; - void - start() { + void start() { fmt::println("started sink {} aka. '{}'", this->unique_name, this->name); in.max_samples = 10UZ; } - void - stop() { - fmt::println("stopped sink {} aka. '{}'", this->unique_name, this->name); - } + void stop() { fmt::println("stopped sink {} aka. '{}'", this->unique_name, this->name); } - constexpr void - processOne(const T &input) noexcept { + constexpr void processOne(const T& input) noexcept { if constexpr (std::is_arithmetic_v) { in.max_samples = static_cast(2.f * sample_rate / 25.f); const T Ts = T(1.0f) / T(sample_rate); @@ -46,7 +41,7 @@ struct ImChartMonitor : public Block, BlockingIO, Drawa } _historyBufferY.push_back(input); - if (this->input_tags_present()) { // received tag + if (this->inputTagsPresent()) { // received tag _historyBufferTags.push_back(this->mergedInputTag()); _historyBufferTags[1].index = 0; this->_mergedInputTag.map.clear(); // TODO: provide proper API for clearing tags @@ -55,8 +50,7 @@ struct ImChartMonitor : public Block, BlockingIO, Drawa } } - work::Status - draw(const property_map &config = {}) noexcept { + work::Status draw(const property_map& config = {}) noexcept { [[maybe_unused]] const work::Status status = this->invokeWork(); // calls work(...) -> processOne(...) (all in the same thread as this 'draw()' if constexpr (std::is_arithmetic_v) { @@ -75,21 +69,19 @@ struct ImChartMonitor : public Block, BlockingIO, Drawa std::vector reversedY(_historyBufferY.rbegin(), _historyBufferY.rend()); std::vector reversedTag(_historyBufferX.size()); if constexpr (std::is_floating_point_v) { - std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(), - [](const Tag &tag, const T &yValue) { return tag.index < 0 ? std::numeric_limits::quiet_NaN() : yValue; }); + std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(), [](const Tag& tag, const T& yValue) { return tag.index < 0 ? std::numeric_limits::quiet_NaN() : yValue; }); } else { - std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(), - [](const Tag &tag, const T &yValue) { return tag.index < 0 ? std::numeric_limits::lowest() : yValue; }); + std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(), [](const Tag& tag, const T& yValue) { return tag.index < 0 ? std::numeric_limits::lowest() : yValue; }); } auto adjustRange = [](T min, T max) { min = std::min(min, T(0)); max = std::max(max, T(0)); const T margin = (max - min) * static_cast(0.2); - return std::pair{ min - margin, max + margin }; + return std::pair{min - margin, max + margin}; }; - auto chart = gr::graphs::ImChart<130, 28>({ { *xMin, *xMax }, adjustRange(*yMin, *yMax) }); + auto chart = gr::graphs::ImChart<130, 28>({{*xMin, *xMax}, adjustRange(*yMin, *yMax)}); chart.draw(reversedX, reversedY, signal_name); chart.draw(reversedX, reversedTag, "Tags"); chart.draw(); @@ -97,7 +89,7 @@ struct ImChartMonitor : public Block, BlockingIO, Drawa if (_historyBufferY.empty()) { return status; } - gr::dataset::draw(_historyBufferY[0], { .reset_view = config.contains("reset_view") ? graphs::ResetChartView::RESET : graphs::ResetChartView::KEEP }); + gr::dataset::draw(_historyBufferY[0], {.reset_view = config.contains("reset_view") ? graphs::ResetChartView::RESET : graphs::ResetChartView::KEEP}); } return status; diff --git a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp index 93f3b277..b3834b24 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp @@ -254,7 +254,7 @@ struct TagMonitor : public Block> { constexpr T processOne(const T& input) noexcept requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) { - if (this->input_tags_present()) { + if (this->inputTagsPresent()) { const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processOne(...)\t received tag at {:6}", this->name, _nSamplesProduced)); @@ -274,7 +274,7 @@ struct TagMonitor : public Block> { [[nodiscard]] constexpr V processOne(const V& input) noexcept // to note: the SIMD-version does not support adding tags mid-way since this is chunked at V::size() requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE_SIMD) { - if (this->input_tags_present()) { + if (this->inputTagsPresent()) { const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processOne(...)\t received tag at {:6}", this->name, _nSamplesProduced)); @@ -303,7 +303,7 @@ struct TagMonitor : public Block> { constexpr work::Status processBulk(std::span input, std::span output) noexcept requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) { - if (this->input_tags_present()) { + if (this->inputTagsPresent()) { const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processBulk(...{}, ...{})\t received tag at {:6}", this->name, input.size(), output.size(), _nSamplesProduced)); @@ -361,7 +361,7 @@ struct TagSink : public Block> { constexpr void processOne(const T& input) // N.B. non-SIMD since we need a sample-by-sample accurate tag detection requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE) { - if (this->input_tags_present()) { + if (this->inputTagsPresent()) { const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processOne(...1) \t received tag at {:6}", this->name, _nSamplesProduced)); @@ -383,7 +383,7 @@ struct TagSink : public Block> { constexpr work::Status processBulk(std::span input) requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK) { - if (this->input_tags_present()) { + if (this->inputTagsPresent()) { const Tag& tag = this->mergedInputTag(); if (verbose_console) { print_tag(tag, fmt::format("{}::processBulk(...{})\t received tag at {:6}", this->name, input.size(), _nSamplesProduced)); diff --git a/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp b/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp index 73e13047..7b1bbc8d 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp @@ -21,21 +21,16 @@ struct source : public gr::Block> { gr::Size_t n_samples_max; - friend constexpr std::size_t - available_samples(const source &self) noexcept { - return self.n_samples_max - n_samples_produced; - } + friend constexpr std::size_t available_samples(const source& self) noexcept { return self.n_samples_max - n_samples_produced; } - [[nodiscard]] constexpr auto - processOne_simd(auto N) const noexcept -> gr::meta::simdize { + [[nodiscard]] constexpr auto processOne_simd(auto N) const noexcept -> gr::meta::simdize { n_samples_produced += N; gr::meta::simdize x{}; benchmark::force_to_memory(x); return x; } - [[nodiscard]] constexpr T - processOne() const noexcept { + [[nodiscard]] constexpr T processOne() const noexcept { n_samples_produced++; T x{}; benchmark::force_to_memory(x); @@ -52,11 +47,10 @@ struct sink : public gr::Block> { int64_t _last_tag_position = -1; template V> - [[nodiscard]] constexpr auto - processOne(V a) noexcept { + [[nodiscard]] constexpr auto processOne(V a) noexcept { // optional user-level tag processing - if (this->input_tags_present()) { - if (this->input_tags_present() && this->mergedInputTag().map.contains("N_SAMPLES_MAX")) { + if (this->inputTagsPresent()) { + if (this->inputTagsPresent() && this->mergedInputTag().map.contains("N_SAMPLES_MAX")) { const auto value = this->mergedInputTag().map.at("N_SAMPLES_MAX"); if (std::holds_alternative(value)) { // should be std::size_t but emscripten/pmtv seem to have issues with it should_receive_n_samples = std::get(value); @@ -75,8 +69,7 @@ struct sink : public gr::Block> { }; template -constexpr auto -cascade(aggregate &&src, std::function generator = [] { return base(); }) { +constexpr auto cascade(aggregate&& src, std::function generator = [] { return base(); }) { if constexpr (N <= 1) { return src; } else { diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index 8a0fd931..1092ed52 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -621,7 +621,7 @@ class Block : public lifecycle::StateMachine, public std::tuple, public std::tuple(&self())); } diff --git a/core/include/gnuradio-4.0/BlockTraits.hpp b/core/include/gnuradio-4.0/BlockTraits.hpp index 9cf9cbfb..dd332994 100644 --- a/core/include/gnuradio-4.0/BlockTraits.hpp +++ b/core/include/gnuradio-4.0/BlockTraits.hpp @@ -345,6 +345,12 @@ struct DummyPublishableSpan { }; static_assert(PublishableSpan>); +template +struct DummyPublishablePortSpan: public DummyPublishableSpan { + void publishTag(property_map&, gr::Tag::signed_index_type) {} +}; +static_assert(PublishablePortSpan>); + // clang-format on struct to_any_vector { @@ -392,7 +398,7 @@ constexpr auto* port_to_processBulk_argument_helper() { if constexpr (isVectorOfSpansReturned) { return static_cast>*>(nullptr); } else { - return static_cast>*>(nullptr); + return static_cast>*>(nullptr); } } @@ -400,7 +406,7 @@ constexpr auto* port_to_processBulk_argument_helper() { if constexpr (Port::kIsInput) { return static_cast*>(nullptr); } else if constexpr (Port::kIsOutput) { - return static_cast*>(nullptr); + return static_cast*>(nullptr); } } } @@ -440,7 +446,7 @@ concept can_processBulk = can_processBulk_helper and *not* a type satisfying PublishableSpan. */ template -concept processBulk_requires_ith_output_as_span = can_processBulk && (I < traits::block::stream_output_port_types::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types>::template apply inputs, typename meta::transform_conditional>::template apply outputs, typename meta::transform_conditional>::template apply bad_outputs) { +concept processBulk_requires_ith_output_as_span = can_processBulk && (I < traits::block::stream_output_port_types::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types>::template apply inputs, typename meta::transform_conditional>::template apply outputs, typename meta::transform_conditional>::template apply bad_outputs) { { detail::can_processBulk_invoke_test(d, inputs, outputs, std::make_index_sequence::size>(), std::make_index_sequence::size>()) } -> std::same_as; // TODO: Is this check redundant? not requires { [](std::index_sequence, std::index_sequence) -> decltype(d.processBulk(std::get(inputs)..., std::get(bad_outputs)...)) { return {}; }(std::make_index_sequence::size>(), std::make_index_sequence::size>()); }; diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index 3c760748..916806b6 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -268,6 +268,11 @@ concept ConsumablePortSpan = ConsumableSpan && requires(T span) { { *span.tags.begin() } -> std::same_as; }; +template +concept PublishablePortSpan = PublishableSpan && requires(T span, property_map& tagData, Tag::signed_index_type index) { + { span.publishTag(tagData, index) } -> std::same_as; +}; + /** * @brief 'ports' are interfaces that allows data to flow between blocks in a graph, similar to RF connectors. * Each block can have zero or more input/output ports. When connecting ports, either a single-step or a two-step @@ -296,6 +301,7 @@ template using with_name_and_descriptor = Port; + using ThisPortFullType = Port; static_assert(portDirection != PortDirection::ANY, "ANY reserved for queries and not port direction declarations"); static_assert(portType != PortType::ANY, "ANY reserved for queries and not port type declarations"); @@ -365,13 +371,29 @@ struct Port { break; } } - std::ignore = tags.tryConsume(tagsToConsume); // this is a default fallback: tags may be both explicitely and implicitly consumed + std::ignore = tags.tryConsume(tagsToConsume); // this is a default fallback: tags may be both explicitly and implicitly consumed } } }; - static_assert(ConsumablePortSpan>); + template + using WriterSpanType = decltype(std::declval().template tryReserve(1UZ)); + + template + struct PublishablePortOutputRange : public WriterSpanType { + ThisPortFullType* _port; + // explicit PublishablePortOutputRange(WriterSpanType& span) : WriterSpanType(span) {}; + + template + explicit PublishablePortOutputRange(TSpan&& span, ThisPortFullType* port) : WriterSpanType(std::forward(span)), _port(port){}; + + ~PublishablePortOutputRange() = default; + + void publishTag(property_map& tagData, Tag::signed_index_type index) { _port->publishTag(tagData, index); } + }; + static_assert(PublishablePortSpan>); + std::span tags; // Range of tags for the currently processed stream range; only used in input ports Tag::signed_index_type streamIndex{}; // Absolute offset of the first sample in the currently processed stream span; only used in input ports @@ -591,18 +613,20 @@ struct Port { return taggedStream; } - template - auto reserve(std::size_t nSamples) + template + PublishablePortOutputRange reserve(std::size_t nSamples) requires(kIsOutput) { - return streamWriter().template reserve(nSamples); + auto result = PublishablePortOutputRange(streamWriter().template reserve(nSamples), this); + return result; } - template - auto tryReserve(std::size_t nSamples) + template + PublishablePortOutputRange tryReserve(std::size_t nSamples) requires(kIsOutput) { - return streamWriter().template tryReserve(nSamples); + auto result = PublishablePortOutputRange(streamWriter().template tryReserve(nSamples), this); + return result; } /** diff --git a/core/test/qa_Block.cpp b/core/test/qa_Block.cpp index c2e0e180..76c4242b 100644 --- a/core/test/qa_Block.cpp +++ b/core/test/qa_Block.cpp @@ -114,7 +114,7 @@ static_assert(gr::HasProcessOneFunction>); static_assert(!gr::HasProcessBulkFunction>); -enum class ProcessBulkVariant { SPAN_SPAN, SPAN_PUBLISHABLE, SPAN_PUBLISHABLE2, CONSUMABLE_SPAN, CONSUMABLE_SPAN2, CONSUMABLE_PUBLISHABLE, CONSUMABLE_PUBLISHABLE2 }; +enum class ProcessBulkVariant { SPAN_SPAN, SPAN_PUBLISHABLE, SPAN_PUBLISHABLE2, CONSUMABLE_SPAN, CONSUMABLE_SPAN2, CONSUMABLE_PUBLISHABLE, CONSUMABLE_PUBLISHABLE2, CONSUMABLE_PUBLISHABLE_PORT, CONSUMABLE_PUBLISHABLE_PORT2 }; template struct BlockSignaturesProcessBulkSpan : public gr::Block> { @@ -124,49 +124,54 @@ struct BlockSignaturesProcessBulkSpan : public gr::Block, std::span) requires(processVariant == ProcessBulkVariant::SPAN_SPAN) { - // do some bulk-type processing return gr::work::Status::OK; } gr::work::Status processBulk(std::span, gr::PublishableSpan auto&) requires(processVariant == ProcessBulkVariant::SPAN_PUBLISHABLE) { - // do some bulk-type processing return gr::work::Status::OK; } gr::work::Status processBulk(std::span, gr::PublishableSpan auto) requires(processVariant == ProcessBulkVariant::SPAN_PUBLISHABLE2) { - // do some bulk-type processing return gr::work::Status::OK; } gr::work::Status processBulk(gr::ConsumableSpan auto, std::span) requires(processVariant == ProcessBulkVariant::CONSUMABLE_SPAN) { - // do some bulk-type processing return gr::work::Status::OK; } gr::work::Status processBulk(gr::ConsumableSpan auto&, std::span) requires(processVariant == ProcessBulkVariant::CONSUMABLE_SPAN2) { - // do some bulk-type processing return gr::work::Status::OK; } gr::work::Status processBulk(gr::ConsumableSpan auto&, gr::PublishableSpan auto&) requires(processVariant == ProcessBulkVariant::CONSUMABLE_PUBLISHABLE) { - // do some bulk-type processing return gr::work::Status::OK; } gr::work::Status processBulk(gr::ConsumableSpan auto, gr::PublishableSpan auto) requires(processVariant == ProcessBulkVariant::CONSUMABLE_PUBLISHABLE2) { - // do some bulk-type processing + return gr::work::Status::OK; + } + + gr::work::Status processBulk(gr::ConsumablePortSpan auto&, gr::PublishablePortSpan auto&) + requires(processVariant == ProcessBulkVariant::CONSUMABLE_PUBLISHABLE_PORT) + { + return gr::work::Status::OK; + } + + gr::work::Status processBulk(gr::ConsumablePortSpan auto, gr::PublishablePortSpan auto) + requires(processVariant == ProcessBulkVariant::CONSUMABLE_PUBLISHABLE_PORT2) + { return gr::work::Status::OK; } }; @@ -183,6 +188,8 @@ static_assert(gr::HasProcessBulkFunction>); static_assert(gr::HasProcessBulkFunction>); static_assert(!gr::HasProcessBulkFunction>); // TODO: fix the signature is required to work +static_assert(gr::HasProcessBulkFunction>); +static_assert(!gr::HasProcessBulkFunction>); // TODO: fix the signature is required to work static_assert(gr::traits::block::processBulk_requires_ith_output_as_span, 0>); static_assert(!gr::traits::block::processBulk_requires_ith_output_as_span, 0>); diff --git a/core/test/qa_Tags.cpp b/core/test/qa_Tags.cpp index 8c913e4a..6817c5b0 100644 --- a/core/test/qa_Tags.cpp +++ b/core/test/qa_Tags.cpp @@ -35,13 +35,13 @@ or next chunk, whichever is closer. Also adds an "offset" key to the tag map sig double sampling_rate = 1.0; constexpr static gr::TagPropagationPolicy tag_policy = gr::TagPropagationPolicy::TPP_DONT; - gr::work::Status processBulk(const gr::ConsumablePortSpan auto inSamples, gr::PublishableSpan auto& outSamples) { + gr::work::Status processBulk(const gr::ConsumablePortSpan auto inSamples, gr::PublishablePortSpan auto& outSamples) { std::copy(inSamples.begin(), inSamples.end(), outSamples.begin()); std::size_t tagsForwarded = 0; for (gr::Tag tag : inSamples.tags) { if (tag.index < (inPort.streamIndex + (static_cast(inSamples.size()) + 1) / 2)) { tag.insert_or_assign("offset", sampling_rate * static_cast(tag.index - inPort.streamIndex)); - outPort.publishTag(tag.map, 0); + outSamples.publishTag(tag.map, 0); tagsForwarded++; } else { break;