DOBOT + 大寰AG95 + python上位机控制 (ROS)
0x01 开发需求
需求很简单,通过ROS对连接在CR机械臂法兰盘上的大寰AG-95进行控制。
如果是直接在机械臂上写项目,那么直接在写机械臂lua程序(该lua程序直接运行在机械臂控制柜上)过程中调用官方开发完成的包就好了。但是当我们选择使用ROS对机械臂进行控制的时候,这个包就不顶用了
ROS -(TCP控制指令)-> CR机械臂
0x02 DOBOT CR机械臂末端485透传说明
TCP连接到端口60000,随后发送数据给到该端口,数据会通过末端485转发到AG-95,反过来AG-95的485反馈也会回馈到TCP客户端。
0x03 Modbus-RTU
查阅大寰夹爪的文档,发现实际上大寰的控制协议就是Modbus-RTU协议。这里对Modbus-RTU协议做简要介绍:
| 01 | 06 | 01 00 | 00 A5 | 48 4D
|地址|指令码|指令|数据|CRC16低|CRC16高
而手爪使用Modbus-RTU协议和手爪的上位机(机械臂)通信。
结合TCP-485透传,机械臂的上位机可以通过Modbus-RTU Over TCP和机械臂ip:60000号端口进行通讯,使用ModbusPull工具可以进行测试。具体测试可以参考“初始化手爪”栏目。后续即可通过普通的TCP客户端通过60000号端口控制末端夹爪,便于在ROS等环境下进行开发。
注:Modbus-TCP和Modbus-RTU Over TCP的区别在于:前者无校验位(因为TCP/IP已经实现了CRC校验),后者仍然需要发送CRC校验位。
当然,从实际开发角度来说,你也可以使用pymodbus这种库而非自行实现modbus-rtu协议。
0x04 功能实现
初始化手爪
可以裸发送 “01 06 01 00 00 A5 48 4D ” 实现。
或者:将256(HEX:01 00)地址设置为165(A5),手爪完成初始化。
初始化状态读取地址:0x0200(DEC:512),结果含义:0 -> 未初始化, 1 -> 初始化完成, 2 -> 初始化中
读取可以发送 "01 03 02 00 00 01 85 B2"(查询ID为1,地址0x0200,一位),随后判断返回值即可。
返回结果例子:
01 03 02 00 01 79 84 # 初始化完成
01 03 02 00 02 39 85 # 初始化中
设置手爪位置
位置从0到100(允许一位小数),先把该值乘以10,得到设置值
随后将该设置值写入线圈0x103(DEC:259)即可。
01 06 01 03 00 00 78 36 # 设置为0
01 06 01 03 03 E8 78 88 # 设置为100
如果是直接发,修改 01 03 后的两个HEX值即可(03 E8 = 1000),随后修改最后俩CRC16校验码。
读取就反过来,读取该线圈即可得到夹爪当前位置。
设置手爪力度
力度从0到100,Int,不需要和位置一样将值乘以10
随后将该数值写入线圈0x101(DEC:257)。
01 06 01 01 00 64 D8 1D
读取就反过来,读取该线圈即可得到夹爪当前力度。
读取夹爪状态
状态线圈0x201(DEC:513),读取值意义如下:
0 -> 运动中
1 -> 到达设置位置
2 -> 夹住了物品
3 -> 物品掉落
-1 -> 错误
也可以发送原始值获取状态。
-> 01 03 02 01 00 01 D4 72
接收结果:
01 03 02 00 01 79 84
|-----| -> 这里的00 01就是结果
0x05 示例代码
这个是带ros的版本,如果不需要ros做控制直接删减ros部分也是一样的用。
import math
import threading
import time
from dh_ag95.srv import dh_ag95_init, dh_ag95_initResponse, dh_ag95_setpos, dh_ag95_setposResponse
from dh_ag95.srv import dh_ag95_setforce, dh_ag95_setforceResponse
from dh_ag95.srv import dh_ag95_get_status, dh_ag95_get_statusResponse
import rospy
import socket
modbus_rtu_over_tcp = socket.socket()
modbus_rtu_over_tcp_status = False
lock = threading.Lock()
def CRC16(datas: bytes) -> bytes:
"""
转换为高低CRC16,方便MODBUS发送
:param datas:
:return: 返回高低八位, 按照modbus格式排序
"""
crc16 = 0xFFFF
poly = 0xA001
for data in datas:
crc16 = data ^ crc16
for i in range(8):
if 1 & crc16 == 1:
crc16 = crc16 >> 1
crc16 = crc16 ^ poly
else:
crc16 = crc16 >> 1
result = bytes([crc16 & 0xFF, (crc16 >> 8) & 0xFF])
return result
def with_CRC16(raw: bytes) -> bytes:
crc = CRC16(raw)
return raw + crc
def verify_CRC16(raw_: bytes) -> bool:
"""
读取并校验。校验取后2位
:param raw_: 原始数据,带crc
:return: 是否ok
"""
raw = CRC16(raw_[:-2])
crc = raw[-2:]
return raw == crc
def AG95_Init(address: int) -> bool:
lock.acquire()
if not address:
address = 1
# read init status first.
while True:
modbus_rtu_over_tcp.send(bytes([address, 0x03, 0x02, 0x00, 0x00, 0x01, 0x85, 0xB2]))
status = modbus_rtu_over_tcp.recv(10)
if not verify_CRC16(status):
continue # 重发
if int(status[4]) == 1: # 初始化完成
lock.release()
return True # 已经完成初始化
else:
break
raw = with_CRC16(bytes([address, 0x06, 0x01, 0x00, 0x00, 0xA5, 0x48, 0x4D]))
modbus_rtu_over_tcp.send(raw)
# read modbus status
timeout = time.time() + 5
while timeout > time.time():
# read status
modbus_rtu_over_tcp.send(bytes([address, 0x03, 0x02, 0x00, 0x00, 0x01, 0x85, 0xB2]))
status = modbus_rtu_over_tcp.recv(10)
if not verify_CRC16(status): # CRC校验不通过
continue # 重发
# 对得到的数值进行解析
if int(status[4]) == 1: # 初始化完成
lock.release()
return True
rospy.sleep(0.25)
lock.release()
return False # 超时
def AG95_Force(address: int, force: int):
"""
设置AG95的力度
:param address: 地址
:param force: 整数,取值范围0-100
:return:
"""
if not address:
address = 1
if not 0 <= force <= 100:
raise ValueError("force invalid.")
raw = bytes([address, 0x06, 0x01, 0x01, 0x00, force])
raw = with_CRC16(raw)
lock.acquire()
modbus_rtu_over_tcp.send(raw)
# 读取AG95的当前力度,确认设置完成。
lock.release()
def AG95_GetPos(address: int) -> int:
"""
读取AG95夹爪当前的坐标
:param address:
:return:
"""
def AG95_Pos(address: int, pos: float) -> bool:
"""
设置夹爪到对应位置,完成后返回。
:param address: 地址
:param pos: 小数,仅允许精确到小数点后一位。
:return: bool, 是否运动成功
"""
if not address:
address = 1
raw = bytes([address, 0x06, 0x01, 0x03])
# 计算pos, 100-> 100.0 -> 1000;
pos = round(pos, 1) # 四舍五入
pos = int(pos * 10)
if not 0 <= pos <= 1000:
raise ValueError("pos value invalid. should in 0 to 1000")
raw += pos.to_bytes(length=2, byteorder="big")
raw = with_CRC16(raw)
lock.acquire()
modbus_rtu_over_tcp.send(raw)
# 读取AG95当前坐标
timeout = time.time() + 5
while timeout <= time.time():
rospy.sleep(0.05)
modbus_rtu_over_tcp.send(bytes([0x01, 0x03, 0x02, 0x02, 0x00, 0x01, 0x24, 0x72]))
# 发送完成后等待接收
status = modbus_rtu_over_tcp.recv(7)
if not verify_CRC16(status): # 校验CRC
continue
current_pos = int.from_bytes(status[3: 5], byteorder="big")
if math.isclose(current_pos, pos):
rospy.sleep(0.4)
lock.release()
return True
lock.release()
return False
def AG95_getStatus(address: int) -> int:
"""
读取AG95夹爪当前的状态
:param address:
:return:
返回值含义:
0 -> 运动中
1 -> 到达设置位置
2 -> 夹住了物品
3 -> 物品掉落
-1 -> 错误
"""
if not address:
address = 1
raw = bytes([address, 0x03, 0x02, 0x01, 0x00, 0x01, 0xD4, 0x72]) # 固定的,就不CRC校验了
lock.acquire()
modbus_rtu_over_tcp.send(raw)
response = modbus_rtu_over_tcp.recv(7) # 读取返回例子: 01 03 02 00 01 79 84
lock.release()
return int.from_bytes(response[3: 5], byteorder="big")
def handle_init(req: dh_ag95_init):
res = AG95_Init(req.address)
return dh_ag95_initResponse(int(res))
def handle_set_pos(req: dh_ag95_setpos):
res = AG95_Pos(req.address, float(req.new_pos))
return dh_ag95_setposResponse(int(res))
def handle_set_force(req: dh_ag95_setforce):
res = AG95_Force(req.address, req.force)
return dh_ag95_setforceResponse(int(1))
def handle_get_status(req: dh_ag95_get_status):
return dh_ag95_get_statusResponse(AG95_getStatus(req.address))
if __name__ == "__main__":
robot_ip = rospy.get_param("robot_ip", "192.168.5.1")
robot_port = int(rospy.get_param("robot_port", 60000))
modbus_rtu_over_tcp.connect((robot_ip, robot_port))
modbus_rtu_over_tcp_status = True
rospy.init_node('dh_ag95')
rospy.Service('dh_ag95/init', dh_ag95_init, handle_init)
rospy.Service('dh_ag95/set_pos', dh_ag95_setpos, handle_set_pos)
rospy.Service('dh_ag95/set_force', dh_ag95_setforce, handle_set_force)
rospy.Service('dh_ag95/get_status', dh_ag95_get_status, handle_get_status)
print("waiting...")
rospy.spin()
modbus_rtu_over_tcp.close()