액션 서버 및 클라이언트 작성하기 (Python)

목표: Python으로 액션 서버와 클라이언트를 구현합니다.

배경

액션은 ROS 2에서의 비동기 통신 형태입니다. 액션 클라이언트액션 서버 에 목표 요청을 보냅니다. 액션 서버 는 목표 피드백과 결과를 액션 클라이언트 에 보냅니다.

필수 사항

이 튜토리얼을 진행하려면 이전 튜토리얼에서 정의한 action_tutorials_interfaces 패키지와 Fibonacci.action 인터페이스가 필요합니다. 액션 생성하기.

작업

1 액션 서버 작성

우리는 액션 서버를 작성하는 것에 집중할 것이며, 이 서버는 액션 생성하기 튜토리얼에서 만든 액션을 사용하여 피보나치 수열을 계산합니다.

지금까지 패키지를 만들고 ros2 run 을 사용하여 노드를 실행했습니다. 그러나 이 튜토리얼에서는 액션 서버를 단일 파일로 제한하겠습니다. 액션 튜토리얼 패키지의 완전한 구현을 보려면 action_tutorials 를 확인하십시오.

홈 디렉토리에서 새 파일을 열어 fibonacci_action_server.py 라고 이름을 지정하고 다음 코드를 추가하십시오.

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

8행은 FibonacciActionServer 클래스를 정의하며 이 클래스는 Node 의 하위 클래스입니다. 클래스는 Node 생성자를 호출하여 노드 이름을 fibonacci_action_server 로 지정하여 초기화됩니다.

생성자에서 새로운 액션 서버도 인스턴스화합니다.

        super().__init__('fibonacci_action_server')

액션 서버에는 네 가지 인수가 필요합니다.

  1. 액션 클라이언트를 추가할 ROS 2 노드: self

  2. 액션 유형: Fibonacci (5행에서 가져옴)

  3. 액션 이름: 'fibonacci'

  4. 수락된 목표를 실행하는 콜백 함수: self.execute_callback. 이 콜백은 액션 유형에 대한 결과 메시지를 반환해야합니다.

우리는 또한 클래스 내에서 execute_callback 메소드를 정의합니다.

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

이 메소드는 수락된 목표를 실행하는 데 호출됩니다.

이제 액션 서버를 실행해 봅시다.

python3 fibonacci_action_server.py

다른 터미널에서 목표를 보내려면 명령 줄 인터페이스를 사용할 수 있습니다.

ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

액션 서버를 실행 중인 터미널에서 “Executing goal…”이라는 메시지와 목표 상태가 설정되지 않았다는 경고 메시지를 볼 수 있어야 합니다. 기본적으로 목표 핸들 상태가 실행 콜백에서 설정되지 않으면 aborted 상태로 간주됩니다.

목표가 수락되었음을 나타내려면 목표 핸들의 succeed() 메서드를 사용할 수 있습니다.

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

액션 서버를 다시 시작하고 다른 목표를 보내면 목표가 SUCCEEDED 상태로 완료되는 것을 확인할 수 있어야 합니다.

이제 목표 실행을 실제로 계산하고 요청된 피보나치 수열을 반환하도록 만들어 봅시다.

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

수열을 계산한 후 결과 메시지 필드에 할당하고 반환하기 전에 이를 수행합니다.

다시 액션 서버를 시작하고 다른 목표를 보내면 목표가 올바른 결과 시퀀스로 완료되는 것을 확인할 수 있어야 합니다.

1.2 피드백 게시

액션에서 제공하는 좋은 기능 중 하나는 목표 실행 중에 액션 클라이언트에 피드백을 제공할 수 있다는 것입니다. 액션 서버는 목표 핸들의 publish_feedback() 메서드를 호출하여 액션 클라이언트에 피드백을 게시할 수 있습니다.

우리는 sequence 변수를 대체하고 피드백 메시지를 사용하여 시퀀스를 저장합니다. for 루프의 갱신 후에 피드백 메시지를 게시하고 감정 효과를 위해 잠시 대기합니다.

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

액션 서버를 다시 시작한 후 명령 줄 도구를 사용하여 --feedback 옵션과 함께 피보나치 액션을 보내면 피드백이 이제 게시되는 것을 확인할 수 있습니다.

ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

2 액션 클라이언트 작성

액션 클라이언트도 단일 파일로 제한하겠습니다. 새 파일을 열고 fibonacci_action_client.py 라고 이름을 지정하고 다음 보일러플레이트 코드를 추가하십시오.

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)


if __name__ == '__main__':
    main()

