Skip to content

Commit

Permalink
new block: Soapy Source
Browse files Browse the repository at this point in the history
* implements a SoapySDR::Device-like via C-API wrapper as a workaround to ensure ABI compatibility for GR4 being compiled with libc++ and the Soapy wrapper with GCC and vice-versa.
* added corresponding unit-test and soapy-lime-based example for clarity.

N.B. tested for drivers: audio, rtlsdr, lime

Signed-off-by: Ralph J. Steinhagen <[email protected]>
  • Loading branch information
RalphSteinhagen committed Jun 14, 2024
1 parent dbec3d4 commit 3d5b3a8
Show file tree
Hide file tree
Showing 9 changed files with 1,620 additions and 4 deletions.
9 changes: 5 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ if (EMSCRIPTEN)
set(ENABLE_BLOCK_PLUGINS OFF)
endif ()

if (ENABLE_BLOCK_PLUGINS)
set(ENABLE_BLOCK_REGISTRY ON)
endif ()

message(STATUS "Is block registry enabled? (faster compile-times and when runtime or Python wrapping APIs are not required) ${ENABLE_BLOCK_REGISTRY}")
message(STATUS "Is plugin system enabled? ${ENABLE_BLOCK_PLUGINS}")

Expand Down Expand Up @@ -225,6 +221,11 @@ FetchContent_Declare(

FetchContent_MakeAvailable(fmt pmt ut yaml-cpp vir-simd cpp-httplib)

# Fetch SoapySDR -- needed since the distribution version is incompatible w.r.t. stdlibc++ vs. libc++
if (CMAKE_CXX_COMPILER_ID MATCHES "(GNU|Clang)") # WIP
find_package(SoapySDR CONFIG)
endif ()

add_library(pmtv INTERFACE)
target_include_directories(pmtv INTERFACE ${pmt_SOURCE_DIR}/include/)

Expand Down
1 change: 1 addition & 0 deletions blocks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ add_subdirectory(filter)
add_subdirectory(fourier)
add_subdirectory(http)
add_subdirectory(math)
add_subdirectory(soapy)
add_subdirectory(testing)
13 changes: 13 additions & 0 deletions blocks/soapy/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Check for optional SoapySDR local dependency https://github.com/pothosware/SoapySDR
if (TARGET SoapySDR)
add_library(gr-soapy INTERFACE)
target_include_directories(gr-soapy INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include/> $<INSTALL_INTERFACE:include/>)
target_link_libraries(gr-soapy INTERFACE gnuradio-core SoapySDR)

add_subdirectory(src)
if (ENABLE_TESTING)
add_subdirectory(test)
endif ()
else ()
message(WARNING "SoapySDR development files not found: ${SoapySDR_FOUND} - skipping SoapySDR support")
endif ()
282 changes: 282 additions & 0 deletions blocks/soapy/include/gnuradio-4.0/soapy/Soapy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
#ifndef SOAPY_HPP
#define SOAPY_HPP

#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/BlockRegistry.hpp>

#include <SoapySDR/Formats.h>

#include <SoapySDR/Device.hpp> // needed for existing C++ return types that are ABI stable (i.e. header-only defined)

#include <gnuradio-4.0/Profiler.hpp>
#include <map>
#include <string>
#include <vector>

#include "SoapyRaiiWrapper.hpp" // using SoapySDR's C-API as intermediate interface to mitigate ABI-issues between stdlibc++ and libc++

namespace gr::blocks::soapy {

template<typename T, std::size_t nPorts = std::dynamic_extent>
struct SoapyBlock : public Block<SoapyBlock<T, nPorts> /*, BlockingIO<false>*/> {
using Description = Doc<R""(A Soapy source block that interfaces with SDR hardware using the SoapySDR library.
This block supports multiple output ports and was tested against the 'rtlsdr' and 'lime' device driver.
)"">;

template<typename U, gr::meta::fixed_string description = "", typename... Arguments>
using A = Annotated<U, description, Arguments...>; // optional shortening

using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
using TSizeChecker = Limits<0U, std::numeric_limits<std::uint32_t>::max(), std::has_single_bit<std::uint32_t>>;
using TBasePort = PortOut<T>;
using TPortType = std::conditional_t<nPorts == 1U, TBasePort, std::conditional_t<nPorts == std::dynamic_extent, std::vector<TBasePort>, std::array<TBasePort, nPorts>>>;

TPortType out;

A<std::string, "device driver name", Visible> device;
A<std::string, "add. device parameter", Visible> device_parameter;
A<float, "sample rate", Unit<"samples/s">, Doc<"sampling rate in samples per second (Hz)">, Visible> sample_rate = 1'000'000.f;
A<std::vector<gr::Size_t>, "RX channel ID mapping vector", Visible> rx_channels = initDefaultValues<true>(gr::Size_t(0U));
A<std::vector<std::string>, "RX channel antenna mapping", Visible> rx_antennae;
A<std::vector<double>, "RX center frequency", Unit<"Hz">, Doc<"RX-RF center frequency">, Visible> rx_center_frequency = initDefaultValues(107'000'000.);
A<std::vector<double>, "RX bandwidth", Unit<"Hz">, Doc<"RX-RF bandwidth">, Visible> rx_bandwdith = initDefaultValues(double(sample_rate / 2));
A<std::vector<double>, "Rx gain", Unit<"dB">, Doc<"RX channel gain">, Visible> rx_gains = initDefaultValues(10.);

// low-level ABI
A<std::uint32_t, "max polling chunk size", Doc<"ideally N x 512">, Visible, TSizeChecker> max_chunck_size = 512U << 4U;
A<std::uint32_t, "polling time out", Unit<"us">, Doc<"soapy polling time-out">> max_time_out_us = 1'000;
A<gr::Size_t, "max overflow count", Doc<"0: disable">> max_overflow_count = 10U;
A<gr::Size_t, "max fragment count", Doc<"0: disable">> max_fragment_count = 100U;

Device _device{};
Device::Stream<T, SOAPY_SDR_RX> _rxStream{};
gr::Size_t _fragmentCount = 0U;
gr::Size_t _overFlowCount = 0U;

void settingsChanged(const property_map& oldSettings, const property_map& newSettings) {
bool needReinit = false;
if ((newSettings.contains("device") && (oldSettings.at("device") != newSettings.at("device"))) //
|| (newSettings.contains("rx_channels") && (oldSettings.at("rx_channels") != newSettings.at("rx_channels"))) //
|| (newSettings.contains("sample_rate"))) {
needReinit = true;
}

if (newSettings.contains("rx_antennae")) {
setAntennae();
}
if (newSettings.contains("rx_center_frequency ") || newSettings.contains("sample_rate")) {
setCenterFrequency();
}
if (newSettings.contains("rx_gains")) {
setGains();
}

if (needReinit && lifecycle::isActive(this->state())) {
// only force re-init if running and/or paused
reinitDevice();
}
}

void start() { reinitDevice(); }

void pause() { _rxStream.deactivate(); }

void resume() { _rxStream.activate(); }

void stop() { _device.reset(); }

constexpr work::Status processBulk(PublishableSpan auto& output)
requires(nPorts == 1U)
{
// special case single ouput -> simplifies connect API because this doesn't require sub-indices
auto maxSamples = static_cast<std::uint32_t>(output.size()); // max available samples
maxSamples = std::min(maxSamples, max_chunck_size.value);

int flags = 0;
long long time_ns = 0; // driver specifc

// non-blocking/blocking depending on the value of max_time_out_us (0...)
int ret = _rxStream.readStream(flags, time_ns, max_time_out_us, std::span<T>(output).subspan(0, maxSamples));
// for detailed debugging: detail::printSoapyReturnDebugInfo(ret, flags, time_ns);

auto status = handleDeviceStreamingErrors(ret, flags);
if (ret >= 0 && status == work::Status::OK) {
output.publish(static_cast<std::size_t>(ret));
return work::Status::OK;
}

// no data or some failure occured
output.publish(0UZ);
return status;
}

template<PublishableSpan TOutputBuffer>
constexpr work::Status processBulk(std::span<TOutputBuffer>& outputs)
requires(nPorts > 1U)
{
// general case multiple ouputs
auto maxSamples = static_cast<std::uint32_t>(outputs[0].size()); // max available samples
maxSamples = std::min(maxSamples, max_chunck_size.value);

int flags = 0;
int ret = SOAPY_SDR_TIMEOUT;
long long time_ns = 0; // driver specifc
if constexpr (nPorts == 2UZ) {
ret = _rxStream.readStream(flags, time_ns, max_time_out_us, //
std::span<T>(outputs[0]).subspan(0, maxSamples), //
std::span<T>(outputs[1]).subspan(0, maxSamples));
} else if constexpr (nPorts == 3UZ) {
ret = _rxStream.readStream(flags, time_ns, max_time_out_us, //
std::span<T>(outputs[0]).subspan(0, maxSamples), //
std::span<T>(outputs[1]).subspan(0, maxSamples), //
std::span<T>(outputs[2]).subspan(0, maxSamples));
} else if constexpr (nPorts == 4UZ) {
ret = _rxStream.readStream(flags, time_ns, max_time_out_us, //
std::span<T>(outputs[0]).subspan(0, maxSamples), //
std::span<T>(outputs[1]).subspan(0, maxSamples), //
std::span<T>(outputs[2]).subspan(0, maxSamples), //
std::span<T>(outputs[3]).subspan(0, maxSamples));
} else {
// fully dynamic case
std::vector<std::span<T>> output(rx_channels->size());
for (std::size_t i = 0UZ; i < rx_channels->size(); ++i) {
output[i] = std::span<T>(outputs[i]).subspan(0, maxSamples);
}
ret = _rxStream.readStreamIntoBufferList(flags, time_ns, max_time_out_us, output);
}
// for detailed debugging: detail::printSoapyReturnDebugInfo(ret, flags, time_ns);

auto status = handleDeviceStreamingErrors(ret, flags);
if (ret >= 0 && status == work::Status::OK) {
std::ranges::for_each(outputs, [ret](auto& output) { output.publish(static_cast<std::size_t>(ret)); });
return work::Status::OK;
}

// no data or some failure occured
std::ranges::for_each(outputs, [](auto& output) { output.publish(0UZ); });
return status;
}

void reinitDevice() {
_rxStream.reset();
_device = Device(SoapySDR::Kwargs{{"driver", device.value}});

std::size_t nChannelMax = _device.getNumChannels(SOAPY_SDR_RX);
if (nChannelMax < rx_channels->size() || (nPorts != std::dynamic_extent && nChannelMax != rx_channels->size())) {
throw gr::exception(fmt::format("device {} max channel mismatch: specified: {} vs max. {}", device.value, rx_channels->size(), nChannelMax));
}

setSampleRate();
setAntennae();
setCenterFrequency();
setGains();
_rxStream = _device.setupStream<T, SOAPY_SDR_RX>(rx_channels.value);
_rxStream.activate();
}

void setSampleRate() {
std::size_t nChannels = rx_channels->size();
for (std::size_t i = 0UZ; i < nChannels; i++) {
_device.setSampleRate(SOAPY_SDR_RX, rx_channels->at(i), static_cast<double>(sample_rate));
}
}

void setAntennae() {
if (rx_antennae->empty()) {
return;
}
std::size_t nChannels = rx_channels->size();
std::size_t nAntaennae = rx_antennae->size();
for (std::size_t i = 0UZ; i < nChannels; i++) {
std::string antenna = rx_antennae->at(std::min(i, nAntaennae - 1UZ));
if (!antenna.empty()) {
_device.setAntenna(SOAPY_SDR_RX, rx_channels->at(i), antenna);
}
}
}

void setCenterFrequency() {
std::size_t nChannels = rx_channels->size();
std::size_t nFrequency = rx_center_frequency->size();
for (std::size_t i = 0UZ; i < nChannels; i++) {
_device.setCenterFrequency(SOAPY_SDR_RX, rx_channels->at(i), rx_center_frequency->at(std::min(i, nFrequency - 1UZ)));
}
}

void setBandwidth() {
std::size_t nChannels = rx_channels->size();
std::size_t nFrequency = rx_bandwdith->size();
for (std::size_t i = 0UZ; i < nChannels; i++) {
_device.setBandwidth(SOAPY_SDR_RX, rx_channels->at(i), rx_bandwdith->at(std::min(i, nFrequency - 1UZ)));
}
}

void setGains() {
std::size_t nChannels = rx_channels->size();
std::size_t nGains = rx_gains->size();
for (std::size_t i = 0UZ; i < nChannels; i++) {
_device.setGain(SOAPY_SDR_RX, rx_channels->at(i), rx_gains->at(std::min(i, nGains - 1UZ)));
}
}
work::Status handleDeviceStreamingErrors(int ret, int flags) {
if (ret >= 0) {
if (max_fragment_count > 0 && flags & SOAPY_SDR_MORE_FRAGMENTS) {
_fragmentCount++;
} else {
_fragmentCount = 0U;
}
_overFlowCount = 0U;
if (max_fragment_count > 0 && _fragmentCount > max_fragment_count) {
throw gr::exception(fmt::format("SOAPY_SDR_MORE_FRAGMENTS for Block {} Device {}: reached {} of requested max {} -> DAQ read-out/Scheduler/graph is too slow", this->name, device, _fragmentCount, max_fragment_count));
}
} else {
switch (ret) {
case SOAPY_SDR_TIMEOUT: return work::Status::OK;
case SOAPY_SDR_OVERFLOW: {
_overFlowCount++;
if (max_overflow_count > 0 && _overFlowCount > max_overflow_count) {
throw gr::exception(fmt::format("SOAPY_SDR_OVERFLOW for Block {} Device {}: reached {} of requested max {}", this->name, device, _overFlowCount, max_overflow_count));
}
return work::Status::OK;
}
case SOAPY_SDR_CORRUPTION: {
throw gr::exception(fmt::format("SOAPY_SDR_CORRUPTION for Block {} Device {}", this->name, device));
return work::Status::ERROR;
}
default: throw gr::exception(fmt::format("unknown SoapySDR return type: {}", ret));
}
}
return work::Status::OK;
}

template<bool increment = false, typename U>
static std::vector<U> initDefaultValues(U initialValue) {
std::vector<U> values;
values.resize(nPorts);
if constexpr (increment) {
std::ranges::generate(values, [i = U(0), initialValue]() mutable { return initialValue + i++; });
} else {
std::fill(values.begin(), values.end(), initialValue);
}
return values;
}
};
template<typename T>
using SoapySimpleSource = SoapyBlock<T, 1UZ>;
template<typename T>
using SoapyDualSimpleSource = SoapyBlock<T, 2UZ>;

static_assert(std::is_constructible_v<SoapyBlock<std::complex<float>>, gr::property_map>, "SoapyBlock not default constructible w/ property_map");
static_assert(std::is_constructible_v<SoapySimpleSource<std::complex<float>>, gr::property_map>, "SoapyBlock not default constructible w/ property_map");

} // namespace gr::blocks::soapy

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, std::size_t nPorts), (gr::blocks::soapy::SoapyBlock<T, nPorts>), out, //
device, device_parameter, sample_rate, //
rx_channels, rx_antennae, rx_center_frequency, rx_bandwdith, rx_gains, //
max_chunck_size, max_time_out_us, max_overflow_count)

const inline auto registerSoapy = gr::registerBlock<gr::blocks::soapy::SoapySimpleSource, uint8_t, int16_t, std::complex<float>>(gr::globalBlockRegistry()) //
| gr::registerBlock<gr::blocks::soapy::SoapyDualSimpleSource, uint8_t, int16_t, std::complex<float>>(gr::globalBlockRegistry());

#endif // SOAPY_HPP
Loading

0 comments on commit 3d5b3a8

Please sign in to comment.