Skip to content

Commit

Permalink
new block: BasicFile[Sink, Source] (#353)
Browse files Browse the repository at this point in the history
* new block: BasicFile[Sink, Source]
* incorporates review comments by @daniestevez

Signed-off-by: Ralph J. Steinhagen <[email protected]>
  • Loading branch information
RalphSteinhagen committed Jun 3, 2024
1 parent 2fab62d commit bea174e
Show file tree
Hide file tree
Showing 7 changed files with 469 additions and 2 deletions.
1 change: 1 addition & 0 deletions blocks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
add_subdirectory(basic)
add_subdirectory(fileio)
add_subdirectory(filter)
add_subdirectory(fourier)
add_subdirectory(http)
Expand Down
7 changes: 7 additions & 0 deletions blocks/fileio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
add_library(gr-fileio INTERFACE)
target_link_libraries(gr-fileio INTERFACE gnuradio-core)
target_include_directories(gr-fileio INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include/> $<INSTALL_INTERFACE:include/>)

if (ENABLE_TESTING)
add_subdirectory(test)
endif ()
307 changes: 307 additions & 0 deletions blocks/fileio/include/gnuradio-4.0/fileio/BasicFileIo.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
#ifndef BASICFILEIO_HPP
#define BASICFILEIO_HPP

#include <fmt/chrono.h>
#include <fmt/format.h>
#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/BlockRegistry.hpp>
#include <magic_enum.hpp>

#include <chrono>
#include <complex>
#include <filesystem>
#include <fstream>
#include <span>
#include <string_view>

namespace gr::blocks::fileio {

namespace detail {
[[nodiscard]] inline std::string getIsoTime() noexcept {
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
return fmt::format("{:%Y-%m-%dT%H:%M:%S}.{:06}", // ms-precision ISO time-format
fmt::localtime(std::chrono::system_clock::to_time_t(now)), //
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() % 1'000);
}

inline void ensureDirectoryExists(const std::filesystem::path& filePath) { std::filesystem::create_directories(filePath.parent_path()); }

inline std::vector<std::filesystem::path> getSortedFilesContaining(const std::string& fileName) {
std::filesystem::path filePath(fileName);
if (!std::filesystem::exists(filePath.parent_path())) {
throw gr::exception(fmt::format("path/file '{}' does not exist.", fileName));
}

std::vector<std::filesystem::path> matchingFiles;
std::copy_if(std::filesystem::directory_iterator(filePath.parent_path()), std::filesystem::directory_iterator{}, std::back_inserter(matchingFiles), [&](const auto& entry) { return entry.is_regular_file() && entry.path().string().find(filePath.filename().string()) != std::string::npos; });

std::sort(matchingFiles.begin(), matchingFiles.end());
return matchingFiles;
}

[[nodiscard]] inline std::uintmax_t getFileSize(const std::filesystem::path& filePath) {
if (!std::filesystem::exists(filePath)) {
throw std::runtime_error(fmt::format("file '{}' does not exist.", filePath.string()));
}
return std::filesystem::file_size(filePath);
}

[[maybe_unused]] inline std::vector<std::string> deleteFilesContaining(const std::string& fileName) {
std::filesystem::path filePath(fileName);
if (!std::filesystem::exists(filePath.parent_path())) {
return {};
}

std::vector<std::string> deletedFiles;
for (const auto& entry : std::filesystem::directory_iterator(filePath.parent_path())) {
if (entry.is_regular_file() && entry.path().string().find(filePath.filename().string()) != std::string::npos) {
deletedFiles.push_back(entry.path().string());
std::filesystem::remove(entry.path());
}
}

return deletedFiles;
}

} // namespace detail

enum class Mode { overwrite, append, multi };

template<typename T>
struct BasicFileSink : public gr::Block<BasicFileSink<T>> {
using Description = Doc<R""(A sink block for writing a stream to a binary file.
The file can be played back using a 'BasicFileSource' or read by any program that supports binary files (e.g. Python, C, C++, MATLAB).
For complex types, the binary file contains [float, double]s in IQIQIQ order. No metadata is included with the binary data.)
Important: this implementation assumes a host-order, CPU architecture specific byte order!)"">;
template<typename U, gr::meta::fixed_string description = "", typename... Arguments>
using A = gr::Annotated<U, description, Arguments...>; // optional shortening

PortIn<T> in;

A<std::string, "file name", Doc<"base filename, prefixed if ">, Visible> file_name;
Mode _mode = Mode::overwrite;
A<std::string, "mode", Doc<"mode: \"overwrite\", \"append\", \"multi\"">, Visible> mode = std::string(magic_enum::enum_name(_mode));
A<gr::Size_t, "max bytes per file", Doc<"max bytes per file, 0: infinite ">, Visible> max_bytes_per_file = 0U;

std::size_t _totalBytesWritten{0UZ};
std::size_t _totalBytesWrittenFile{0UZ};
std::ofstream _file;
std::size_t _fileCounter{0UZ};
std::string _actualFileName;

void settingsChanged(const property_map& /*oldSettings*/, const property_map& /*newSettings*/) {
_mode = magic_enum::enum_cast<Mode>(mode, magic_enum::case_insensitive).value_or(_mode);
if (lifecycle::isActive(this->state())) {
closeFile();
openNextFile();
}
}

void start() {
_totalBytesWritten = 0UZ;
openNextFile();
}

void stop() { closeFile(); }

[[nodiscard]] constexpr work::Status processBulk(ConsumableSpan auto& dataIn) {
if (max_bytes_per_file.value != 0U && _totalBytesWrittenFile >= max_bytes_per_file.value) {
closeFile();
openNextFile();
}

std::size_t nBytesMax = dataIn.size() * sizeof(T);
if (max_bytes_per_file.value != 0U) {
nBytesMax = std::min(nBytesMax, static_cast<std::size_t>(max_bytes_per_file.value - _totalBytesWrittenFile));
}
_file.write(reinterpret_cast<const char*>(dataIn.data()), static_cast<std::streamsize>(nBytesMax));
if (!dataIn.consume(nBytesMax / sizeof(T))) {
throw gr::exception("could not consume input samples");
}

if (!_file) {
throw gr::exception(fmt::format("failed to write to file '{}'.", _actualFileName));
}

_totalBytesWritten += nBytesMax;
_totalBytesWrittenFile += nBytesMax;

return work::Status::OK;
}

private:
void closeFile() {
if (_file.is_open()) {
_file.close();
}
}
void openNextFile() {
closeFile();
_totalBytesWrittenFile = 0UZ;

detail::ensureDirectoryExists(file_name.value);

std::filesystem::path filePath(file_name.value);
if (!std::filesystem::exists(filePath.parent_path())) {
throw gr::exception(fmt::format("path/file '{}' does not exist.", file_name.value));
}

// Open file handle based on mode
switch (_mode) {
case Mode::overwrite: {
_actualFileName = file_name.value;
_file.open(_actualFileName, std::ios::binary | std::ios::trunc);
} break;
case Mode::append: {
_actualFileName = file_name.value;
_file.open(_actualFileName, std::ios::binary | std::ios::app);
} break;
case Mode::multi: {
// _fileCounter ensures that the filenames are unique and still sortable by date-time, with an additional counter to handle rapid successive file creation.
_actualFileName = filePath.parent_path() / (detail::getIsoTime() + "_" + std::to_string(_fileCounter++) + "_" + filePath.filename().string());
_file.open(_actualFileName, std::ios::binary);
break;
}
default: throw gr::exception("unsupported file mode.");
}

if (!_file) {
throw gr::exception(fmt::format("failed to open file '{}'.", _actualFileName));
}
}
};

template<typename T>
struct BasicFileSource : public gr::Block<BasicFileSource<T>> {
using Description = Doc<R""(A source block for reading a binary file and outputting the data.
This source is the counterpart to 'BasicFileSink'.
For complex types, the binary file contains [float, double]s in IQIQIQ order. No metadata is expected in the binary data.
Important: this implementation assumes a host-order, CPU architecture specific byte order!)"">;

template<typename U, gr::meta::fixed_string description = "", typename... Arguments>
using A = gr::Annotated<U, description, Arguments...>; // optional shortening

PortOut<T> out;

A<std::string, "file name", Doc<"Base filename, prefixed if necessary">, Visible> file_name;
Mode _mode = Mode::overwrite;
A<std::string, "mode", Doc<"mode: \"overwrite\", \"append\", \"multi\"">, Visible> mode = std::string(magic_enum::enum_name(_mode));
A<bool, "repeat", Doc<"true: repeat back-to-back">> repeat = false;
A<gr::Size_t, "offset", Doc<"file start offset in bytes">, Visible> offset = 0U;
A<gr::Size_t, "length", Doc<"max number of samples items to read (0: infinite)">, Visible> length = 0U;
A<std::string, "trigger name", Doc<"name of trigger added to each file chunk">> trigger_name = "BasicFileSource::start";

std::ifstream _file;
std::vector<std::filesystem::path> _filesToRead;
bool _emittedStartTrigger = false;
std::size_t _totalBytesRead = 0UZ;
std::size_t _totalBytesReadFile = 0UZ;
std::size_t _currentFileIndex = 0UZ;
std::string _currentFileName;

void settingsChanged(const property_map& /*oldSettings*/, const property_map& /*newSettings*/) { //
_mode = magic_enum::enum_cast<Mode>(mode, magic_enum::case_insensitive).value_or(_mode);
}

void start() {
_currentFileIndex = 0UZ;
_totalBytesRead = 0UZ;
_filesToRead.clear();

std::filesystem::path filePath(file_name.value);
if (!std::filesystem::exists(filePath.parent_path())) {
throw gr::exception(fmt::format("path/file '{}' does not exist.", file_name.value));
}

switch (_mode) {
case Mode::overwrite:
case Mode::append: {
_filesToRead.push_back(filePath);
} break;
case Mode::multi: {
_filesToRead = detail::getSortedFilesContaining(file_name.value);
} break;
default: throw gr::exception("unsupported file mode.");
}

openNextFile();
}

void stop() { closeFile(); }

[[nodiscard]] constexpr work::Status processBulk(PublishableSpan auto& dataOut) noexcept {
if (!_file.is_open()) {
return work::Status::DONE;
}
std::size_t nOutAvailable = dataOut.size() * sizeof(T);
if (length.value != 0U) {
nOutAvailable = std::min(nOutAvailable, (length.value * sizeof(T) - _totalBytesReadFile));
}

std::size_t bytesRead = static_cast<std::size_t>(_file.read(reinterpret_cast<char*>(dataOut.data()), static_cast<std::streamsize>(nOutAvailable)).gcount());
if (!_emittedStartTrigger && !trigger_name.value.empty()) {
out.publishTag(
property_map{
{std::string(tag::TRIGGER_NAME.shortKey()), trigger_name.value}, //
{std::string(tag::TRIGGER_TIME.shortKey()), settings::convertTimePointToUint64Ns(std::chrono::system_clock::now())}, //
{std::string(tag::TRIGGER_OFFSET.shortKey()), 0.f} //
},
static_cast<Tag::signed_index_type>(0));
_emittedStartTrigger = true;
}

dataOut.publish(bytesRead / sizeof(T));
_totalBytesRead += bytesRead;
_totalBytesReadFile += bytesRead;

if (bytesRead < nOutAvailable || (length.value != 0U && (_totalBytesReadFile >= length.value * sizeof(T)))) {
closeFile();
if (_currentFileIndex < _filesToRead.size()) {
openNextFile();
return work::Status::OK;
} else if (repeat) {
_currentFileIndex = 0UZ;
openNextFile();
return work::Status::OK;
} else {
return work::Status::DONE;
}
}

return work::Status::OK;
}

private:
void closeFile() {
if (_file.is_open()) {
_file.close();
}
}
void openNextFile() {
if (_currentFileIndex >= _filesToRead.size()) {
return;
}
_totalBytesReadFile = 0UZ;
_emittedStartTrigger = false;

_currentFileName = _filesToRead[_currentFileIndex].string();
_file.open(_currentFileName, std::ios::binary);
if (!_file) {
throw gr::exception(fmt::format("failed to open file '{}'.", _currentFileName));
}
if (offset.value != 0U) {
_file.seekg(offset.value * sizeof(T), std::ios::beg);
}
_currentFileIndex++;
}
};

} // namespace gr::blocks::fileio

ENABLE_REFLECTION_FOR_TEMPLATE(gr::blocks::fileio::BasicFileSink, in, file_name, mode, max_bytes_per_file)
ENABLE_REFLECTION_FOR_TEMPLATE(gr::blocks::fileio::BasicFileSource, out, file_name, mode, repeat, offset, length, trigger_name)

const inline auto registerBasicFileIo = gr::registerBlock<gr::blocks::fileio::BasicFileSink, uint8_t, uint16_t, uint32_t, uint64_t, int8_t, int16_t, int32_t, int64_t, float, double, gr::UncertainValue<float>, gr::UncertainValue<double>, std::complex<float>, std::complex<double>>(gr::globalBlockRegistry()) //
| gr::registerBlock<gr::blocks::fileio::BasicFileSource, uint8_t, uint16_t, uint32_t, uint64_t, int8_t, int16_t, int32_t, int64_t, float, double, gr::UncertainValue<float>, gr::UncertainValue<double>, std::complex<float>, std::complex<double>>(gr::globalBlockRegistry());

#endif // BASICFILEIO_HPP
1 change: 1 addition & 0 deletions blocks/fileio/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_ut_test(qa_FileIo)
Loading

0 comments on commit bea174e

Please sign in to comment.