Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PublishablePortSpan #415

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.

[[nodiscard]] work::Status processBulk(std::span<const T> inData) noexcept {
std::optional<property_map> tagData;
if (this->input_tags_present()) {
if (this->inputTagsPresent()) {
assert(this->mergedInputTag().index == 0);
tagData = this->mergedInputTag().map;
}
Expand Down
2 changes: 1 addition & 1 deletion blocks/soapy/include/gnuradio-4.0/soapy/Soapy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ This block supports multiple output ports and was tested against the 'rtlsdr' an
if (newSettings.contains("rx_antennae")) {
setAntennae();
}
if (newSettings.contains("rx_center_frequency ") || newSettings.contains("sample_rate")) {
if (newSettings.contains("rx_center_frequency") || newSettings.contains("sample_rate")) {
setCenterFrequency();
}
if (newSettings.contains("rx_gains")) {
Expand Down
40 changes: 16 additions & 24 deletions blocks/testing/include/gnuradio-4.0/testing/ImChartMonitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,44 @@
#include "gnuradio-4.0/BlockRegistry.hpp"
#include <algorithm>

#include <gnuradio-4.0/algorithm/dataset/DataSetUtils.hpp>
#include <gnuradio-4.0/algorithm/ImChart.hpp>
#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/HistoryBuffer.hpp>
#include <gnuradio-4.0/algorithm/ImChart.hpp>
#include <gnuradio-4.0/algorithm/dataset/DataSetUtils.hpp>

#include <fmt/format.h>
#include <fmt/ranges.h>

namespace gr::testing {

template<typename T>
requires(std::is_arithmetic_v<T> || gr::DataSetLike<T>)
requires(std::is_arithmetic_v<T> || gr::DataSetLike<T>)
struct ImChartMonitor : public Block<ImChartMonitor<T>, BlockingIO<false>, Drawable<UICategory::ChartPane, "console">> {
using ClockSourceType = std::chrono::system_clock;
PortIn<T> in;
float sample_rate = 1000.0f;
std::string signal_name = "unknown signal";

HistoryBuffer<T> _historyBufferX{ 1000 };
HistoryBuffer<T> _historyBufferY{ 1000 };
HistoryBuffer<Tag> _historyBufferTags{ 1000 };
HistoryBuffer<T> _historyBufferX{1000};
HistoryBuffer<T> _historyBufferY{1000};
HistoryBuffer<Tag> _historyBufferTags{1000};

void
start() {
void start() {
fmt::println("started sink {} aka. '{}'", this->unique_name, this->name);
in.max_samples = 10UZ;
}

void
stop() {
fmt::println("stopped sink {} aka. '{}'", this->unique_name, this->name);
}
void stop() { fmt::println("stopped sink {} aka. '{}'", this->unique_name, this->name); }

constexpr void
processOne(const T &input) noexcept {
constexpr void processOne(const T& input) noexcept {
if constexpr (std::is_arithmetic_v<T>) {
in.max_samples = static_cast<std::size_t>(2.f * sample_rate / 25.f);
const T Ts = T(1.0f) / T(sample_rate);
_historyBufferX.push_back(_historyBufferX[1] + static_cast<T>(Ts));
}
_historyBufferY.push_back(input);

if (this->input_tags_present()) { // received tag
if (this->inputTagsPresent()) { // received tag
_historyBufferTags.push_back(this->mergedInputTag());
_historyBufferTags[1].index = 0;
this->_mergedInputTag.map.clear(); // TODO: provide proper API for clearing tags
Expand All @@ -55,8 +50,7 @@ struct ImChartMonitor : public Block<ImChartMonitor<T>, BlockingIO<false>, Drawa
}
}

work::Status
draw(const property_map &config = {}) noexcept {
work::Status draw(const property_map& config = {}) noexcept {
[[maybe_unused]] const work::Status status = this->invokeWork(); // calls work(...) -> processOne(...) (all in the same thread as this 'draw()'

if constexpr (std::is_arithmetic_v<T>) {
Expand All @@ -75,29 +69,27 @@ struct ImChartMonitor : public Block<ImChartMonitor<T>, BlockingIO<false>, Drawa
std::vector<T> reversedY(_historyBufferY.rbegin(), _historyBufferY.rend());
std::vector<T> reversedTag(_historyBufferX.size());
if constexpr (std::is_floating_point_v<T>) {
std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(),
[](const Tag &tag, const T &yValue) { return tag.index < 0 ? std::numeric_limits<T>::quiet_NaN() : yValue; });
std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(), [](const Tag& tag, const T& yValue) { return tag.index < 0 ? std::numeric_limits<T>::quiet_NaN() : yValue; });
} else {
std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(),
[](const Tag &tag, const T &yValue) { return tag.index < 0 ? std::numeric_limits<T>::lowest() : yValue; });
std::transform(_historyBufferTags.rbegin(), _historyBufferTags.rend(), _historyBufferY.rbegin(), reversedTag.begin(), [](const Tag& tag, const T& yValue) { return tag.index < 0 ? std::numeric_limits<T>::lowest() : yValue; });
}

auto adjustRange = [](T min, T max) {
min = std::min(min, T(0));
max = std::max(max, T(0));
const T margin = (max - min) * static_cast<T>(0.2);
return std::pair<double, double>{ min - margin, max + margin };
return std::pair<double, double>{min - margin, max + margin};
};

auto chart = gr::graphs::ImChart<130, 28>({ { *xMin, *xMax }, adjustRange(*yMin, *yMax) });
auto chart = gr::graphs::ImChart<130, 28>({{*xMin, *xMax}, adjustRange(*yMin, *yMax)});
chart.draw(reversedX, reversedY, signal_name);
chart.draw<gr::graphs::Style::Marker>(reversedX, reversedTag, "Tags");
chart.draw();
} else if constexpr (gr::DataSetLike<T>) {
if (_historyBufferY.empty()) {
return status;
}
gr::dataset::draw(_historyBufferY[0], { .reset_view = config.contains("reset_view") ? graphs::ResetChartView::RESET : graphs::ResetChartView::KEEP });
gr::dataset::draw(_historyBufferY[0], {.reset_view = config.contains("reset_view") ? graphs::ResetChartView::RESET : graphs::ResetChartView::KEEP});
}

return status;
Expand Down
10 changes: 5 additions & 5 deletions blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ struct TagMonitor : public Block<TagMonitor<T, UseProcessVariant>> {
constexpr T processOne(const T& input) noexcept
requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE)
{
if (this->input_tags_present()) {
if (this->inputTagsPresent()) {
const Tag& tag = this->mergedInputTag();
if (verbose_console) {
print_tag(tag, fmt::format("{}::processOne(...)\t received tag at {:6}", this->name, _nSamplesProduced));
Expand All @@ -274,7 +274,7 @@ struct TagMonitor : public Block<TagMonitor<T, UseProcessVariant>> {
[[nodiscard]] constexpr V processOne(const V& input) noexcept // to note: the SIMD-version does not support adding tags mid-way since this is chunked at V::size()
requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE_SIMD)
{
if (this->input_tags_present()) {
if (this->inputTagsPresent()) {
const Tag& tag = this->mergedInputTag();
if (verbose_console) {
print_tag(tag, fmt::format("{}::processOne(...)\t received tag at {:6}", this->name, _nSamplesProduced));
Expand Down Expand Up @@ -303,7 +303,7 @@ struct TagMonitor : public Block<TagMonitor<T, UseProcessVariant>> {
constexpr work::Status processBulk(std::span<const T> input, std::span<T> output) noexcept
requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK)
{
if (this->input_tags_present()) {
if (this->inputTagsPresent()) {
const Tag& tag = this->mergedInputTag();
if (verbose_console) {
print_tag(tag, fmt::format("{}::processBulk(...{}, ...{})\t received tag at {:6}", this->name, input.size(), output.size(), _nSamplesProduced));
Expand Down Expand Up @@ -361,7 +361,7 @@ struct TagSink : public Block<TagSink<T, UseProcessVariant>> {
constexpr void processOne(const T& input) // N.B. non-SIMD since we need a sample-by-sample accurate tag detection
requires(UseProcessVariant == ProcessFunction::USE_PROCESS_ONE)
{
if (this->input_tags_present()) {
if (this->inputTagsPresent()) {
const Tag& tag = this->mergedInputTag();
if (verbose_console) {
print_tag(tag, fmt::format("{}::processOne(...1) \t received tag at {:6}", this->name, _nSamplesProduced));
Expand All @@ -383,7 +383,7 @@ struct TagSink : public Block<TagSink<T, UseProcessVariant>> {
constexpr work::Status processBulk(std::span<const T> input)
requires(UseProcessVariant == ProcessFunction::USE_PROCESS_BULK)
{
if (this->input_tags_present()) {
if (this->inputTagsPresent()) {
const Tag& tag = this->mergedInputTag();
if (verbose_console) {
print_tag(tag, fmt::format("{}::processBulk(...{})\t received tag at {:6}", this->name, input.size(), _nSamplesProduced));
Expand Down
21 changes: 7 additions & 14 deletions blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,16 @@ struct source : public gr::Block<source<T, min, count>> {

gr::Size_t n_samples_max;

friend constexpr std::size_t
available_samples(const source &self) noexcept {
return self.n_samples_max - n_samples_produced;
}
friend constexpr std::size_t available_samples(const source& self) noexcept { return self.n_samples_max - n_samples_produced; }

[[nodiscard]] constexpr auto
processOne_simd(auto N) const noexcept -> gr::meta::simdize<T, decltype(N)::value> {
[[nodiscard]] constexpr auto processOne_simd(auto N) const noexcept -> gr::meta::simdize<T, decltype(N)::value> {
n_samples_produced += N;
gr::meta::simdize<T, N> x{};
benchmark::force_to_memory(x);
return x;
}

[[nodiscard]] constexpr T
processOne() const noexcept {
[[nodiscard]] constexpr T processOne() const noexcept {
n_samples_produced++;
T x{};
benchmark::force_to_memory(x);
Expand All @@ -52,11 +47,10 @@ struct sink : public gr::Block<sink<T, N_MIN, N_MAX>> {
int64_t _last_tag_position = -1;

template<gr::meta::t_or_simd<T> V>
[[nodiscard]] constexpr auto
processOne(V a) noexcept {
[[nodiscard]] constexpr auto processOne(V a) noexcept {
// optional user-level tag processing
if (this->input_tags_present()) {
if (this->input_tags_present() && this->mergedInputTag().map.contains("N_SAMPLES_MAX")) {
if (this->inputTagsPresent()) {
if (this->inputTagsPresent() && this->mergedInputTag().map.contains("N_SAMPLES_MAX")) {
const auto value = this->mergedInputTag().map.at("N_SAMPLES_MAX");
if (std::holds_alternative<uint64_t>(value)) { // should be std::size_t but emscripten/pmtv seem to have issues with it
should_receive_n_samples = std::get<uint64_t>(value);
Expand All @@ -75,8 +69,7 @@ struct sink : public gr::Block<sink<T, N_MIN, N_MAX>> {
};

template<std::size_t N, typename base, typename aggregate>
constexpr auto
cascade(aggregate &&src, std::function<base()> generator = [] { return base(); }) {
constexpr auto cascade(aggregate&& src, std::function<base()> generator = [] { return base(); }) {
if constexpr (N <= 1) {
return src;
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen

[[nodiscard]] constexpr bool isBlocking() const noexcept { return blockingIO; }

[[nodiscard]] constexpr bool input_tags_present() const noexcept { return !_mergedInputTag.map.empty(); };
[[nodiscard]] constexpr bool inputTagsPresent() const noexcept { return !_mergedInputTag.map.empty(); };

[[nodiscard]] Tag mergedInputTag() const noexcept { return _mergedInputTag; }

Expand Down Expand Up @@ -783,7 +783,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
}

constexpr void forwardTags() noexcept {
if (input_tags_present()) {
if (inputTagsPresent()) {
if constexpr (Derived::tag_policy == TagPropagationPolicy::TPP_ALL_TO_ALL) {
for_each_port([this](PortLike auto& outPort) noexcept { outPort.publishTag(mergedInputTag().map, 0); }, outputPorts<PortType::STREAM>(&self()));
}
Expand Down
12 changes: 9 additions & 3 deletions core/include/gnuradio-4.0/BlockTraits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ struct DummyPublishableSpan {
};
static_assert(PublishableSpan<DummyPublishableSpan<int>>);

template<typename T>
struct DummyPublishablePortSpan: public DummyPublishableSpan<T> {
void publishTag(property_map&, gr::Tag::signed_index_type) {}
};
static_assert(PublishablePortSpan<DummyPublishablePortSpan<int>>);

// clang-format on

struct to_any_vector {
Expand Down Expand Up @@ -392,15 +398,15 @@ constexpr auto* port_to_processBulk_argument_helper() {
if constexpr (isVectorOfSpansReturned) {
return static_cast<std::span<std::span<typename Port::value_type::value_type>>*>(nullptr);
} else {
return static_cast<std::span<DummyPublishableSpan<typename Port::value_type::value_type>>*>(nullptr);
return static_cast<std::span<DummyPublishablePortSpan<typename Port::value_type::value_type>>*>(nullptr);
}
}

} else { // single port
if constexpr (Port::kIsInput) {
return static_cast<DummyConsumablePortSpan<typename Port::value_type>*>(nullptr);
} else if constexpr (Port::kIsOutput) {
return static_cast<DummyPublishableSpan<typename Port::value_type>*>(nullptr);
return static_cast<DummyPublishablePortSpan<typename Port::value_type>*>(nullptr);
}
}
}
Expand Down Expand Up @@ -440,7 +446,7 @@ concept can_processBulk = can_processBulk_helper<TBlock, detail::port_to_process
* must be std::span<T> and *not* a type satisfying PublishableSpan<T>.
*/
template<typename TDerived, std::size_t I>
concept processBulk_requires_ith_output_as_span = can_processBulk<TDerived> && (I < traits::block::stream_output_port_types<TDerived>::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types<detail::DummyConsumablePortSpan, traits::block::stream_input_port_types<TDerived>>::template apply<std::tuple> inputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::dynamic_span, detail::DummyPublishableSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> outputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::nothing_you_ever_wanted, detail::DummyPublishableSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> bad_outputs) {
concept processBulk_requires_ith_output_as_span = can_processBulk<TDerived> && (I < traits::block::stream_output_port_types<TDerived>::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types<detail::DummyConsumablePortSpan, traits::block::stream_input_port_types<TDerived>>::template apply<std::tuple> inputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::dynamic_span, detail::DummyPublishablePortSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> outputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::nothing_you_ever_wanted, detail::DummyPublishablePortSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> bad_outputs) {
{ detail::can_processBulk_invoke_test(d, inputs, outputs, std::make_index_sequence<stream_input_port_types<TDerived>::size>(), std::make_index_sequence<stream_output_port_types<TDerived>::size>()) } -> std::same_as<work::Status>;
// TODO: Is this check redundant?
not requires { []<std::size_t... InIdx, std::size_t... OutIdx>(std::index_sequence<InIdx...>, std::index_sequence<OutIdx...>) -> decltype(d.processBulk(std::get<InIdx>(inputs)..., std::get<OutIdx>(bad_outputs)...)) { return {}; }(std::make_index_sequence<traits::block::stream_input_port_types<TDerived>::size>(), std::make_index_sequence<traits::block::stream_output_port_types<TDerived>::size>()); };
Expand Down
Loading
Loading