Skip to content

Commit

Permalink
Merge pull request #2 from CWRUbotix/InvincibleRMC-patch-1
Browse files Browse the repository at this point in the history
Update industrial_ci_action.yml
  • Loading branch information
InvincibleRMC authored Jul 27, 2024
2 parents 8b8f033 + 5c87103 commit a0aaac1
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 104 deletions.
1 change: 1 addition & 0 deletions .github/workflows/industrial_ci_action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:
matrix:
env:
- {ROS_DISTRO: iron, ROS_REPO: main}
- {ROS_DISTRO: jazzy, ROS_REPO: main}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 0 additions & 4 deletions .mypy.ini

This file was deleted.

88 changes: 44 additions & 44 deletions launch/ros2_video_streamer_node_launch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Tuple
from typing import Any

from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.launch_context import LaunchContext
Expand All @@ -17,77 +17,77 @@ def generate_launch_description() -> LaunchDescription:
# TODO remove args and kwargs?
def launch_setup(
context: LaunchContext,
*args: Tuple[Any],
**kwargs: Dict[Any, Any],
*args: tuple[Any],
**kwargs: dict[Any, Any],
) -> LaunchDescription:
"""Generate array to be included in launch description."""
# Declare and early evaluate camera_name argument
# Only strictly necessary to set streamer node name & namespace,
# but makes other substitutions nicer too
camera_name_argument: DeclareLaunchArgument = DeclareLaunchArgument(
"camera_name",
default_value="simulated_cam",
description="Name of the camera, used to autogenerate"
+ "`image_topic_name`, `info_topic_name`, & `file_name`. Will find video"
+ "files named `<camera_name>.mp4`.",
'camera_name',
default_value='simulated_cam',
description='Name of the camera, used to autogenerate'
+ '`image_topic_name`, `info_topic_name`, & `file_name`. Will find video'
+ 'files named `<camera_name>.mp4`.',
)
camera_name_str = LaunchConfiguration("camera_name").perform(context)
camera_name_str = LaunchConfiguration('camera_name').perform(context)

launch_arguments: list[DeclareLaunchArgument] = [
DeclareLaunchArgument(
"node_name",
default_value="streamer_node",
'node_name',
default_value='streamer_node',
),
DeclareLaunchArgument(
"image_topic_name",
default_value=f"/{camera_name_str}/image_raw",
'image_topic_name',
default_value=f'/{camera_name_str}/image_raw',
),
DeclareLaunchArgument(
"info_topic_name",
default_value=f"/{camera_name_str}/camera_info",
'info_topic_name',
default_value=f'/{camera_name_str}/camera_info',
),
DeclareLaunchArgument(
"config_file_name",
default_value="",
description="Name of the config file. Defaults to empty string, which"
+ "means no config file. Contents published on `CameraInfo` message.",
'config_file_name',
default_value='',
description='Name of the config file. Defaults to empty string, which'
+ 'means no config file. Contents published on `CameraInfo` message.',
),
DeclareLaunchArgument("loop", default_value="true"),
DeclareLaunchArgument('loop', default_value='true'),
DeclareLaunchArgument(
"frame_id",
default_value="",
description="`frame_id` field in the `CameraInfo` topic",
'frame_id',
default_value='',
description='`frame_id` field in the `CameraInfo` topic',
),
DeclareLaunchArgument(
"type",
description="Type of media source, (e.g. image or video)",
'type',
description='Type of media source, (e.g. image or video)',
),
DeclareLaunchArgument(
"file_name",
default_value=f"{camera_name_str}.mp4",
description="Name of file",
'file_name',
default_value=f'{camera_name_str}.mp4',
description='Name of file',
),
DeclareLaunchArgument(
"start",
default_value="0",
description="Where the Video is starting from default is frame 0",
'start',
default_value='0',
description='Where the Video is starting from default is frame 0',
),
]

