4.14 로봇 구독 및 게시 인터페이스
개요
SubPub 모듈은 로봇 컨트롤러의 WebSocket 기반 게시/subscribe 채널을 처리합니다. 연결을 설정하고 로봇 상태, 레지스터 및 I/O,와 같은 주제를 구독하며 콜백 또는 호출 차단을 통해 실시간 데이터를 전달합니다.
핵심 기능
- WebSocket 서버 연결 및 연결 끊기 지원
- 로봇 상태 데이터 구독 지원
- 데이터 등록 구독 지원
- IO 신호 데이터 구독 지원
- 콜백 기능을 통한 메시지 수신 지원
- 다음 메시지 수신 차단 지원
- 구독 빈도 설정 지원
- 메시지 수신 오류 처리 지원
사용 사례
- 호스트 PC에서 로봇 동작 상태 실시간 모니터링
- 로봇 데이터 시각화 구현
- 외부 시스템과의 데이터 연계
- 레지스터 및 IO 상태 실시간 모니터링
- 로봇 모니터링 및 제어 시스템 구축
- 로봇 상태에 대한 실시간 알람 및 알림 구현
4.14.1 WebSocket 서버에 연결
| 메서드 이름 | sub_pub.connect() -> StatusCodeEnum |
|---|---|
| 설명 | 로봇 컨트롤러 WebSocket 서버에 연결 |
| 요청 매개변수 | 없음 |
| 반환 값 | StatusCodeEnum: 연결 상태 코드 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.2 WebSocket 연결 끊기
| 메서드 이름 | sub_pub.disconnect() -> StatusCodeEnum |
|---|---|
| 설명 | 로봇 컨트롤러 WebSocket 서버 연결 끊기 |
| 요청 매개변수 | 없음 |
| 반환 값 | StatusCodeEnum: 연결 끊김 상태 코드 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.3 로봇 상태 구독 추가
| 메서드 이름 | sub_pub.subscribe_status( topics : List[RobotTopicType], frequency : int = 200) -> StatusCodeEnum |
|---|---|
| 설명 | 로봇 상태 데이터 구독 추가 |
| 요청 매개변수 | topics : List[RobotTopicType] 로봇 토픽 유형 목록 frequency : int 구독 빈도(Hz, 기본값 200) |
| 반환 값 | StatusCodeEnum: 구독 상태 코드 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.4 등록 구독 추가
| 메서드 이름 | sub_pub.subscribe_register( reg_type : RegTopicType, reg_ids : List[int], frequency : int = 200) -> StatusCodeEnum |
|---|---|
| 설명 | 등록 데이터 구독 추가 |
| 요청 매개변수 | reg_type : RegTopicType 레지스터 유형 reg_ids : List[int] 레지스터 ID 목록 frequency : int 구독 주파수(Hz, 기본값 200) |
| 반환 값 | StatusCodeEnum: 구독 상태 코드 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.5 IO 구독 추가
| 메서드 이름 | sub_pub.subscribe_io( io_list : List[tuple[IOTopicType, int]], frequency : int = 200) -> StatusCodeEnum |
|---|---|
| 설명 | 디지털 입력을 포함한 IO 신호 데이터 구독/outputs |
| 요청 매개변수 | io_list : List[tuple[IOTopicType, int]] IO 목록(각 요소는 (IO type, IO ID) 입니다) frequency : int 구독 빈도(Hz, 기본값 200) |
| 반환 값 | StatusCodeEnum: 구독 상태 코드 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.6 메시지 수신 시작
| 메서드 이름 | sub_pub.start_receiving( on_message_received : Callable[[Dict[str, Any]], None]) -> StatusCodeEnum |
|---|---|
| 설명 | 콜백 함수를 통해 구독 메시지 수신을 시작하고 수신된 데이터를 처리합니다. |
| 요청 매개변수 | on_message_received : Callable[[Dict[str, Any]], None] 메시지 수신 콜백 함수 |
| 반환 값 | StatusCodeEnum: 상태 코드 수신 중 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.7 수신 메시지 오류 처리
| 메서드 이름 | sub_pub.handle_receive_error() -> StatusCodeEnum |
|---|---|
| 설명 | 구독 메시지 수신 시 오류를 처리합니다. 수신 콜백에서 예외가 발생하면 루프를 종료합니다. |
| 요청 매개변수 | 없음 |
| 반환 값 | StatusCodeEnum: 상태 코드 수신 중 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
4.14.8 다음 메시지 수신
| 메서드 이름 | sub_pub.receive() -> tuple[Dict[str, Any], StatusCodeEnum] |
|---|---|
| 설명 | 다음 메시지를 수신하고 반환합니다. |
| 요청 매개변수 | 없음 |
| 반환 값 | tuple[Dict[str, Any], StatusCodeEnum]: 수신된 메시지 사전 및 상태 코드 |
| 호환 로봇 소프트웨어 버전 | 협업(Copper): v7.7.0.0+ 산업용(Bronze): v7.7.0.0+ |
예제 코드
py
#!python
"""
Copyright © 2016 Agilebot Robotics Ltd. All rights reserved.
Instruction: SubPub使用示例 / SubPub usage example
"""
import asyncio
import logging
from Agilebot import Arm, IOTopicType, RegTopicType, RobotTopicType, StatusCodeEnum
# 配置日志 / Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def message_handler(message):
"""
消息处理函数 / Message handler function
:param message: 接收到的消息字典 / Received message dictionary
"""
logger.info(f"收到消息 / Received message: {message}")
async def main():
"""
主函数,演示SubPub的使用 / Main function demonstrating SubPub usage
"""
# 创建Arm实例 / Create Arm instance
arm = Arm()
# 连接到控制器 / Connect to controller
ret = arm.connect("10.27.1.254")
if ret != StatusCodeEnum.OK:
logger.error(f"连接失败 / Connection failed: {ret}")
return
logger.info("Arm连接成功 / Arm connected successfully")
try:
# 连接WebSocket / Connect WebSocket
ret = await arm.sub_pub.connect()
if ret != StatusCodeEnum.OK:
logger.error(f"WebSocket连接失败 / WebSocket connection failed: {ret}")
return
logger.info("WebSocket连接成功 / WebSocket connected successfully")
# 订阅机器人状态 / Subscribe to robot status
topic_types = [
RobotTopicType.JOINT_POSITION,
RobotTopicType.CARTESIAN_POSITION,
RobotTopicType.ROBOT_STATUS,
RobotTopicType.CTRL_STATUS,
RobotTopicType.SERVO_STATUS,
RobotTopicType.INTERPRETER_STATUS,
RobotTopicType.TRAJECTORY_RECORD,
RobotTopicType.USER_OP_MODE,
RobotTopicType.USER_FRAME,
RobotTopicType.TOOL_FRAME,
RobotTopicType.VELOCITY_RATIO,
RobotTopicType.TRAJECTORY_RECORD,
]
ret = await arm.sub_pub.subscribe_status(topic_types, frequency=200)
if ret != StatusCodeEnum.OK:
logger.error(f"订阅机器人状态失败 / Failed to subscribe to robot status: {ret}")
return
logger.info("机器人状态订阅成功 / Robot status subscription successful")
# 订阅寄存器 / Subscribe to registers
ret = await arm.sub_pub.subscribe_register(RegTopicType.R, [1, 2, 3], frequency=200)
if ret != StatusCodeEnum.OK:
logger.error(f"订阅寄存器失败 / Failed to subscribe to registers: {ret}")
return
logger.info("寄存器订阅成功 / Register subscription successful")
# 订阅IO信号 / Subscribe to IO signals
io_list = [(IOTopicType.DI, 1), (IOTopicType.DO, 1)]
ret = await arm.sub_pub.subscribe_io(io_list, frequency=200)
if ret != StatusCodeEnum.OK:
logger.error(f"订阅IO失败 / Failed to subscribe to IO: {ret}")
return
logger.info("IO订阅成功 / IO subscription successful")
# 单次接收消息示例 / Single message receive example
logger.info("尝试单次接收消息 / Attempting single message receive")
try:
# 设置超时 / Set timeout
message, ret = await asyncio.wait_for(arm.sub_pub.receive(), timeout=5.0)
if ret == StatusCodeEnum.OK:
logger.info(f"单次接收消息成功 / Single message receive successful: {message}")
else:
logger.error(f"单次接收消息失败 / Single message receive failed: {ret}")
except asyncio.TimeoutError:
logger.warning("接收消息超时 / Message receive timeout")
# 启动消息接收 / Start message receiving
ret = await arm.sub_pub.start_receiving(message_handler)
if ret != StatusCodeEnum.OK:
logger.error(f"启动消息接收失败 / Failed to start message receiving: {ret}")
return
logger.info("开始接收消息 / Started receiving messages")
# 运行一段时间 / Run for a while
logger.info("运行10秒钟... / Running for 10 seconds...")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"运行过程中发生错误 / Error occurred during execution: {str(e)}")
finally:
# 断开连接 / Disconnect
await arm.sub_pub.disconnect()
arm.disconnect()
logger.info("连接已断开 / Connection disconnected")
if __name__ == "__main__":
# 运行异步主函数 / Run async main function
asyncio.run(main())