# 참고자료
Write a ROS2 Service Client with Python - ROS2 Tutorial 11 (youtube.com)
1. 절차
2. 클라이언트 함수
1) client =
self.create_client("srv_type", "service_name", "qos_profile", "callback_group")
- 클라이언트 서비스 생성 함수
- srv_type :
서비스 타입. 클라이언트가 사용할 서비스의 요청과 응답메세지 구조를 정의하는 클래스
인터페이스와 타입을 정의 - service_name :
클라이언트가 연결할 서비스의 이름. 이름은 서비스 서버와 일치해야 함 - qos_profile :
서비스 품질, 버퍼값 설정 - callback_group :
클라이언트 콜백이 실행될 콜백함수
2) client.wait_for_service("time_out")
- 생성된 서비스 클라이언트 함
- 서버가 존재는지 확인 하는 함수
- "time_out" 기간동안 확인하고 없으면
- 서비스가 존재하지 않으면 False 반환
def call_set_pen_service(self, r, g, b, width, off) :
client = self.create_client(SetPen, "/turtle1/set_pen" )
while not client.wait_for_service(1.0) :
self.get_logger().warn("waiting for service ...")
3) future = client.call_async(<메세지 타입>)
- 클라이언트가 서버에 비동기적으로 서비스 요청을 보냄
- 클라이언트가 요청을 보내면 그 동안 다른 작업을 처리하는 비동기 함수
- <메세지 타입>을 서버에 전송하고 futrue 객체를 반환함
- future 객체는 서비스 응답의 완료 상태와 결과를 확인할 수 있음
- future 객체는 요청이 들어올 경우 실행하는 함수를 지정등 가능
3 - 1) future.add_done_callback(<callback>)
- 해당 퓨처 객체가 완료 되거나 취소 되을 때, 호출된<callback> 함수를 등록
- callback 함수에 클라이언트가 서버에게 요청에 대해 응답한 값을 인자로 넘김
2. 서버 함수
1) self.create_service("srv_name", "service_name", "callback","callback_group", "qos_profile")
- srv_type :
서비스 타입 정의 - service_name :
생성할 서비스 명 정의. 클라이언트는 이 이름을 통해 서버와 연결 - callback :
클라이언트에서 요청이 들어올 때 실행되는 함수 - callback_group :
옵션. 입력하지 않아도 됨
지정하지 않을 경우 기본 콜백 그룹이 사용
클라이언트 콜백이 실행될 콜백함수 - qos_profile :
옵션. 입력하지 않아도
서비스 품질, 버퍼값 설정
3. 파이썬 함수
1) partial( )
- 기존의 함수에 인자를 고정한 새로운 함수를 만드는 함수
from functiontools import partial
def fun(<인자_1>, <인자_2>) :
if <인자_1> == Ture :
return True, <인자2>
if <인자_1> == False :
return False, <인자2>
new_fun = partial(fun, True)
print(new_fun(<인자2>))
4. 코드 및 실행결과
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from turtlesim.srv import SetPen
from functools import partial
class TurtleControllerNode(Node):
def __init__(self) :
super().__init__("turtle_controller")
self.previous_x_ = 0
self.cmd_vel_publisher_ = self.create_publisher(
Twist,
"/turtle1/cmd_vel",
10
)
self.pose_subscriber_ = self.create_subscription(
Pose,
"/turtle1/pose",
self.pose_callback,
10
)
self.get_logger().info("turtle controlller has been started")
def pose_callback(self, pose :Pose) :
cmd = Twist()
if pose.x > 9.0 or pose.x < 2.0 or pose.y > 9.0 or pose.y < 2.0 :
cmd.linear.x = 1.0
cmd.angular.z =0.9
else :
cmd.linear.x = 5.0
cmd.angular.z = 0.0
self.cmd_vel_publisher_.publish(cmd)
if pose.x > 5.5 and self.previous_x_ <= 5.5 :
self.previous_x_ = pose.x
self.get_logger().info("Set color to red")
self.call_set_pen_service(255, 0, 0, 3, 0)
elif pose.x <= 5.5 and self.previous_x_ > 5.5 :
self.previous_x_ = pose.x
self.get_logger().info("set color to green")
self.call_set_pen_service(0, 255, 0, 3, 0)
def call_set_pen_service(self, r, g, b, width, off) :
client = self.create_client(SetPen, "/turtle1/set_pen" )
while not client.wait_for_service(1.0) :
self.get_logger().warn("waiting for service ...")
request = SetPen.Request()
request.r = r
request.g = g
request.b = b
request.width = width
request.off = off
future = client.call_async(request)
future.add_done_callback(partial(self.callback_set_pen))
def callback_set_pen(self,future) :
try :
response = future.result()
except Exception as e:
self.get_logger().error("service call failed : %r" %(e, ))
def main(args=None) :
rclpy.init(args=args)
node = TurtleControllerNode()
rclpy.spin(node)
rclpy.shutdown()
'프로그래밍 > ROS' 카테고리의 다른 글
ROS2 - realsense2 인터페이스 분석 (0) | 2024.08.26 |
---|---|
라즈베리파이5 /우분투24.04에 VSC 설치 (0) | 2024.08.20 |
ROS2 서비스 1 - 개요/명령어/실습 (0) | 2024.08.20 |
ROS2 intel Realsense - 라이브러리 분석 (0) | 2024.08.18 |
ROS2에 intel realsense D455 실행하기 (0) | 2024.08.14 |