Skip to content

Commit

Permalink
Added some interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
anla-xu committed Jul 21, 2022
1 parent b9ea148 commit 0ba5648
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 66 deletions.
112 changes: 74 additions & 38 deletions pymycobot/Interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def _mesg(self, genre, *args, **kwargs):
command_data = self._encode_int16(command_data)

LEN = len(command_data) + 1
check_digit = sum(command_data[1:]) + genre
if check_digit >= 256:
check_digit %= 256
check_digit = (sum(command_data[1:]) + genre) & 0xff
# if check_digit >= 256:
# check_digit %= 256
command = [
ProtocolCode.HEADER,
ProtocolCode.HEADER,
Expand Down Expand Up @@ -843,33 +843,6 @@ def get_joint_current(self, id, joint_id):
"""
return self._mesg(ProtocolCode.GET_JOINT_CURRENT, id, joint_id)

# Basic
def set_basic_mode(self, pin_no, pin_mode):
"""Set base IO, input or output mode
Args:
pin_no: 1 - 5
pin_mode: 0 - input 1 - output
"""
return self._mesg(ProtocolCode.SET_BASIC_OUTPUT, 0, pin_no, pin_mode)

def set_basic_output(self, pin_no, pin_signal):
"""Set basic output.
Args:
pin_no: pin port number (0 - 20).
pin_signal: 0 / 1 (0 - low, 1 - high)
"""
return self._mesg(ProtocolCode.GET_BASIC_INPUT, 0, pin_no, pin_signal)

def get_basic_input(self, pin_no):
"""Get basic input for M5 version.
Args:
pin_no: (int) pin port number (0 - 20).
"""
return self._mesg(ProtocolCode.GET_BASE_INPUT, 0, pin_no, has_reply=True)

def get_plan_speed(self, id = 0):
"""Get planning speed
Expand Down Expand Up @@ -953,8 +926,8 @@ def get_servo_temps(self, id):
"""
return self._mesg(0xE6, id, has_reply=True)

def get_base_coords(self, coords, arm):
"""Convert coordinates to base coordinates
def get_base_coords(self, *args: int):
"""Convert coordinates to base coordinates. Pass in parameters or no parameters
Args:
coords: a list of coords value(List[float]), length 6 [x(mm), y, z, rx(angle), ry, rz]
Expand All @@ -963,12 +936,16 @@ def get_base_coords(self, coords, arm):
Return:
Base coords
"""
coord_list = []
for idx in range(3):
coord_list.append(self._coord2int(coords[idx]))
for angle in coords[3:]:
coord_list.append(self._angle2int(angle))
return self._mesg(ProtocolCode.GET_BASE_COORDS, 0, coord_list, arm, has_reply = True)
if len(args) == 2:
coords, arm = args
coord_list = []
for idx in range(3):
coord_list.append(self._coord2int(coords[idx]))
for angle in coords[3:]:
coord_list.append(self._angle2int(angle))
return self._mesg(ProtocolCode.GET_BASE_COORDS, 0, coord_list, arm, has_reply = True)
elif len(args) == 0:
return self._mesg(ProtocolCode.GET_ALL_BASE_COORDS, 0, has_reply = True)

def base_to_single_coords(self, base_coords, arm):
"""Convert base coordinates to coordinates
Expand Down Expand Up @@ -1002,6 +979,65 @@ def collision(self, left_angles, right_angles):

return self._mesg(ProtocolCode.COLLISION, 0, degrees1, degrees2, has_reply = True)

def get_base_coord(self, id):
"""Get the base coordinates of the single arm
Args:
id: 1/2 (L/R)
"""
return self._mesg(ProtocolCode.GET_BASE_COORD, id, has_reply = True)

def write_base_coord(self, id, axis, coord, speed):
"""Base single coordinate movement
Args:
id: 1/2 (L/R)
axis: 1 - 6 (x/y/z/rx/ry/rz)
coord: Coordinate value
speed: 1 - 100
"""
value = self._coord2int(coord) if axis <= 3 else self._angle2int(coord)
return self._mesg(ProtocolCode.WRITE_BASE_COORD, id, axis, [value], speed)

def write_base_coords(self, id, coords, speed):
"""base coordinate move
Args:
id: 1/2 (L/R)
coords: coords: a list of coords value(List[float]), length 6, [x(mm), y, z, rx(angle), ry, rz]
speed: 1 - 100
"""
coord_list = []
for idx in range(3):
coord_list.append(self._coord2int(coords[idx]))
for angle in coords[3:]:
coord_list.append(self._angle2int(angle))
return self._mesg(ProtocolCode.WRITE_BASE_COORDS, id, coord_list, speed)

def jog_inc_coord(self, axis, increment, speed):
"""Double-arm coordinated coordinate stepping
Args:
axis: 1 - 6 (x/y/z/rx/ry/rz)
increment:
speed: 1 - 100
"""
value = self._coord2int(increment) if axis <= 3 else self._angle2int(increment)
return self._mesg(ProtocolCode.JOG_INC_COORD, 0, [value], speed)

def collision_switch(self, state):
"""Collision Detection Switch
Args:
state (int): 0 - close 1 - open (Off by default)
"""
return self._mesg(ProtocolCode.COLLISION_SWITCH, state)

def is_collision_on(self):
"""Get collision detection status"""
return self._mesg(ProtocolCode.IS_COLLISION_ON, 0, has_reply = True)


# def init_iic(self):
# from smbus2 import SMBus
# i2c = SMBus(1) # 1 代表 /dev/i2c-1
2 changes: 1 addition & 1 deletion pymycobot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"MyPalletizerLite"
]

__version__ = "2.9.0b3"
__version__ = "2.9.0b6"
__author__ = "Elephantrobotics"
__email__ = "[email protected]"
__git_url__ = "https://github.com/elephantrobotics/pymycobot"
Expand Down
9 changes: 5 additions & 4 deletions pymycobot/bluet.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
# coding: utf-8
import bluetooth
import sys

class BluetoothConnection:
def __init__(self, bd_address=None, port=None):
self.device = []
import bluetooth
self.bluetooth = bluetooth
self.target_name = "mybuddy"
self.nearby_devices = None
self.bd_address = bd_address
self.port = port

def find_target_device(self):
available_addr = []
self.nearby_devices = bluetooth.discover_devices(lookup_names=True, duration=5)
self.nearby_devices = self.bluetooth.discover_devices(lookup_names=True, duration=5)
if self.nearby_devices:
for addr, name in self.nearby_devices:
if self.target_name == name:
Expand All @@ -22,7 +23,7 @@ def find_target_device(self):

def connect_target_device(self):
if self.bd_address:
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock = self.bluetooth.BluetoothSocket(self.bluetooth.RFCOMM)
sock.connect((self.bd_address, self.port))
return sock
target_address = self.find_target_device()
Expand All @@ -36,7 +37,7 @@ def connect_target_device(self):
sys.stdout.write(device_info)
choose_device = input("please enter 1-{}:".format(len(target_address)))
target_address = target_address[int(choose_device)-1][0]
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock = self.bluetooth.BluetoothSocket(self.bluetooth.RFCOMM)
try:
sock.connect((target_address[0][0], 1))
return sock
Expand Down
11 changes: 9 additions & 2 deletions pymycobot/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ class ProtocolCode(object):
GET_BASE_COORDS = 0xF0
BASE_TO_SINGLE_COORDS = 0xF1
COLLISION = 0xF2
GET_BASE_COORD = 0xF3
GET_ALL_BASE_COORDS = 0xF4
WRITE_BASE_COORD = 0xF5
WRITE_BASE_COORDS = 0xF6
JOG_INC_COORD = 0xF7
COLLISION_SWITCH = 0xF8
IS_COLLISION_ON = 0xF9

# IIC
# SET_IIC_STATE = 0xA4
Expand Down Expand Up @@ -299,7 +306,7 @@ def write(self, command, method=None):
if method == "socket":
data = b""
if self.rasp:
self.sock.sendall(str(command).encode())
self.sock.send(str(command).encode())
else:
self.sock.sendall(bytes(command))

Expand All @@ -315,7 +322,7 @@ def write(self, command, method=None):
break
else:
try:
self.sock.settimeout(0.2)
self.sock.settimeout(1)
data = self.sock.recv(1024)
except:
data = b''
Expand Down
45 changes: 38 additions & 7 deletions pymycobot/mybuddy.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ def _mesg(self, genre, *args, **kwargs):
elif genre in [ProtocolCode.GET_ANGLES]:
return [self._int2angle(angle) for angle in res]
elif genre in [ProtocolCode.GET_ANGLE]:
return self._process_single(self._int2angle(angle) for angle in res)
return self._int2angle(res[0]) if res else None
elif genre in [ProtocolCode.GET_COORD]:
if real_command[5] < 4:
return self._int2coord(res[0]) if res else None
else:
return self._int2angle(res[0]) if res else None
elif genre in [ProtocolCode.GET_COORDS, ProtocolCode.GET_TOOL_REFERENCE, ProtocolCode.GET_WORLD_REFERENCE, ProtocolCode.GET_BASE_COORDS, ProtocolCode.BASE_TO_SINGLE_COORDS]:
if res:
r = []
Expand Down Expand Up @@ -203,22 +208,48 @@ def send_radians(self, id, radians, speed):
return self._mesg(ProtocolCode.SEND_ANGLES, id, degrees, speed)

# Basic for raspberry pi.
def gpio_init(self):
"""Init GPIO module, and set BCM mode."""
def set_gpio_mode(self, pin_no, mode):
"""Init GPIO module, and set BCM mode.
Args:
pin_no: (int)pin number.
mode: 0 - input 1 - output
"""
import RPi.GPIO as GPIO # type: ignore

GPIO.setmode(GPIO.BCM)
self.gpio = GPIO
self.gpio.setmode(GPIO.BCM)
if mode == 1:
self.gpio.setup(pin_no, self.gpio.OUT)
else:
self.gpio.setup(pin_no, self.gpio.IN)


def gpio_output(self, pin, v):
def set_gpio_output(self, pin, v):
"""Set GPIO output value.
Args:
pin: (int)pin number.
v: (int) 0 / 1
"""
self.gpio.setup(pin, self.gpio.OUT)
self.gpio.output(pin, v)

def set_gpio_input(self, pin):
"""Set GPIO input value.
Args:
pin: (int)pin number.
"""
self.gpio.input(pin)

def set_gpio_pwm(self, pin, baud, dc):
"""Set GPIO PWM value.
Args:
pin: (int)pin number.
baud: (int) 10 - 1000000
dc: (int) 0 - 100
"""
self.gpio.PWM(pin, baud, dc)

# Other
def wait(self, t):
Expand Down
53 changes: 39 additions & 14 deletions pymycobot/mybuddybluetooth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# coding: utf-8
import bluetooth

from pymycobot.Interface import MyBuddyCommandGenerator
from pymycobot.common import ProtocolCode, write, read
Expand All @@ -9,17 +8,32 @@

class MyBuddyBlueTooth(MyBuddyCommandGenerator):
"""MyBuddy bluetooth API"""
_write = write
def __init__(self, bt_address=None, port = 10):
"""There is a default Bluetooth search time of 5 seconds"""
"""When bt_address is the default value of None, enter the Bluetooth search mode. There is a default Bluetooth search time of 5 seconds"""
super(MyBuddyBlueTooth).__init__()
self.ble = BluetoothConnection(bt_address, port)
self.ble_obj = self.ble.connect_target_device()
self.sock = self.ble.connect_target_device()

def __read(self):
while True:
data = self.ble_obj.recv(1024)
if data:
return data
def connect(self, serialport="/dev/ttyAMA0", baudrate="1000000", timeout='0.2'):
"""Connect the robot arm through the serial port and baud rate
Args:
serialport: (str) default /dev/ttyAMA0
baudrate: default 1000000
timeout: default 0.1
"""
self.rasp = True
self._write(serialport, "socket")
self._write(baudrate, "socket")
self._write(timeout, "socket")
# self._write([254, 254, 1, 2, 32, 32], "socket")
# self._write([254, 254, 1, 2, 32, 32], "socket")
# self._write([254, 254, 1, 2, 32, 32], "socket")

# self._write(timeout, "socket")
# self._write(timeout, "socket")


def _mesg(self, genre, *args, **kwargs):
Expand All @@ -36,13 +50,14 @@ def _mesg(self, genre, *args, **kwargs):
"""
real_command, has_reply = super(
MyBuddyBlueTooth, self)._mesg(genre, *args, **kwargs)
self.ble_obj.write(bytes(self._flatten(real_command)))
data = self._write(self._flatten(real_command), "socket")

if has_reply:
data = self.__read()
if data:
res = self._process_received(data, genre, arm=12)
if genre in [
ProtocolCode.ROBOT_VERSION,
ProtocolCode.SOFTWARE_VERSION,
ProtocolCode.GET_ROBOT_ID,
ProtocolCode.IS_POWER_ON,
ProtocolCode.IS_CONTROLLER_CONNECTED,
ProtocolCode.IS_PAUSED, # TODO have bug: return b''
Expand All @@ -67,20 +82,30 @@ def _mesg(self, genre, *args, **kwargs):
return self._process_single(res)
elif genre in [ProtocolCode.GET_ANGLES]:
return [self._int2angle(angle) for angle in res]
elif genre in [ProtocolCode.GET_COORDS, ProtocolCode.GET_TOOL_REFERENCE, ProtocolCode.GET_WORLD_REFERENCE]:
elif genre in [ProtocolCode.GET_ANGLE]:
return self._process_single(self._int2angle(angle) for angle in res)
elif genre in [ProtocolCode.GET_COORD]:
if real_command[5] < 4:
return self._int2coord(res[0])
else:
return self._int2angle(res[0])
elif genre in [ProtocolCode.GET_COORDS, ProtocolCode.GET_TOOL_REFERENCE, ProtocolCode.GET_WORLD_REFERENCE, ProtocolCode.GET_BASE_COORDS, ProtocolCode.BASE_TO_SINGLE_COORDS]:
if res:
r = []
for idx in range(3):
r.append(self._int2coord(res[idx]))
for idx in range(3, 6):
r.append(self._int2angle(res[idx]))
return r
else:
else:
return res
elif genre in [ProtocolCode.GET_SERVO_VOLTAGES]:
elif genre in [ProtocolCode.GET_SERVO_VOLTAGES, ProtocolCode.COLLISION]:
return [self._int2coord(angle) for angle in res]
else:
return res
return None

def close(self):
self._write("close","socket")
self.sock.close()

0 comments on commit 0ba5648

Please sign in to comment.