OSOYOO PI CARキット ROS2化 #7 – 自律制御検討 YOLOv5 オブジェクト検知 その2

OSOYOO PI CAR

前回は、YOLOv5のインストールと動作確認で時間切れとなってしまいました。
今回は、実際にYOLOv5をROS2上に搭載していこうと思います。
前回の実験でYOLOv5を常に動作させておくのは、性能的に無理がありそうですし、常に検知させる必要もないと考えます。
そこで、オブジェクト検知は自律制御ノードから要求があったときにだけ応答を返す「サービス」として設計することにしました。
具体的には、「/detect_object_server」ノード内に「/detect_object_server/detect_object」サービスを作ります。

仕組みとしては、カメラ画像(/image_raw メッセージ)を常に取得しておき、オブジェクト検知要求時には、その時点の最新画像で検知を行うようなサービスを設計します。
オブジェクト検知対象は、その時の最新画像が対象となるはずなので、直接カメラ画像を解析することにしました。

オブジェクト検知時のリクエスト/レスポンス(サーバーメッセージ)ですが下記のように定義しました。ROS2のSRV定義をそのまま記載します
緑字部がリクエストを示し、赤字部がレスポンスを示します。

float64 confidence

DetectedObject[] objects









検知対象確信度:0.0-1.0

検知オブジェクト情報。検知したオブジェクトの数だけ情報を格納する。

DetectObjectは、以下のメンバーを持つ
name : 検知オブジェクト名称
sx:オブジェクト開始x座標
sy:オブジェクト開始y座標
ex:オブジェクト終了x座標
ey:オブジェクト終了y座標
confidence:確信度(0.0-1.0)

※座標は、画像の幅、高さ=1.0とした座標位置を示す
DetectObject.srv

たとえば、リクエスト confidenceに0.6を指定した場合、0.6以上の確信度のあるオブジェクトのみを対象に抽出した結果を返します。検知対象が何もなければ、空のリストを返します。

〇 仕組み

仕組みとしては、カメラ画像(/image_raw)をsubscribeして最新画像を常にキャプチャーしておきます。オブジェクト検知の要求受信時に、最新画像をYOLO v5に分析させて結果をレスポンスとして返します。YOLO v5と学習データは、ROS2ワークスペースに下記のように配置しておきます。

.
├── build
├── install
├── src
├── yolov5                <= yolov5 git clone環境
└── picar.pt              <= 学習データ

実行は、今まで通り、ROS2ワークスペースでpicarのランチャーを起動して起動します

$ ros2 launch picar_bringup picar.launch.py

メソッドは以下のように設計しています。
__init__()は、コンストラクタです。クラス内で使用する変数や各モジュールの初期化を行います。
callback_detect_object()は、「detect_object_server/detect_object」のサービス応答手続きです。
callback_image()は、最新カメラ画像のsubscribe通知手続きです。
処理上の主要箇所に色を付けています。

  • 青:
    Yolo v5を実行します。__init__内の箇所は、Yolo v5の初期化を行ってます。callback_detect_object()内の箇所ではYolo v5を実行しています。
  • 緑:
    最新画像をコピーして保持しています。
    subscribe()したメッセージはsubscribe()のコールバック内でのみ使用可能です。
    他のメソッドで使用するために、少し演算コストが高いですが、copy.deepcopy(msg)でメッセージをコピーしています。
#!/usr/bin/env python3
import torch
import cv2
import rclpy
import copy
from rclpy.node import Node
from cv_bridge import CvBridge

from sensor_msgs.msg import Image
from picar_interfaces.msg import DetectedObject
from picar_interfaces.srv import DetectObject

class DetectObjectServerNode(Node):
    def __init__(self):
        super().__init__("detect_object_server")

        # this crates cv2 briddge.
        self.cv_bridge_ = CvBridge()

        # this intializes status.
        self.update_ = False

        # this configures detection parameters.
        yolo_path = './yolov5'
        yolo_pt = './picar.pt'
        self.model_ = torch.hub.load(yolo_path, 'custom',
            path=yolo_pt, source='local');
        self.model_conf_ = 0.6

        # this subscribe image.
        self.image_subscription = self.create_subscription(
            Image, '/image_raw', self.callback_image, 10)

        # this creates detect_object service.
        self.detect_server_ = self.create_service(
            DetectObject, "detect_object_server/detect_object",
            self.callback_detect_object)

        self.get_logger().info("detect object server has been started.")

    def callback_detect_object(self, request, response):
        response.objects = []
        if self.update_ == False:
            return response

        try:
            img = self.cv_bridge_.imgmsg_to_cv2(self.latest_msg_)
        except CvBridgeError as err:
            return response

        self.model_.conf = request.confidence
        results = self.model_(img)
        for idx, obj in enumerate(results.pandas().xyxyn[0].itertuples()):
            new_object = DetectedObject()
            new_object.name = obj.name
            new_object.sx = obj.xmin
            new_object.sy = obj.ymin
            new_object.ex = obj.xmax
            new_object.ey = obj.ymax
            new_object.confidence = obj.confidence
            response.objects.append(new_object)
        return response

    def callback_image(self, msg):
        self.latest_msg_ = copy.deepcopy(msg)
        self.update_ = True

