DOBOT + 大寰AG95 + python上位机控制 (ROS)

0x01 开发需求

需求很简单,通过ROS对连接在CR机械臂法兰盘上的大寰AG-95进行控制。

如果是直接在机械臂上写项目,那么直接在写机械臂lua程序(该lua程序直接运行在机械臂控制柜上)过程中调用官方开发完成的包就好了。但是当我们选择使用ROS对机械臂进行控制的时候,这个包就不顶用了

ROS -(TCP控制指令)-> CR机械臂

0x02 DOBOT CR机械臂末端485透传说明

TCP连接到端口60000,随后发送数据给到该端口,数据会通过末端485转发到AG-95,反过来AG-95的485反馈也会回馈到TCP客户端。

0x03 Modbus-RTU

查阅大寰夹爪的文档,发现实际上大寰的控制协议就是Modbus-RTU协议。这里对Modbus-RTU协议做简要介绍:

| 01 | 06 | 01 00 | 00 A5 | 48 4D 
|地址|指令码|指令|数据|CRC16低|CRC16高

而手爪使用Modbus-RTU协议和手爪的上位机(机械臂)通信。

结合TCP-485透传,机械臂的上位机可以通过Modbus-RTU Over TCP和机械臂ip:60000号端口进行通讯,使用ModbusPull工具可以进行测试。具体测试可以参考“初始化手爪”栏目。后续即可通过普通的TCP客户端通过60000号端口控制末端夹爪,便于在ROS等环境下进行开发。

注:Modbus-TCP和Modbus-RTU Over TCP的区别在于:前者无校验位(因为TCP/IP已经实现了CRC校验),后者仍然需要发送CRC校验位。

当然,从实际开发角度来说,你也可以使用pymodbus这种库而非自行实现modbus-rtu协议。

0x04 功能实现

初始化手爪

可以裸发送 “01 06 01 00 00 A5 48 4D ” 实现。

或者:将256(HEX:01 00)地址设置为165(A5),手爪完成初始化。

初始化状态读取地址:0x0200(DEC:512),结果含义:0 -> 未初始化, 1 -> 初始化完成, 2 -> 初始化中

读取可以发送 "01 03 02 00 00 01 85 B2"(查询ID为1,地址0x0200,一位),随后判断返回值即可。

返回结果例子:

01 03 02 00 01 79 84   # 初始化完成
01 03 02 00 02 39 85   # 初始化中

设置手爪位置

位置从0到100(允许一位小数),先把该值乘以10,得到设置值

随后将该设置值写入线圈0x103(DEC:259)即可。

01 06 01 03 00 00 78 36 # 设置为0
01 06 01 03 03 E8 78 88 # 设置为100

如果是直接发,修改 01 03 后的两个HEX值即可(03 E8 = 1000),随后修改最后俩CRC16校验码。

读取就反过来,读取该线圈即可得到夹爪当前位置。

设置手爪力度

力度从0到100,Int,不需要和位置一样将值乘以10

随后将该数值写入线圈0x101(DEC:257)。

01 06 01 01 00 64 D8 1D 

读取就反过来,读取该线圈即可得到夹爪当前力度。

读取夹爪状态

状态线圈0x201(DEC:513),读取值意义如下:

0 -> 运动中
1 -> 到达设置位置
2 -> 夹住了物品
3 -> 物品掉落
-1 -> 错误

也可以发送原始值获取状态。

-> 01 03 02 01 00 01 D4 72
接收结果:
01 03 02 00 01 79 84 
        |-----| -> 这里的00 01就是结果

0x05 示例代码

这个是带ros的版本,如果不需要ros做控制直接删减ros部分也是一样的用。

import math
import threading
import time

from dh_ag95.srv import dh_ag95_init, dh_ag95_initResponse, dh_ag95_setpos, dh_ag95_setposResponse
from dh_ag95.srv import dh_ag95_setforce, dh_ag95_setforceResponse
from dh_ag95.srv import dh_ag95_get_status, dh_ag95_get_statusResponse
import rospy
import socket

modbus_rtu_over_tcp = socket.socket()
modbus_rtu_over_tcp_status = False
lock = threading.Lock()


def CRC16(datas: bytes) -> bytes:
    """
    转换为高低CRC16,方便MODBUS发送
    :param datas:
    :return: 返回高低八位, 按照modbus格式排序
    """
    crc16 = 0xFFFF
    poly = 0xA001
    for data in datas:
        crc16 = data ^ crc16
        for i in range(8):
            if 1 & crc16 == 1:
                crc16 = crc16 >> 1
                crc16 = crc16 ^ poly
            else:
                crc16 = crc16 >> 1
    result = bytes([crc16 & 0xFF, (crc16 >> 8) & 0xFF])
    return result


def with_CRC16(raw: bytes) -> bytes:
    crc = CRC16(raw)
    return raw + crc


def verify_CRC16(raw_: bytes) -> bool:
    """
    读取并校验。校验取后2位
    :param raw_: 原始数据,带crc
    :return: 是否ok
    """
    raw = CRC16(raw_[:-2])
    crc = raw[-2:]
    return raw == crc