우리는 FibonacciActionClient 라는 클래스를 정의했으며 이 클래스는 Node 의 하위 클래스입니다. 클래스는 Node 생성자를 호출하여 노드 이름을 fibonacci_action_client 로 지정하여 초기화됩니다.

또한 클래스 생성자에서 우리는 이전 튜토리얼 액션 생성하기 에서 정의한 사용자 정의 액션 정의를 사용하여 액션 클라이언트를 생성합니다.

        super().__init__('fibonacci_action_client')

액션 클라이언트를 만들려면 세 가지 인수를 전달합니다.

  1. 액션 클라이언트를 추가할 ROS 2 노드: self

  2. 액션 유형: Fibonacci

  3. 액션 이름: 'fibonacci'

우리의 액션 클라이언트는 동일한 액션 이름과 유형의 액션 서버와 통신할 수 있습니다.

또한 FibonacciActionClient 클래스에서 send_goal 메소드를 정의합니다.

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

이 메소드는 액션 서버가 사용 가능할 때까지 기다리고 목표를 서버로 보냅니다. 나중에 기다릴 수 있는 퓨처를 반환합니다.

클래스 정의 이후에는 ROS 2를 초기화하고 FibonacciActionClient 노드의 인스턴스를 생성하고 목표를 보내고 해당 목표가 완료될 때까지 대기하는 main() 함수를 정의합니다.

마지막으로 Python 프로그램의 진입점에서 main() 을 호출합니다.

이제 액션 클라이언트를 테스트 해 봅시다. 먼저 이전에 빌드한 액션 서버를 실행하세요.

python3 fibonacci_action_server.py

다른 터미널에서 액션 클라이언트를 실행하세요.

python3 fibonacci_action_client.py

액션 서버가 목표를 성공적으로 실행하는 동안 메시지가 출력되어야 합니다.

액션 클라이언트는 목표를 보낼 수 있지만 언제 완료되었는지 어떻게 알 수 있을까요? 결과 정보를 얻을 수 있습니다. 먼저 목표를 보낸 목표 핸들을 가져와야 합니다. 그런 다음 목표 핸들을 사용하여 결과를 요청할 수 있습니다.

다음은 이 예제의 전체 코드입니다.

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

ActionClient.send_goal_async() 메서드는 목표 핸들로의 퓨처를 반환합니다. 먼저 퓨처가 완료될 때 호출될 콜백을 등록합니다.

        self._send_goal_future.add_done_callback(self.goal_response_callback)

퓨처는 액션 서버가 목표 요청을 수락하거나 거부할 때 완료됩니다. 이때 goal_response_callback 를 자세히 살펴봅시다. 목표가 거부되었는지 확인하고 조기에 반환하도록 퓨처가 완료되었는지 확인할 수 있습니다.

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

목표 핸들을 얻었으므로 get_result_async() 메서드를 사용하여 결과를 요청할 수 있습니다. 목표가 준비되면 완료될 퓨처가 반환됩니다. 목표 응답과 마찬가지로 목표 결과에 대한 콜백을 등록해 보겠습니다.

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

콜백에서는 결과 시퀀스를 기록하고 깨끗한 종료를 위해 ROS 2를 종료합니다.

액션 서버가 별도의 터미널에서 실행 중인 상태에서 피보나치 액션 클라이언트를 실행해 보세요!

python3 fibonacci_action_client.py

목표가 수락되었고 최종 결과가 기록되는 메시지가 표시되어야 합니다.

2.2 피드백 받기

액션 클라이언트는 목표를 보낼 수 있습니다. 좋아요! 그러나 액션 서버에서 보내는 목표에 대한 피드백을 얻을 수 있는 것이 좋을 것입니다.

다음은 이 예제의 전체 코드입니다.

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

피드백 메시지에 대한 콜백 함수는 다음과 같습니다.

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

콜백에서는 메시지의 피드백 부분을 가져오고 화면에 partial_sequence 필드를 출력합니다.

액션 클라이언트에서 액션 클라이언트에 콜백을 등록하는 방법은 목표를 보낼 때 액션 클라이언트에 콜백을 추가로 전달하는 것입니다.

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

이제 모든 준비가 되었습니다. 액션 클라이언트를 실행하면 화면에 피드백이 출력되어야 합니다.

요약

이 튜토리얼에서는 Python 액션 서버 및 액션 클라이언트를 단계별로 작성하고 목표, 피드백 및 결과를 교환하도록 구성했습니다.

관련 콘텐츠

  • Python으로 액션 서버 및 클라이언트를 작성하는 방법은 다양합니다. ros2/examples 저장소의 minimal_action_serverminimal_action_client 패키지를 확인하십시오.

  • ROS 액션에 대한 자세한 정보는 디자인 문서 를 참조하십시오.