Skip to content

4.14 机器人订阅发布接口

说明

SubPub 模块提供机器人控制器 WebSocket 的订阅 / 发布通道管理能力,负责建立连接、订阅机器人状态 / 寄存器 / IO 等主题,并通过回调或阻塞接口接收实时数据。借助该模块可在上位机监听机器人运行信息、实现数据可视化及与外部系统联动。

4.14.1 连接到 WebSocket 服务器

方法名sub_pub.connect() -> StatusCodeEnum
描述连接到机器人控制器的 WebSocket 服务器
请求参数
返回值StatusCodeEnum:连接状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.2 断开 WebSocket 服务器

方法名sub_pub.disconnect() -> StatusCodeEnum
描述断开与机器人控制器 WebSocket 服务器的连接
请求参数
返回值StatusCodeEnum:断开状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.3 添加机器人状态订阅

方法名sub_pub.subscribe_status( topics : List[RobotTopicType], frequency : int = 200) -> StatusCodeEnum
描述添加机器人状态数据订阅
请求参数topics : List[RobotTopicType] 需要订阅的机器人主题类型列表
frequency : int 订阅频率,单位 Hz,默认 200
返回值StatusCodeEnum:订阅状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.4 添加寄存器订阅

方法名sub_pub.subscribe_register( reg_type : RegTopicType, reg_ids : List[int], frequency : int = 200) -> StatusCodeEnum
描述添加寄存器数据订阅
请求参数reg_type : RegTopicType 寄存器类型
reg_ids : List [int] 需要订阅的寄存器 ID 列表
frequency : int 订阅频率,单位 Hz,默认 200
返回值StatusCodeEnum:订阅状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.5 添加 IO 订阅

方法名sub_pub.subscribe_io( io_list : List[tuple[IOTopicType, int]], frequency : int = 200) -> StatusCodeEnum
描述订阅 IO 信号数据,包括数字输入 / 输出等
请求参数io_list : List[tuple[IOTopicType, int]] IO 列表,每个元素为 (IO 类型, IO ID)
frequency : int 订阅频率,单位 Hz,默认 200
返回值StatusCodeEnum:订阅状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.6 开始接收消息

方法名sub_pub.start_receiving( on_message_received : Callable[[Dict[str, Any]], None]) -> StatusCodeEnum
描述开始接收订阅消息,并通过回调函数处理接收到的数据
请求参数on_message_received : Callable [[Dict [str, Any]], None] 消息接收回调函数
返回值StatusCodeEnum:接收状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.7 接收订阅消息的错误

方法名sub_pub.handle_receive_error() -> StatusCodeEnum
描述处理接收订阅消息的错误,当接收消息回调函数抛出异常时跳出循环
请求参数
返回值StatusCodeEnum:接收状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

4.14.8 接收下一条消息

方法名sub_pub.receive() -> tuple[Dict[str, Any], StatusCodeEnum]
描述接收下一条消息并返回
请求参数
返回值tuple [Dict [str, Any], StatusCodeEnum]:接收到的消息字典和状态码
兼容的机器人软件版本协作 (Copper): v7.7.0.0+
工业 (Bronze): v7.7.0.0+

示例代码

sub_pub.py
py
#!python
"""
Copyright © 2016 Agilebot Robotics Ltd. All rights reserved.
Instruction: SubPub使用示例 / SubPub usage example
"""

import asyncio
import logging

from Agilebot import Arm, IOTopicType, RegTopicType, RobotTopicType, StatusCodeEnum

# 配置日志 / Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def message_handler(message):
    """
    消息处理函数 / Message handler function

    :param message: 接收到的消息字典 / Received message dictionary
    """
    logger.info(f"收到消息 / Received message: {message}")