def AG95_Init(address: int) -> bool:
    lock.acquire()
    if not address:
        address = 1
    # read init status first.
    while True:
        modbus_rtu_over_tcp.send(bytes([address, 0x03, 0x02, 0x00, 0x00, 0x01, 0x85, 0xB2]))
        status = modbus_rtu_over_tcp.recv(10)
        if not verify_CRC16(status):
            continue  # 重发
        if int(status[4]) == 1:  # 初始化完成
            lock.release()
            return True  # 已经完成初始化
        else:
            break
    raw = with_CRC16(bytes([address, 0x06, 0x01, 0x00, 0x00, 0xA5, 0x48, 0x4D]))
    modbus_rtu_over_tcp.send(raw)
    # read modbus status
    timeout = time.time() + 5
    while timeout > time.time():
        # read status
        modbus_rtu_over_tcp.send(bytes([address, 0x03, 0x02, 0x00, 0x00, 0x01, 0x85, 0xB2]))
        status = modbus_rtu_over_tcp.recv(10)
        if not verify_CRC16(status):  # CRC校验不通过
            continue  # 重发
        # 对得到的数值进行解析
        if int(status[4]) == 1:  # 初始化完成
            lock.release()
            return True
        rospy.sleep(0.25)
    lock.release()
    return False  # 超时


def AG95_Force(address: int, force: int):
    """
    设置AG95的力度
    :param address: 地址
    :param force: 整数,取值范围0-100
    :return:
    """
    if not address:
        address = 1
    if not 0 <= force <= 100:
        raise ValueError("force invalid.")
    raw = bytes([address, 0x06, 0x01, 0x01, 0x00, force])
    raw = with_CRC16(raw)
    lock.acquire()
    modbus_rtu_over_tcp.send(raw)
    # 读取AG95的当前力度,确认设置完成。

    lock.release()


def AG95_GetPos(address: int) -> int:
    """
    读取AG95夹爪当前的坐标
    :param address:
    :return:
    """


def AG95_Pos(address: int, pos: float) -> bool:
    """
    设置夹爪到对应位置,完成后返回。
    :param address: 地址
    :param pos: 小数,仅允许精确到小数点后一位。
    :return: bool, 是否运动成功
    """
    if not address:
        address = 1
    raw = bytes([address, 0x06, 0x01, 0x03])
    # 计算pos, 100-> 100.0 -> 1000;
    pos = round(pos, 1)  # 四舍五入
    pos = int(pos * 10)
    if not 0 <= pos <= 1000:
        raise ValueError("pos value invalid. should in 0 to 1000")
    raw += pos.to_bytes(length=2, byteorder="big")
    raw = with_CRC16(raw)
    lock.acquire()
    modbus_rtu_over_tcp.send(raw)
    # 读取AG95当前坐标
    timeout = time.time() + 5
    while timeout <= time.time():
        rospy.sleep(0.05)
        modbus_rtu_over_tcp.send(bytes([0x01, 0x03, 0x02, 0x02, 0x00, 0x01, 0x24, 0x72]))
        # 发送完成后等待接收
        status = modbus_rtu_over_tcp.recv(7)
        if not verify_CRC16(status):  # 校验CRC
            continue
        current_pos = int.from_bytes(status[3: 5], byteorder="big")
        if math.isclose(current_pos, pos):
            rospy.sleep(0.4)
            lock.release()
            return True
    lock.release()
    return False


def AG95_getStatus(address: int) -> int:
    """
    读取AG95夹爪当前的状态
    :param address:
    :return:
    返回值含义:
        0 -> 运动中
        1 -> 到达设置位置
        2 -> 夹住了物品
        3 -> 物品掉落
        -1 -> 错误
    """
    if not address:
        address = 1
    raw = bytes([address, 0x03, 0x02, 0x01, 0x00, 0x01, 0xD4, 0x72])  # 固定的,就不CRC校验了
    lock.acquire()
    modbus_rtu_over_tcp.send(raw)
    response = modbus_rtu_over_tcp.recv(7)  # 读取返回例子: 01 03 02 00 01 79 84
    lock.release()
    return int.from_bytes(response[3: 5], byteorder="big")


def handle_init(req: dh_ag95_init):
    res = AG95_Init(req.address)
    return dh_ag95_initResponse(int(res))


def handle_set_pos(req: dh_ag95_setpos):
    res = AG95_Pos(req.address, float(req.new_pos))
    return dh_ag95_setposResponse(int(res))


def handle_set_force(req: dh_ag95_setforce):
    res = AG95_Force(req.address, req.force)
    return dh_ag95_setforceResponse(int(1))


def handle_get_status(req: dh_ag95_get_status):
    return dh_ag95_get_statusResponse(AG95_getStatus(req.address))


if __name__ == "__main__":
    robot_ip = rospy.get_param("robot_ip", "192.168.5.1")
    robot_port = int(rospy.get_param("robot_port", 60000))
    modbus_rtu_over_tcp.connect((robot_ip, robot_port))
    modbus_rtu_over_tcp_status = True
    rospy.init_node('dh_ag95')
    rospy.Service('dh_ag95/init', dh_ag95_init, handle_init)
    rospy.Service('dh_ag95/set_pos', dh_ag95_setpos, handle_set_pos)
    rospy.Service('dh_ag95/set_force', dh_ag95_setforce, handle_set_force)
    rospy.Service('dh_ag95/get_status', dh_ag95_get_status, handle_get_status)
    print("waiting...")
    rospy.spin()
    modbus_rtu_over_tcp.close()

添加新评论