From bea174e745820dd8cffef6e8b6f74a1c617ab01c Mon Sep 17 00:00:00 2001 From: "Ralph J. Steinhagen" <46007894+RalphSteinhagen@users.noreply.github.com> Date: Mon, 3 Jun 2024 19:42:36 +0200 Subject: [PATCH] new block: BasicFile[Sink, Source] (#353) * new block: BasicFile[Sink, Source] * incorporates review comments by @daniestevez Signed-off-by: Ralph J. Steinhagen --- blocks/CMakeLists.txt | 1 + blocks/fileio/CMakeLists.txt | 7 + .../gnuradio-4.0/fileio/BasicFileIo.hpp | 307 ++++++++++++++++++ blocks/fileio/test/CMakeLists.txt | 1 + blocks/fileio/test/qa_FileIo.cpp | 151 +++++++++ core/benchmarks/CMakeLists.txt | 2 +- core/test/CMakeLists.txt | 2 +- 7 files changed, 469 insertions(+), 2 deletions(-) create mode 100644 blocks/fileio/CMakeLists.txt create mode 100644 blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp create mode 100644 blocks/fileio/test/CMakeLists.txt create mode 100644 blocks/fileio/test/qa_FileIo.cpp diff --git a/blocks/CMakeLists.txt b/blocks/CMakeLists.txt index d1005303..57f517a8 100644 --- a/blocks/CMakeLists.txt +++ b/blocks/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(basic) +add_subdirectory(fileio) add_subdirectory(filter) add_subdirectory(fourier) add_subdirectory(http) diff --git a/blocks/fileio/CMakeLists.txt b/blocks/fileio/CMakeLists.txt new file mode 100644 index 00000000..6fb41e2a --- /dev/null +++ b/blocks/fileio/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(gr-fileio INTERFACE) +target_link_libraries(gr-fileio INTERFACE gnuradio-core) +target_include_directories(gr-fileio INTERFACE $ $) + +if (ENABLE_TESTING) + add_subdirectory(test) +endif () diff --git a/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp b/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp new file mode 100644 index 00000000..b3421d4b --- /dev/null +++ b/blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp @@ -0,0 +1,307 @@ +#ifndef BASICFILEIO_HPP +#define BASICFILEIO_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace gr::blocks::fileio { + +namespace detail { +[[nodiscard]] inline std::string getIsoTime() noexcept { + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + return fmt::format("{:%Y-%m-%dT%H:%M:%S}.{:06}", // ms-precision ISO time-format + fmt::localtime(std::chrono::system_clock::to_time_t(now)), // + std::chrono::duration_cast(now.time_since_epoch()).count() % 1'000); +} + +inline void ensureDirectoryExists(const std::filesystem::path& filePath) { std::filesystem::create_directories(filePath.parent_path()); } + +inline std::vector getSortedFilesContaining(const std::string& fileName) { + std::filesystem::path filePath(fileName); + if (!std::filesystem::exists(filePath.parent_path())) { + throw gr::exception(fmt::format("path/file '{}' does not exist.", fileName)); + } + + std::vector matchingFiles; + std::copy_if(std::filesystem::directory_iterator(filePath.parent_path()), std::filesystem::directory_iterator{}, std::back_inserter(matchingFiles), [&](const auto& entry) { return entry.is_regular_file() && entry.path().string().find(filePath.filename().string()) != std::string::npos; }); + + std::sort(matchingFiles.begin(), matchingFiles.end()); + return matchingFiles; +} + +[[nodiscard]] inline std::uintmax_t getFileSize(const std::filesystem::path& filePath) { + if (!std::filesystem::exists(filePath)) { + throw std::runtime_error(fmt::format("file '{}' does not exist.", filePath.string())); + } + return std::filesystem::file_size(filePath); +} + +[[maybe_unused]] inline std::vector deleteFilesContaining(const std::string& fileName) { + std::filesystem::path filePath(fileName); + if (!std::filesystem::exists(filePath.parent_path())) { + return {}; + } + + std::vector deletedFiles; + for (const auto& entry : std::filesystem::directory_iterator(filePath.parent_path())) { + if (entry.is_regular_file() && entry.path().string().find(filePath.filename().string()) != std::string::npos) { + deletedFiles.push_back(entry.path().string()); + std::filesystem::remove(entry.path()); + } + } + + return deletedFiles; +} + +} // namespace detail + +enum class Mode { overwrite, append, multi }; + +template +struct BasicFileSink : public gr::Block> { + using Description = Doc; + template + using A = gr::Annotated; // optional shortening + + PortIn in; + + A, Visible> file_name; + Mode _mode = Mode::overwrite; + A, Visible> mode = std::string(magic_enum::enum_name(_mode)); + A, Visible> max_bytes_per_file = 0U; + + std::size_t _totalBytesWritten{0UZ}; + std::size_t _totalBytesWrittenFile{0UZ}; + std::ofstream _file; + std::size_t _fileCounter{0UZ}; + std::string _actualFileName; + + void settingsChanged(const property_map& /*oldSettings*/, const property_map& /*newSettings*/) { + _mode = magic_enum::enum_cast(mode, magic_enum::case_insensitive).value_or(_mode); + if (lifecycle::isActive(this->state())) { + closeFile(); + openNextFile(); + } + } + + void start() { + _totalBytesWritten = 0UZ; + openNextFile(); + } + + void stop() { closeFile(); } + + [[nodiscard]] constexpr work::Status processBulk(ConsumableSpan auto& dataIn) { + if (max_bytes_per_file.value != 0U && _totalBytesWrittenFile >= max_bytes_per_file.value) { + closeFile(); + openNextFile(); + } + + std::size_t nBytesMax = dataIn.size() * sizeof(T); + if (max_bytes_per_file.value != 0U) { + nBytesMax = std::min(nBytesMax, static_cast(max_bytes_per_file.value - _totalBytesWrittenFile)); + } + _file.write(reinterpret_cast(dataIn.data()), static_cast(nBytesMax)); + if (!dataIn.consume(nBytesMax / sizeof(T))) { + throw gr::exception("could not consume input samples"); + } + + if (!_file) { + throw gr::exception(fmt::format("failed to write to file '{}'.", _actualFileName)); + } + + _totalBytesWritten += nBytesMax; + _totalBytesWrittenFile += nBytesMax; + + return work::Status::OK; + } + +private: + void closeFile() { + if (_file.is_open()) { + _file.close(); + } + } + void openNextFile() { + closeFile(); + _totalBytesWrittenFile = 0UZ; + + detail::ensureDirectoryExists(file_name.value); + + std::filesystem::path filePath(file_name.value); + if (!std::filesystem::exists(filePath.parent_path())) { + throw gr::exception(fmt::format("path/file '{}' does not exist.", file_name.value)); + } + + // Open file handle based on mode + switch (_mode) { + case Mode::overwrite: { + _actualFileName = file_name.value; + _file.open(_actualFileName, std::ios::binary | std::ios::trunc); + } break; + case Mode::append: { + _actualFileName = file_name.value; + _file.open(_actualFileName, std::ios::binary | std::ios::app); + } break; + case Mode::multi: { + // _fileCounter ensures that the filenames are unique and still sortable by date-time, with an additional counter to handle rapid successive file creation. + _actualFileName = filePath.parent_path() / (detail::getIsoTime() + "_" + std::to_string(_fileCounter++) + "_" + filePath.filename().string()); + _file.open(_actualFileName, std::ios::binary); + break; + } + default: throw gr::exception("unsupported file mode."); + } + + if (!_file) { + throw gr::exception(fmt::format("failed to open file '{}'.", _actualFileName)); + } + } +}; + +template +struct BasicFileSource : public gr::Block> { + using Description = Doc; + + template + using A = gr::Annotated; // optional shortening + + PortOut out; + + A, Visible> file_name; + Mode _mode = Mode::overwrite; + A, Visible> mode = std::string(magic_enum::enum_name(_mode)); + A> repeat = false; + A, Visible> offset = 0U; + A, Visible> length = 0U; + A> trigger_name = "BasicFileSource::start"; + + std::ifstream _file; + std::vector _filesToRead; + bool _emittedStartTrigger = false; + std::size_t _totalBytesRead = 0UZ; + std::size_t _totalBytesReadFile = 0UZ; + std::size_t _currentFileIndex = 0UZ; + std::string _currentFileName; + + void settingsChanged(const property_map& /*oldSettings*/, const property_map& /*newSettings*/) { // + _mode = magic_enum::enum_cast(mode, magic_enum::case_insensitive).value_or(_mode); + } + + void start() { + _currentFileIndex = 0UZ; + _totalBytesRead = 0UZ; + _filesToRead.clear(); + + std::filesystem::path filePath(file_name.value); + if (!std::filesystem::exists(filePath.parent_path())) { + throw gr::exception(fmt::format("path/file '{}' does not exist.", file_name.value)); + } + + switch (_mode) { + case Mode::overwrite: + case Mode::append: { + _filesToRead.push_back(filePath); + } break; + case Mode::multi: { + _filesToRead = detail::getSortedFilesContaining(file_name.value); + } break; + default: throw gr::exception("unsupported file mode."); + } + + openNextFile(); + } + + void stop() { closeFile(); } + + [[nodiscard]] constexpr work::Status processBulk(PublishableSpan auto& dataOut) noexcept { + if (!_file.is_open()) { + return work::Status::DONE; + } + std::size_t nOutAvailable = dataOut.size() * sizeof(T); + if (length.value != 0U) { + nOutAvailable = std::min(nOutAvailable, (length.value * sizeof(T) - _totalBytesReadFile)); + } + + std::size_t bytesRead = static_cast(_file.read(reinterpret_cast(dataOut.data()), static_cast(nOutAvailable)).gcount()); + if (!_emittedStartTrigger && !trigger_name.value.empty()) { + out.publishTag( + property_map{ + {std::string(tag::TRIGGER_NAME.shortKey()), trigger_name.value}, // + {std::string(tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(std::chrono::system_clock::now())}, // + {std::string(tag::TRIGGER_OFFSET.shortKey()), 0.f} // + }, + static_cast(0)); + _emittedStartTrigger = true; + } + + dataOut.publish(bytesRead / sizeof(T)); + _totalBytesRead += bytesRead; + _totalBytesReadFile += bytesRead; + + if (bytesRead < nOutAvailable || (length.value != 0U && (_totalBytesReadFile >= length.value * sizeof(T)))) { + closeFile(); + if (_currentFileIndex < _filesToRead.size()) { + openNextFile(); + return work::Status::OK; + } else if (repeat) { + _currentFileIndex = 0UZ; + openNextFile(); + return work::Status::OK; + } else { + return work::Status::DONE; + } + } + + return work::Status::OK; + } + +private: + void closeFile() { + if (_file.is_open()) { + _file.close(); + } + } + void openNextFile() { + if (_currentFileIndex >= _filesToRead.size()) { + return; + } + _totalBytesReadFile = 0UZ; + _emittedStartTrigger = false; + + _currentFileName = _filesToRead[_currentFileIndex].string(); + _file.open(_currentFileName, std::ios::binary); + if (!_file) { + throw gr::exception(fmt::format("failed to open file '{}'.", _currentFileName)); + } + if (offset.value != 0U) { + _file.seekg(offset.value * sizeof(T), std::ios::beg); + } + _currentFileIndex++; + } +}; + +} // namespace gr::blocks::fileio + +ENABLE_REFLECTION_FOR_TEMPLATE(gr::blocks::fileio::BasicFileSink, in, file_name, mode, max_bytes_per_file) +ENABLE_REFLECTION_FOR_TEMPLATE(gr::blocks::fileio::BasicFileSource, out, file_name, mode, repeat, offset, length, trigger_name) + +const inline auto registerBasicFileIo = gr::registerBlock, gr::UncertainValue, std::complex, std::complex>(gr::globalBlockRegistry()) // + | gr::registerBlock, gr::UncertainValue, std::complex, std::complex>(gr::globalBlockRegistry()); + +#endif // BASICFILEIO_HPP diff --git a/blocks/fileio/test/CMakeLists.txt b/blocks/fileio/test/CMakeLists.txt new file mode 100644 index 00000000..43b4e130 --- /dev/null +++ b/blocks/fileio/test/CMakeLists.txt @@ -0,0 +1 @@ +add_ut_test(qa_FileIo) diff --git a/blocks/fileio/test/qa_FileIo.cpp b/blocks/fileio/test/qa_FileIo.cpp new file mode 100644 index 00000000..d1acd1af --- /dev/null +++ b/blocks/fileio/test/qa_FileIo.cpp @@ -0,0 +1,151 @@ +#include + +#include + +#include +#include + +#include + +namespace { +using namespace std::chrono_literals; +template +auto createWatchdog(Scheduler& sched, std::chrono::seconds timeOut = 2s, std::chrono::milliseconds pollingPeriod = 40ms) { + auto externalInterventionNeeded = std::make_shared(false); + + std::thread watchdogThread([&sched, externalInterventionNeeded, timeOut, pollingPeriod]() { + auto timeout = std::chrono::steady_clock::now() + timeOut; + while (std::chrono::steady_clock::now() < timeout) { + if (sched.state() == gr::lifecycle::State::STOPPED) { + return; + } + std::this_thread::sleep_for(pollingPeriod); + } + fmt::println("watchdog kicked in"); + externalInterventionNeeded->store(true, std::memory_order_relaxed); + sched.requestStop(); + fmt::println("requested scheduler to stop"); + }); + + return std::make_pair(std::move(watchdogThread), externalInterventionNeeded); +} + +template +void runTest(const gr::blocks::fileio::Mode mode, const std::shared_ptr& threadPool) { + using namespace boost::ut; + using namespace gr::blocks::fileio; + using namespace gr::testing; + using scheduler = gr::scheduler::Simple<>; + + constexpr gr::Size_t nSamples = 1024U; + const gr::Size_t maxFileSize = mode == gr::blocks::fileio::Mode::multi ? 256U : 0U; + std::string modeName{magic_enum::enum_name(mode)}; + std::string fileName = fmt::format("/tmp/gr4_file_sink_test/TestFileName_{}.bin", modeName); + gr::blocks::fileio::detail::deleteFilesContaining(fileName); + + "BasicFileSink"_test = [&] { // NOSONAR capture all + std::string testCaseName = fmt::format("BasicFileSink: failed for type '{}' and '{}", gr::meta::type_name(), modeName); + gr::Graph flow; + + auto& source = flow.emplaceBlock>({{"n_samples_max", nSamples}}); + auto& fileSink = flow.emplaceBlock>({{"file_name", fileName}, {"mode", modeName}, {"max_bytes_per_file", maxFileSize}}); + expect(eq(gr::ConnectionResult::SUCCESS, flow.template connect<"out">(source).template to<"in">(fileSink))); + + auto sched = scheduler{std::move(flow), threadPool}; + auto [watchdogThread, externalInterventionNeeded] = createWatchdog(sched, 2s); + expect(sched.runAndWait().has_value()) << testCaseName; + + if (watchdogThread.joinable()) { + watchdogThread.join(); + } + expect(!externalInterventionNeeded->load(std::memory_order_relaxed)) << testCaseName; + expect(eq(source.count, nSamples)) << testCaseName; + expect(eq(fileSink._totalBytesWritten / sizeof(DataType), nSamples)) << testCaseName; + + std::vector files = gr::blocks::fileio::detail::getSortedFilesContaining(fileName); + if (mode == gr::blocks::fileio::Mode::multi) { + // greater-equal 'ge' because files can be legitimally zero-sized + expect(ge(files.size(), (nSamples * sizeof(DataType)) / maxFileSize)) << testCaseName; + } else { + expect(eq(files.size(), 1U)) << testCaseName; + } + for (const auto& file : files) { + auto fileSize = gr::blocks::fileio::detail::getFileSize(file); + if (mode == gr::blocks::fileio::Mode::multi) { + // less-equal 'le' because files can be legitimally zero-sized + expect(le(fileSize, maxFileSize)) << testCaseName; + } else { + expect(eq(fileSize, nSamples * sizeof(DataType))) << testCaseName; + } + } + }; + + // N.B. test directory contains the output files from the previous sink test + "BasicFileSource"_test = [&] { // NOSONAR capture all + std::string testCaseName = fmt::format("BasicFileSource: failed for type '{}' and '{}", gr::meta::type_name(), modeName); + gr::Graph flow; + auto& fileSource = flow.emplaceBlock>({{"file_name", fileName}, {"mode", modeName}}); + auto& sink = flow.emplaceBlock>(); + + expect(eq(gr::ConnectionResult::SUCCESS, flow.template connect<"out">(fileSource).template to<"in">(sink))); + + auto schedRead = scheduler{std::move(flow), threadPool}; + auto [watchdogThreadRead, externalInterventionNeededRead] = createWatchdog(schedRead, 2s); + expect(schedRead.runAndWait().has_value()) << testCaseName; + + if (watchdogThreadRead.joinable()) { + watchdogThreadRead.join(); + } + expect(!externalInterventionNeededRead->load(std::memory_order_relaxed)) << testCaseName; + expect(eq(sink.count, nSamples)) << testCaseName; + expect(eq(fileSource._totalBytesRead, nSamples * sizeof(DataType))) << testCaseName; + }; + + // Test for `offset` and `length` parameters + "BasicFileSource with offset and length"_test = [&] { // NOSONAR capture all + constexpr gr::Size_t offsetSamples = 8U; + constexpr gr::Size_t lengthSamples = 8U; + std::string testCaseName = fmt::format("BasicFileSource with offset and length: failed for type '{}' and '{}", gr::meta::type_name(), modeName); + gr::Graph flow; + auto& fileSource = flow.emplaceBlock>({{"file_name", fileName}, {"mode", modeName}, {"offset", offsetSamples}, {"length", lengthSamples}}); + auto& sink = flow.emplaceBlock>(); + + expect(eq(gr::ConnectionResult::SUCCESS, flow.template connect<"out">(fileSource).template to<"in">(sink))); + + auto schedRead = scheduler{std::move(flow), threadPool}; + auto [watchdogThreadRead, externalInterventionNeededRead] = createWatchdog(schedRead, 2s); + expect(schedRead.runAndWait().has_value()) << testCaseName; + + if (watchdogThreadRead.joinable()) { + watchdogThreadRead.join(); + } + expect(!externalInterventionNeededRead->load(std::memory_order_relaxed)) << testCaseName; + + auto nonEmptyFileCount = static_cast(std::ranges::count_if(gr::blocks::fileio::detail::getSortedFilesContaining(fileName), [](const auto& file) { return std::filesystem::file_size(file) > 0; })); + expect(eq(sink.count, nonEmptyFileCount * lengthSamples)) << testCaseName; + expect(eq(fileSource._totalBytesRead, nonEmptyFileCount * lengthSamples * sizeof(DataType))) << testCaseName; + }; + + expect(!gr::blocks::fileio::detail::deleteFilesContaining(fileName).empty()); +} + +} // anonymous namespace + +const boost::ut::suite<"basic file IO tests"> basicFileIOTests = [] { + using namespace std::chrono_literals; + using namespace boost::ut; + using namespace gr; + + auto threadPool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); + + constexpr auto kArithmeticTypes = std::tuple, gr::UncertainValue, std::complex, std::complex>(); + + using enum gr::blocks::fileio::Mode; + "overwrite mode"_test = [&threadPool](const T&) { runTest(overwrite, threadPool); } | kArithmeticTypes; + + "append mode"_test = [&threadPool](const T&) { runTest(append, threadPool); } | kArithmeticTypes; + + "create new mode"_test = [&threadPool](const T&) { runTest(multi, threadPool); } | kArithmeticTypes; +}; + +int main() { /* not needed for UT */ } diff --git a/core/benchmarks/CMakeLists.txt b/core/benchmarks/CMakeLists.txt index a96323c0..18e1b8a4 100644 --- a/core/benchmarks/CMakeLists.txt +++ b/core/benchmarks/CMakeLists.txt @@ -1,6 +1,6 @@ function (add_gr_benchmark BM_NAME) add_benchmark(${BM_NAME}) - target_link_libraries(${BM_NAME} PRIVATE gnuradio-core refl-cpp fmt gr-basic gr-math gr-testing) + target_link_libraries(${BM_NAME} PRIVATE gnuradio-core refl-cpp fmt gr-basic gr-fileio gr-math gr-testing) target_compile_options(${BM_NAME} PRIVATE -O3) # performance related benchmarks should be optimised during compile-time endfunction() diff --git a/core/test/CMakeLists.txt b/core/test/CMakeLists.txt index 8e59b2ab..06ef5858 100644 --- a/core/test/CMakeLists.txt +++ b/core/test/CMakeLists.txt @@ -5,7 +5,7 @@ configure_file(build_configure.hpp.in build_configure.hpp @ONLY) function(setup_test_no_asan TEST_NAME) target_include_directories(${TEST_NAME} PRIVATE ${CMAKE_BINARY_DIR}/include ${CMAKE_CURRENT_BINARY_DIR}) - target_link_libraries(${TEST_NAME} PRIVATE gnuradio-options gnuradio-core fmt refl-cpp yaml-cpp::yaml-cpp fftw gr-basic gr-math gr-testing ut) + target_link_libraries(${TEST_NAME} PRIVATE gnuradio-options gnuradio-core fmt refl-cpp yaml-cpp::yaml-cpp fftw gr-basic gr-fileio gr-math gr-testing ut) add_test(NAME ${TEST_NAME} COMMAND ${CMAKE_CROSSCOMPILING_EMULATOR} ${CMAKE_CURRENT_BINARY_DIR}/${TEST_NAME}) endfunction()