Skip to content

Commit

Permalink
Prototype (packet record/replay working)
Browse files Browse the repository at this point in the history
  • Loading branch information
mojomex committed Jun 26, 2024
1 parent bbada11 commit d08dffd
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 90 deletions.
9 changes: 0 additions & 9 deletions nebula_common/include/nebula_common/aeva/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,8 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iomanip>
#include <memory>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

namespace nebula::drivers::aeva
Expand Down Expand Up @@ -188,9 +181,7 @@ struct Aeries2Config : public SensorConfigurationBase
{
std::string sensor_ip;
std::string frame_id;
bool launch_hw;
bool setup_sensor;
uint16_t diag_span;
float dithering_enable_ego_speed;
std::string dithering_pattern_option;
float ele_offset_rad;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ void AevaAries2Decoder::processPointcloudMessage(const aeva::PointCloudMessage &
if (static_cast<ssize_t>(i) == state.new_frame_index) {
std::scoped_lock lock(mtx_callback_);

callback_(std::move(cloud_state_.cloud), cloud_state_.timestamp);
if (callback_) {
callback_(std::move(cloud_state_.cloud), cloud_state_.timestamp);
}
// Cloud time gets reset below, on the first VALID point that will be
// put in the cloud. This guarantees that the earliest point(s) in the cloud
// have relative time 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include "nebula_common/nebula_common.hpp"
#include "nebula_common/util/parsing.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_aeva/connections/health.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_aeva/connections/pointcloud.hpp"
Expand All @@ -28,7 +27,6 @@
#include <nebula_common/loggers/logger.hpp>
#include <nlohmann/json.hpp>

#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -81,16 +79,15 @@ class AevaHwInterface
std::shared_ptr<loggers::Logger> logger,
const std::shared_ptr<const aeva::Aeries2Config> & config,
std::shared_ptr<PointcloudParser> pointcloud_api,
std::shared_ptr<TelemetryParser> telemetry_api = nullptr,
std::shared_ptr<ReconfigParser> reconfig_api = nullptr,
std::shared_ptr<HealthParser> health_api = nullptr)
std::shared_ptr<TelemetryParser> telemetry_api, std::shared_ptr<ReconfigParser> reconfig_api,
std::shared_ptr<HealthParser> health_api)
: logger_(std::move(logger)),
pointcloud_api_(std::move(pointcloud_api)),
telemetry_api_(std::move(telemetry_api)),
reconfig_api_(std::move(reconfig_api)),
health_api_(std::move(health_api))
{
if (config->setup_sensor) {
if (config->setup_sensor && reconfig_api_) {
logger_->info("Configuring sensor...");
onConfigChange(config);
logger_->info("Config OK");
Expand Down Expand Up @@ -134,6 +131,11 @@ class AevaHwInterface
pointcloud_api_->registerCallback(std::move(callback));
}

void registerRawCloudPacketCallback(connections::ObservableByteStream::callback_t callback)
{
pointcloud_api_->registerBytesCallback(std::move(callback));
}

void registerHealthCallback(HealthParser::callback_t callback)
{
health_api_->registerCallback(std::move(callback));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,21 @@
#pragma once

#include "nebula_common/aeva/types.hpp"
#include "nebula_common/util/expected.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/byte_stream.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/byte_view.hpp"

#include <bits/chrono.h>

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <exception>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -130,7 +120,8 @@ T pull_and_parse(const std::vector<uint8_t> & stream)
template <typename T>
T pull_and_parse(PullableByteStream & stream)
{
auto buffer = stream.read(sizeof(T));
std::vector<uint8_t> buffer;
stream.read(buffer, sizeof(T));
return pull_and_parse<T>(buffer);
}

Expand Down Expand Up @@ -163,6 +154,10 @@ class AevaParser : public ObservableByteStream
explicit AevaParser(std::shared_ptr<PullableByteStream> incoming_byte_stream)
: incoming_(std::move(incoming_byte_stream))
{
if (!incoming_) {
throw std::runtime_error("Incoming byte stream cannot be null");
}

thread_ = std::thread([&]() {
while (true) onLowLevelMessage();
});
Expand All @@ -183,7 +178,8 @@ class AevaParser : public ObservableByteStream
{
std::lock_guard lock(mtx_bytes_callback_);

auto some_ip_raw = incoming_->read(sizeof(SomeIpHeader));
std::vector<uint8_t> some_ip_raw;
incoming_->read(some_ip_raw, sizeof(SomeIpHeader));
if (bytes_callback_) {
bytes_callback_(some_ip_raw);
}
Expand All @@ -192,7 +188,9 @@ class AevaParser : public ObservableByteStream
expect_eq(some_ip.service_id, 0xAEFAu, "Aeva service header mismatch");
expect_eq(some_ip.method_id, StreamId, "Unexpected method ID");
auto payload_length = some_ip.message_length - 12;
auto payload_raw = incoming_->read(payload_length);

std::vector<uint8_t> payload_raw;
incoming_->read(payload_raw, payload_length);
if (bytes_callback_) {
bytes_callback_(payload_raw);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
#include <boost/endian/conversion.hpp>

#include <algorithm>
#include <array>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

namespace nebula::drivers::connections
Expand All @@ -31,7 +29,7 @@ namespace nebula::drivers::connections
class ObservableByteStream
{
public:
using callback_t = typename std::function<void(std::vector<uint8_t> buffer)>;
using callback_t = typename std::function<void(const std::vector<uint8_t> & buffer)>;

ObservableByteStream() = default;

Expand Down Expand Up @@ -60,7 +58,7 @@ class PullableByteStream

virtual ~PullableByteStream() = default;

virtual std::vector<uint8_t> && read(size_t n_bytes) = 0;
virtual void read(std::vector<uint8_t> & into, size_t n_bytes) = 0;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,13 @@
#pragma once

#include "nebula_common/util/expected.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/byte_stream.hpp"

#include <sys/types.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace nebula::drivers::connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -55,42 +51,43 @@ class StreamBuffer : public PullableByteStream
[&](auto bytes) { onBytesFromUnderlying(std::move(bytes)); });
}

std::vector<uint8_t> && read(size_t n_bytes) override
void read(std::vector<uint8_t> & into, size_t n_bytes) override
{
std::vector<uint8_t> result{};
into.clear();

// If there is a packet left from the last read that has not been read completely read
// (remainder_), then continue there. Otherwise, pop the next one off the queue.
auto bytes = remainder_ ? std::move(remainder_) : buffer_.pop();

// Sizes match perfectly, return popped vector
if (bytes->size() == n_bytes) {
result.swap(*bytes);
return std::move(result);
into.swap(*bytes);
return;
}

// Too many bytes popped off the queue, put remainder back
if (bytes->size() > n_bytes) {
// Cut the remaining bytes off of bytes (will become remainder_ later). `bytes` then contains
// exactly the `n_bytes` we want as the result, and `result` contains exactly the bytes we
// exactly the `n_bytes` we want as the into, and `into` contains exactly the bytes we
// want to become the `remainder_`. Thus, we can just swap the two vectors.
result.reserve(bytes->size() - n_bytes);
result.insert(result.end(), bytes->end() - static_cast<ssize_t>(n_bytes), bytes->end());
into.reserve(bytes->size() - n_bytes);
into.insert(into.end(), bytes->end() - static_cast<ssize_t>(n_bytes), bytes->end());
bytes->resize(bytes->size() - n_bytes);

result.swap(*bytes);
into.swap(*bytes);
remainder_ = std::move(bytes);
return std::move(result);
return;
}

// Too little bytes popped off the queue, fetch next part
if (bytes->size() < n_bytes) {
auto remaining_length = n_bytes - bytes->size();
std::vector<uint8_t> remaining_bytes = read(remaining_length);
result.swap(*bytes);
result.reserve(n_bytes);
result.insert(result.end(), remaining_bytes.begin(), remaining_bytes.end());
return std::move(result);
std::vector<uint8_t> remaining_bytes;
read(remaining_bytes, remaining_length);
into.swap(*bytes);
into.reserve(n_bytes);
into.insert(into.end(), remaining_bytes.begin(), remaining_bytes.end());
return;
}

assert(false); // Unreachable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace nebula::drivers::connections
Expand All @@ -52,12 +46,11 @@ class TcpStream : public PullableByteStream
boost::asio::connect(socket_, endpoint_iterator);
}

std::vector<uint8_t> && read(size_t n_bytes) override
void read(std::vector<uint8_t> & into, size_t n_bytes) override
{
std::vector<uint8_t> buffer;
buffer.resize(n_bytes);
boost::asio::read(socket_, boost::asio::buffer(buffer), boost::asio::transfer_exactly(n_bytes));
return std::forward<std::vector<uint8_t>>(buffer);
into.clear();
into.resize(n_bytes);
boost::asio::read(socket_, boost::asio::buffer(into), boost::asio::transfer_exactly(n_bytes));
}

private:
Expand Down
File renamed without changes.
11 changes: 10 additions & 1 deletion nebula_ros/include/nebula_ros/aeva/aeva_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
#include <rclcpp/subscription.hpp>
#include <rclcpp_components/register_node_macro.hpp>

#include <nebula_msgs/msg/detail/nebula_packets__struct.hpp>
#include <nebula_msgs/msg/nebula_packets.hpp>
#include <sensor_msgs/msg/point_cloud2.hpp>

#include <boost/algorithm/string/join.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>

#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

namespace nebula::ros
{
Expand All @@ -52,6 +55,12 @@ class AevaRosWrapper final : public rclcpp::Node
Status declareAndGetSensorConfigParams();
Status validateAndSetConfig(std::shared_ptr<const drivers::aeva::Aeries2Config> & new_config);

void recordRawPacket(const std::vector<uint8_t> & bytes);

rclcpp::Publisher<nebula_msgs::msg::NebulaPackets>::SharedPtr packets_pub_{};
std::mutex mtx_current_scan_msg_;
nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_{};

rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr cloud_pub_{};
std::shared_ptr<nebula::ros::WatchdogTimer> cloud_watchdog_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

namespace nebula::ros
{
Expand Down
Loading

0 comments on commit d08dffd

Please sign in to comment.