async def main():
    """
    主函数,演示SubPub的使用 / Main function demonstrating SubPub usage
    """
    # 创建Arm实例 / Create Arm instance
    arm = Arm()

    # 连接到控制器 / Connect to controller
    ret = arm.connect("10.27.1.254")
    if ret != StatusCodeEnum.OK:
        logger.error(f"连接失败 / Connection failed: {ret}")
        return

    logger.info("Arm连接成功 / Arm connected successfully")

    try:
        # 连接WebSocket / Connect WebSocket
        ret = await arm.sub_pub.connect()
        if ret != StatusCodeEnum.OK:
            logger.error(f"WebSocket连接失败 / WebSocket connection failed: {ret}")
            return

        logger.info("WebSocket连接成功 / WebSocket connected successfully")

        # 订阅机器人状态 / Subscribe to robot status
        topic_types = [
            RobotTopicType.JOINT_POSITION,
            RobotTopicType.CARTESIAN_POSITION,
            RobotTopicType.ROBOT_STATUS,
            RobotTopicType.CTRL_STATUS,
            RobotTopicType.SERVO_STATUS,
            RobotTopicType.INTERPRETER_STATUS,
            RobotTopicType.TRAJECTORY_RECORD,
            RobotTopicType.USER_OP_MODE,
            RobotTopicType.USER_FRAME,
            RobotTopicType.TOOL_FRAME,
            RobotTopicType.VELOCITY_RATIO,
            RobotTopicType.TRAJECTORY_RECORD,
        ]

        ret = await arm.sub_pub.subscribe_status(topic_types, frequency=200)
        if ret != StatusCodeEnum.OK:
            logger.error(f"订阅机器人状态失败 / Failed to subscribe to robot status: {ret}")
            return

        logger.info("机器人状态订阅成功 / Robot status subscription successful")

        # 订阅寄存器 / Subscribe to registers
        ret = await arm.sub_pub.subscribe_register(RegTopicType.R, [1, 2, 3], frequency=200)
        if ret != StatusCodeEnum.OK:
            logger.error(f"订阅寄存器失败 / Failed to subscribe to registers: {ret}")
            return

        logger.info("寄存器订阅成功 / Register subscription successful")

        # 订阅IO信号 / Subscribe to IO signals
        io_list = [(IOTopicType.DI, 1), (IOTopicType.DO, 1)]

        ret = await arm.sub_pub.subscribe_io(io_list, frequency=200)
        if ret != StatusCodeEnum.OK:
            logger.error(f"订阅IO失败 / Failed to subscribe to IO: {ret}")
            return

        logger.info("IO订阅成功 / IO subscription successful")

        # 单次接收消息示例 / Single message receive example
        logger.info("尝试单次接收消息 / Attempting single message receive")
        try:
            # 设置超时 / Set timeout
            message, ret = await asyncio.wait_for(arm.sub_pub.receive(), timeout=5.0)
            if ret == StatusCodeEnum.OK:
                logger.info(f"单次接收消息成功 / Single message receive successful: {message}")
            else:
                logger.error(f"单次接收消息失败 / Single message receive failed: {ret}")
        except asyncio.TimeoutError:
            logger.warning("接收消息超时 / Message receive timeout")

        # 启动消息接收 / Start message receiving
        ret = await arm.sub_pub.start_receiving(message_handler)
        if ret != StatusCodeEnum.OK:
            logger.error(f"启动消息接收失败 / Failed to start message receiving: {ret}")
            return

        logger.info("开始接收消息 / Started receiving messages")

        # 运行一段时间 / Run for a while
        logger.info("运行10秒钟... / Running for 10 seconds...")
        await asyncio.sleep(10)

    except Exception as e:
        logger.error(f"运行过程中发生错误 / Error occurred during execution: {str(e)}")

    finally:
        # 断开连接 / Disconnect
        await arm.sub_pub.disconnect()
        arm.disconnect()
        logger.info("连接已断开 / Connection disconnected")


if __name__ == "__main__":
    # 运行异步主函数 / Run async main function
    asyncio.run(main())