streamer_node: Node = Node(
package="ros2_video_streamer",
executable="ros2_video_streamer_node",
name="streamer_node",
namespace=f"simulation/{camera_name_str}",
streamer_node = Node(
package='ros2_video_streamer',
executable='ros2_video_streamer_node',
name='streamer_node',
namespace=f'simulation/{camera_name_str}',
parameters=[
{"config_file_path": LaunchConfiguration("config_file_name")},
{"image_topic_name": LaunchConfiguration("image_topic_name")},
{"info_topic_name": LaunchConfiguration("info_topic_name")},
{"loop": LaunchConfiguration("loop")},
{"frame_id": LaunchConfiguration("frame_id")},
{"type": LaunchConfiguration("type")},
{"file_name": LaunchConfiguration("file_name")},
{"start": LaunchConfiguration("start")},
{'config_file_path': LaunchConfiguration('config_file_name')},
{'image_topic_name': LaunchConfiguration('image_topic_name')},
{'info_topic_name': LaunchConfiguration('info_topic_name')},
{'loop': LaunchConfiguration('loop')},
{'frame_id': LaunchConfiguration('frame_id')},
{'type': LaunchConfiguration('type')},
{'file_name': LaunchConfiguration('file_name')},
{'start': LaunchConfiguration('start')},
],
)

Expand Down
33 changes: 17 additions & 16 deletions launch/three_camera_launch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os

import launch
from ament_index_python.packages import get_package_share_directory

import launch
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource

Expand All @@ -12,9 +13,9 @@ def generate_launch_description() -> launch.launch_description.LaunchDescription
You'll need `bottom_cam.mp4`, `front_cam.mp4`, & `manip_cam.mp4` in `ros2_video_streamer`.
"""
front_cam_launcher: IncludeLaunchDescription = create_cam_launcher("front")
manip_cam_launcher: IncludeLaunchDescription = create_cam_launcher("manip")
bottom_cam_launcher: IncludeLaunchDescription = create_cam_launcher("bottom")
front_cam_launcher = create_cam_launcher('front')
manip_cam_launcher = create_cam_launcher('manip')
bottom_cam_launcher = create_cam_launcher('bottom')

return launch.launch_description.LaunchDescription(
[
Expand All @@ -32,31 +33,31 @@ def create_cam_launcher(
PythonLaunchDescriptionSource(
[
os.path.join(
get_package_share_directory("ros2_video_streamer"),
"launch",
"ros2_video_streamer_node_launch.py",
get_package_share_directory('ros2_video_streamer'),
'launch',
'ros2_video_streamer_node_launch.py',
)
]
),
# Need to set normally camera_name-derived args manually b/c ROS gets
# confused and makes them the same when we launch simultaneously
launch_arguments=[
("type", "video"),
('type', 'video'),
(
"camera_name",
f"{camera_name}_cam",
'camera_name',
f'{camera_name}_cam',
),
(
"image_topic_name",
f"/{camera_name}_cam/image_raw",
'image_topic_name',
f'/{camera_name}_cam/image_raw',
),
(
"info_topic_name",
f"/{camera_name}_cam/camera_info",
'info_topic_name',
f'/{camera_name}_cam/camera_info',
),
(
"file_name",
f"{camera_name}_cam.mp4",
'file_name',
f'{camera_name}_cam.mp4',
),
],
)
2 changes: 2 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>cv_bridge</depend>
<depend>python3-numpy</depend>

<export>
<build_type>ament_python</build_type>
Expand Down
69 changes: 36 additions & 33 deletions ros2_video_streamer/ros2_video_streamer_node.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import os

import cv2
import rclpy
from ament_index_python.packages import get_package_share_directory
from builtin_interfaces.msg import Time
import cv2
from cv2 import VideoCapture
from cv2.typing import MatLike
from cv_bridge import CvBridge
from numpy import generic
from numpy.typing import NDArray
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSPresetProfiles
from sensor_msgs.msg import CameraInfo, Image

Matlike = NDArray[generic]


class VideoStreamerNode(Node):
"""ROS Camera simulator Node; reads video file & pubs ROS Images."""

def __init__(self) -> None:
super().__init__(
"ros2_video_streamer_temp_name",
'ros2_video_streamer_temp_name',
parameter_overrides=[],
)

Expand All @@ -44,65 +47,65 @@ def __init__(self) -> None:
)

if not os.path.isfile(self.path):
raise RuntimeError(f"Invalid video path: {self.path}")
raise RuntimeError(f'Invalid video path: {self.path}')

if self.type == "video":
if self.type == 'video':
self.vc: VideoCapture = cv2.VideoCapture(self.path)
self.vc.set(
cv2.CAP_PROP_POS_MSEC,
self.start,
)
video_fps: float = self.vc.get(cv2.CAP_PROP_FPS)
elif self.type == "image":
elif self.type == 'image':
self.image = cv2.imread(self.path)
video_fps = 10
else:
raise ValueError(f"Unknown type: {self.type}")
raise ValueError(f'Unknown type: {self.type}')

self.timer = self.create_timer(
1.0 / video_fps,
self.image_callback,
)
self.get_logger().info(f"Publishing image at {video_fps} fps")
self.get_logger().info(f'Publishing image at {video_fps} fps')

def load_launch_parameters(
self,
) -> None:
"""Load the launch ROS parameters."""
self.declare_parameter(
"image_topic_name",
value="/simulated_cam/image_raw",
'image_topic_name',
value='/simulated_cam/image_raw',
)
self.declare_parameter(
"info_topic_name",
value="/simulated_cam/camera_info",
'info_topic_name',
value='/simulated_cam/camera_info',
)
self.declare_parameter(
"file_name",
value="simulated_cam.mp4",
'file_name',
value='simulated_cam.mp4',
)
# self.declare_parameter('config_file_path', value='')
self.declare_parameter("loop", value=True)
self.declare_parameter("frame_id", value="")
self.declare_parameter("type", value="")
self.declare_parameter("start", value=0)
self.declare_parameter('loop', value=True)
self.declare_parameter('frame_id', value='')
self.declare_parameter('type', value='')
self.declare_parameter('start', value=0)

self.image_topic_name = (
self.get_parameter("image_topic_name").get_parameter_value().string_value
self.get_parameter('image_topic_name').get_parameter_value().string_value
)
self.info_topic_name = (
self.get_parameter("info_topic_name").get_parameter_value().string_value
self.get_parameter('info_topic_name').get_parameter_value().string_value
)
self.file_name = self.get_parameter("file_name").get_parameter_value().string_value
self.file_name = self.get_parameter('file_name').get_parameter_value().string_value
# self.config_file_path = self.get_parameter('config_file_path')\
# .get_parameter_value().string_value
self.loop = self.get_parameter("loop").get_parameter_value().bool_value
self.frame_id_ = self.get_parameter("frame_id").get_parameter_value().string_value
self.type = self.get_parameter("type").get_parameter_value().string_value
self.start = self.get_parameter("start").get_parameter_value().integer_value
self.loop = self.get_parameter('loop').get_parameter_value().bool_value
self.frame_id_ = self.get_parameter('frame_id').get_parameter_value().string_value
self.type = self.get_parameter('type').get_parameter_value().string_value
self.start = self.get_parameter('start').get_parameter_value().integer_value

self.path = os.path.join(
get_package_share_directory("ros2_video_streamer"),
get_package_share_directory('ros2_video_streamer'),
self.file_name,
)

Expand Down Expand Up @@ -135,11 +138,11 @@ def load_launch_parameters(

def image_callback(self) -> None:
"""Process an image or frame of video."""
if self.type == "video":
if self.type == 'video':
rval, image = self.vc.read()

if not rval and not self.loop:
self.get_logger().info("End of video, closing node...")
self.get_logger().info('End of video, closing node...')
self.timer.cancel()
self.destroy_node()
exit()
Expand All @@ -152,10 +155,10 @@ def image_callback(self) -> None:
rval,
image,
) = self.vc.read()
elif self.type == "image":
elif self.type == 'image':
image = self.image
else:
raise ValueError(f"Unknown type: {self.type}")
raise ValueError(f'Unknown type: {self.type}')

time_msg = self.get_clock().now().to_msg()
img_msg = self.get_image_msg(image, time_msg)
Expand All @@ -166,7 +169,7 @@ def image_callback(self) -> None:

self.image_publisher_.publish(img_msg)

def get_image_msg(self, image: MatLike, time: Time) -> Image:
def get_image_msg(self, image: Matlike, time: Time) -> Image:
"""
Convert cv2 image to ROS2 Image with CvBridge cv2 -> image msg.
Expand All @@ -187,5 +190,5 @@ def main() -> None:
rclpy.shutdown()


if __name__ == "__main__":
if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from glob import glob
import os

from setuptools import find_packages, setup

Expand Down
2 changes: 1 addition & 1 deletion test/test_flake8.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from ament_flake8.main import main_with_errors
import pytest


@pytest.mark.flake8
Expand Down
Loading

0 comments on commit a0aaac1

Please sign in to comment.