def main(args=None):
    rclpy.init(args=args)
    node = DetectObjectServerNode()
    rclpy.spin(node)
    rclpy.shutdown()


if __name__ == "__main__":
    main()

detect_object_serverノードができたので、既存のpicar.launch.pyに登録しておきます。
緑部が追加部分となります。

from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import IncludeLaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare

def generate_launch_description():
    ld = LaunchDescription()

    picar_vehicle_node = Node (
        package="picar_py_pkg",
        executable="picar_vehicle",
        name="picar_vehicle"
    )

    picar_v4l2_camera_node = Node (
        package="v4l2_camera",
        executable="v4l2_camera_node",
        name="v4l2_camera_node"
    )

    picar_rangefinder_node = Node (
        package="picar_py_pkg",
        executable="picar_rangefinder",
        name="picar_rangefinder"
    )

    picar_gimbal_node = Node (
        package="picar_py_pkg",
        executable="picar_gimbal",
        name="picar_gimbal"
    )

    picar_object_server_node = Node (
        package="picar_py_pkg",
        executable="picar_object_server",
        name="picar_object_server"
    )

    rosbridge_server = IncludeLaunchDescription(
        PathJoinSubstitution(
            [FindPackageShare("rosbridge_server"),
                "launch", "rosbridge_websocket_launch.xml"]),
    )

    ld.add_action(picar_vehicle_node)
    ld.add_action(picar_v4l2_camera_node)
    ld.add_action(picar_rangefinder_node)
    ld.add_action(picar_gimbal_node)
    ld.add_action(picar_object_server_node)
    ld.add_action(rosbridge_server)
    return ld

現状は自律制御するノードは存在しませんので、代わりにROS2のコマンド:ros2 service callを使用して、下記のように動作確認を行いました。

まずは、ランチャーを使って、下記のようにpicarを起動します。

$ ros2 launch picar_bringup picar.launch.py

次にサービスコールを行い、オブジェクト検知を試験します。

$ ros2 service call /detect_object_server/detect_object picar_interfaces/srv/DetectObject "{confidence: 0.6}"
requester: making request: picar_interfaces.srv.DetectObject_Request(confidence=0.6)

response:
picar_interfaces.srv.DetectObject_Response(objects=[picar_interfaces.msg.DetectedObject(name='tv', sx=0.0, sy=0.001197083736769855, ex=0.29605355858802795, ey=0.3411560356616974, confidence=0.8254019618034363), picar_interfaces.msg.DetectedObject(name='keyboard', sx=0.23415414988994598, sy=0.5285433530807495, ex=0.5003020167350769, ey=0.7530882358551025, confidence=0.796567976474762)])

この辺りのモノづくり環境は、ROS2ではよく考えられていて、標準で用意されているユーティリティを使ってサービスコールの評価が可能です。
ただ、やはりサービスコールに3~4秒かかるので、実際に自律制御で使う際には、これを考慮した制御が必要です。
サービスコール応答時間に関しては、もっと高速なプロセッサーを使えば短縮可能だと思います。

物体検知を動作させるには、検知対象を学習させる必要があります。
学習は、YOLOv5で提供される「train.py」を使用して行いますが、環境の準備、データの準備等少々大変なので、検知対象を決めたところで用意しようと思います。
私は、たまたま仕事の関連で、「てつまぐ」さんのYOLOv5初級講座をUdemyで受講する機会がありました。
私の学習環境は、その際に教えていただいたことをベースに「Google Drive + Google Colaboratory」上に設計しています。
自分オリジナルのアイデアや知識をベースとしたものではないので、今回は、触れずにおこうと思います。

※ わかりやすい講義でした。YOLOに興味のある方には、おすすめです。

YOLO v5をROS2のノードとして組みこむことができました。
次回は、少しYOLOを寝かせておいて、別の自律制御に必要な技術をお勉強しようと思います。
また、よろしければ見ていただけるとありがたいです。

タイトルとURLをコピーしました