Source code for miniarm_core.client

"""
Mini-Arm Client

Serial communication interface for the Mini-Arm robot.
"""

import time
import serial
import numpy as np
from typing import Optional, Union, List, Tuple

__all__ = ['MiniArmClient']


[docs] class MiniArmClient: """A client for the Mini-Arm robot arm that handles serial communication. This class provides methods for connecting to and controlling the Mini-Arm robot via serial communication. It can send commands and receive responses from the Raspberry Pi Pico running the Mini-Arm firmware. Parameters ---------- name : str, optional Name identifier for the client instance. Default is 'MiniArmClient'. port : str, optional Serial port to connect to. Default is 'COM3' (Windows). Use '/dev/ttyACM0' for Linux. baudrate : int, optional Baudrate for the serial connection. Default is 115200. command_delimiter : str, optional Character(s) used to terminate commands. Default is ';'. timeout : float, optional Serial read timeout in seconds. Default is 1.0. verbose : bool, optional Enable/disable verbose output logging. Default is False. Attributes ---------- connected : bool True if serial connection is established and open. s : serial.Serial The underlying serial connection object. Examples -------- Basic usage: >>> from mini_arm import MiniArmClient >>> client = MiniArmClient(port='COM3') >>> client.home() >>> pose = client.get_pose() >>> print(f"Current position: {pose}") >>> client.disconnect() Using context manager: >>> with MiniArmClient(port='COM3') as client: ... client.home() ... client.set_pose(0.135, 0.0, 0.22) """ # Default home position HOME_POSITION = [0.135, 0.0, 0.215] # Workspace limits (meters) WORKSPACE_LIMITS = { 'x': (0.05, 0.25), 'y': (-0.15, 0.15), 'z': (0.10, 0.30), } def __init__( self, name: str = 'MiniArmClient', port: str = 'COM3', baudrate: int = 115200, command_delimiter: str = ';', timeout: float = 1.0, verbose: bool = False ): self.name = name self.port = port self.baudrate = baudrate self.command_delimiter = command_delimiter self.timeout = timeout self.verbose = verbose # Internal state self.s: Optional[serial.Serial] = None self.connected: bool = False self._last_pose: Optional[np.ndarray] = None self._last_joints: Optional[List[float]] = None # Attempt connection self._connect() def _connect(self): """Establish serial connection to Mini-Arm.""" try: self.s = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=self.timeout ) self.connected = self.s.is_open if self.connected: # Clear any startup messages time.sleep(0.5) self.get_buffer() self.logger(f"Connected to {self.port} at {self.baudrate} baud") except serial.SerialException as e: self.s = None self.connected = False self.logger(f"Failed to connect: {e}", warning=True)
[docs] def logger(self, *argv, warning: bool = False): """Log messages with timestamp. Parameters ---------- *argv : str Message components to join and log. warning : bool, optional If True, prefix message with '(Warning)'. Default is False. """ msg = ''.join(str(a) for a in argv) if warning: msg = '⚠️ ' + msg print(f"[{time.monotonic():.3f}][{self.name}] {msg}")
# ========================================================================= # Core Communication Methods # =========================================================================
[docs] def send_message(self, message: str) -> None: """Send a message to the Pico over serial. Automatically adds the command terminator if not present. Parameters ---------- message : str The command message to send. """ if not self.s or not self.s.is_open: self.logger("Serial connection not available", warning=True) return if not message.endswith(self.command_delimiter): message += self.command_delimiter try: self.s.write(message.encode()) if self.verbose: self.logger(f"TX: {message.strip()}") time.sleep(0.01) # 10ms delay to prevent buffer overflow except serial.SerialException as e: self.logger(f"Error sending: {e}", warning=True)
[docs] def send(self, command: str) -> Optional[str]: """Send a command and optionally read the response. Parameters ---------- command : str Command string to send to Mini-Arm. Returns ------- str or None Response from the device, if any. """ self.send_message(command) time.sleep(0.05) return self.get_buffer()
[docs] def get_buffer(self) -> Optional[str]: """Read all available bytes from the serial connection. Returns ------- str or None The buffer contents as a string, or None if no data available. """ if not self.s or not self.s.is_open: return None try: if self.s.in_waiting > 0: msg = "" while self.s.in_waiting > 0: msg += self.s.readline().decode(errors='ignore') time.sleep(0.01) if self.verbose and msg: self.logger(f"RX: {msg.strip()}") return msg.strip() if msg else None except serial.SerialException as e: self.logger(f"Error reading: {e}", warning=True) return None
[docs] def wait_for_response(self, timeout: float = 2.0) -> Optional[str]: """Wait for a response from the device. Parameters ---------- timeout : float Maximum time to wait in seconds. Returns ------- str or None Response string or None if timeout. """ start = time.time() while time.time() - start < timeout: response = self.get_buffer() if response: return response time.sleep(0.05) return None
# ========================================================================= # Motion Commands # =========================================================================
[docs] def home(self) -> None: """Move the robot arm to its home position.""" self.send_message("home") if self.verbose: self.logger("Moving to home position")
[docs] def ik_home(self) -> None: """Move to IK home position (all joints at 0).""" self.send_message("ikhome")
[docs] def set_pose( self, x: float, y: float, z: float, roll: float = 0.0, pitch: float = 0.0, yaw: float = 0.0 ) -> None: """Move the end-effector to a Cartesian position. Parameters ---------- x : float X coordinate in meters. y : float Y coordinate in meters. z : float Z coordinate in meters. roll : float, optional Roll angle in radians. Default is 0.0. pitch : float, optional Pitch angle in radians. Default is 0.0. yaw : float, optional Yaw angle in radians. Default is 0.0. """ pose_str = f"[{x},{y},{z},{roll},{pitch},{yaw}]" self.send_message(f"set_pose:{pose_str}") if self.verbose: self.logger(f"Moving to pose: ({x:.3f}, {y:.3f}, {z:.3f})")
[docs] def set_pose_xyz(self, position: Union[List[float], Tuple[float, float, float]]) -> None: """Move to XYZ position (convenience method). Parameters ---------- position : list or tuple [x, y, z] coordinates in meters. """ if len(position) >= 3: self.set_pose(position[0], position[1], position[2]) else: self.logger("Position must have at least 3 elements (x, y, z)", warning=True)
[docs] def set_delta_pose( self, dx: float = 0.0, dy: float = 0.0, dz: float = 0.0, droll: float = 0.0, dpitch: float = 0.0, dyaw: float = 0.0 ) -> None: """Move the end-effector by a relative delta. Parameters ---------- dx : float Delta X in meters. dy : float Delta Y in meters. dz : float Delta Z in meters. droll : float, optional Delta roll in radians. dpitch : float, optional Delta pitch in radians. dyaw : float, optional Delta yaw in radians. """ delta_str = f"[{dx},{dy},{dz},{droll},{dpitch},{dyaw}]" self.send_message(f"set_delta_pose:{delta_str}") if self.verbose: self.logger(f"Delta move: ({dx:.4f}, {dy:.4f}, {dz:.4f})")
[docs] def get_pose(self) -> Optional[np.ndarray]: """Get the current end-effector pose. Returns ------- np.ndarray or None Array [x, y, z, roll, pitch, yaw] or None if reading fails. """ self.send_message("get_pose") time.sleep(0.1) data = self.get_buffer() if data: try: # Parse response: "cords: [x: 0.135, y: 0.000, z: 0.215]angles: [...]" if 'cords:' in data: coords_part = data.split('cords:')[1].split('angles:')[0] coords_part = coords_part.replace('[', '').replace(']', '').strip() pose = [] for p in coords_part.split(','): if ':' in p: pose.append(float(p.split(':')[1].strip())) if len(pose) >= 3: self._last_pose = np.array(pose[:3]) return self._last_pose except Exception as e: if self.verbose: self.logger(f"Error parsing pose: {e}", warning=True) return self._last_pose
[docs] def get_current_pose(self) -> Optional[np.ndarray]: """Alias for get_pose() for backward compatibility.""" return self.get_pose()
# ========================================================================= # Joint Commands # =========================================================================
[docs] def set_joint(self, joint_index: int, angle: float) -> None: """Move a single joint to a specified angle. Parameters ---------- joint_index : int Joint index (0-5 for arm joints, 6 for gripper). angle : float Target angle in degrees. """ self.send_message(f"movemotor:{joint_index}:{angle}") if self.verbose: self.logger(f"Joint {joint_index} -> {angle}°")
[docs] def set_joints(self, angles: List[float]) -> None: """Set all joint angles. Parameters ---------- angles : list of float Joint angles in degrees [j0, j1, j2, j3, j4, j5] or [j0, j1, j2, j3, j4, j5, gripper]. """ angles_str = ','.join(str(a) for a in angles) self.send_message(f"movemotors:[{angles_str}]") if self.verbose: self.logger(f"Setting joints: {angles}")
[docs] def get_joints(self) -> Optional[List[float]]: """Get current joint angles. Returns ------- list of float or None Joint angles in degrees, or None if reading fails. """ self.send_message("get_joints") time.sleep(0.1) data = self.get_buffer() if data: try: # Parse joint values from response data = data.replace('[', '').replace(']', '') joints = [float(j.strip()) for j in data.split(',') if j.strip()] if joints: self._last_joints = joints return joints except Exception as e: if self.verbose: self.logger(f"Error parsing joints: {e}", warning=True) return self._last_joints
# ========================================================================= # Gripper Commands # =========================================================================
[docs] def set_gripper(self, value: Union[float, str]) -> None: """Control the gripper. Parameters ---------- value : float or str Gripper position (0-180 degrees) or state ('open', 'close'). """ self.send_message(f"set_gripper:{value}") if self.verbose: self.logger(f"Gripper -> {value}")
[docs] def gripper_open(self) -> None: """Open the gripper fully.""" self.set_gripper("open")
[docs] def gripper_close(self) -> None: """Close the gripper fully.""" self.set_gripper("close")
# ========================================================================= # LED Commands # =========================================================================
[docs] def set_led(self, r: int, g: int, b: int) -> None: """Set the RGB LED color. Parameters ---------- r : int Red value (0-255). g : int Green value (0-255). b : int Blue value (0-255). """ self.send_message(f"set_led:[{r},{g},{b}]") if self.verbose: self.logger(f"LED -> RGB({r}, {g}, {b})")
[docs] def led_off(self) -> None: """Turn off the LED.""" self.set_led(0, 0, 0)
[docs] def led_red(self) -> None: """Set LED to red.""" self.set_led(255, 0, 0)
[docs] def led_green(self) -> None: """Set LED to green.""" self.set_led(0, 255, 0)
[docs] def led_blue(self) -> None: """Set LED to blue.""" self.set_led(0, 0, 255)
# ========================================================================= # Trajectory Commands # =========================================================================
[docs] def start_trajectory(self, trajectory: str = "circle", repeat: bool = False) -> None: """Start a predefined trajectory. Parameters ---------- trajectory : str Trajectory name (e.g., 'circle') or list of points. repeat : bool Whether to repeat the trajectory continuously. """ repeat_str = "true" if repeat else "false" self.send_message(f"trajectory:{trajectory}:{repeat_str}") if self.verbose: self.logger(f"Starting trajectory: {trajectory} (repeat={repeat})")
[docs] def stop_trajectory(self) -> None: """Stop any running trajectory.""" self.send_message("stop") if self.verbose: self.logger("Stopping trajectory")
# ========================================================================= # System Commands # =========================================================================
[docs] def get_info(self) -> Optional[str]: """Get robot system information. Returns ------- str or None Robot info string. """ self.send_message("info") time.sleep(0.2) info = self.get_buffer() if info: print(info) return info
[docs] def get_help(self) -> Optional[str]: """Get list of available commands. Returns ------- str or None Help text with available commands. """ self.send_message("help") time.sleep(0.3) help_text = self.get_buffer() if help_text: print(help_text) return help_text
[docs] def set_debug(self, enabled: bool) -> None: """Enable or disable debug mode on the robot. Parameters ---------- enabled : bool True to enable debug output, False to disable. """ mode = "on" if enabled else "off" self.send_message(f"debug:{mode}") if self.verbose: self.logger(f"Debug mode: {mode}")
[docs] def set_debug_mode(self, mode: Union[bool, str]) -> None: """Alias for set_debug() for backward compatibility.""" if isinstance(mode, bool): self.set_debug(mode) else: enabled = mode.lower() in ('on', 'true', '1') self.set_debug(enabled)
[docs] def set_rate(self, hz: int) -> None: """Set the main loop rate on the robot. Parameters ---------- hz : int Loop rate in Hz. """ self.send_message(f"set_rate:{hz}") if self.verbose: self.logger(f"Loop rate set to {hz} Hz")
[docs] def test(self) -> None: """Send a test command to verify communication.""" self.send_message("test")
[docs] def play_music(self, song: str) -> None: """Play a music file on the robot. Parameters ---------- song : str Name of the song to play. """ self.send_message(f"play_music:{song}")
[docs] def read_fsr(self) -> Optional[str]: """Read force-sensitive resistor values. Returns ------- str or None FSR sensor values. """ self.send_message("fsr") time.sleep(0.1) return self.get_buffer()
# ========================================================================= # Xbox Controller Support # =========================================================================
[docs] def send_controller_data(self, controller_data: str) -> None: """Send Xbox controller data for teleoperation. Parameters ---------- controller_data : str Formatted controller state string. """ self.send_message(f"controller:{controller_data}")
# ========================================================================= # Connection Management # =========================================================================
[docs] def send_ctrl_c(self) -> None: """Send Ctrl+C to interrupt current operation.""" if self.s: self.s.write(b'\x03') time.sleep(0.01)
[docs] def send_ctrl_d(self) -> None: """Send Ctrl+D (soft reset).""" if self.s: self.s.write(b'\x04') time.sleep(0.01)
[docs] def reconnect(self) -> bool: """Attempt to reconnect to the robot. Returns ------- bool True if reconnection successful. """ self.disconnect() time.sleep(0.5) self._connect() return self.connected
[docs] def disconnect(self) -> None: """Close the serial connection.""" if self.s: try: self.s.close() except: pass self.connected = False self.logger("Disconnected")
def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.disconnect() return False def __repr__(self) -> str: status = "connected" if self.connected else "disconnected" return f"MiniArmClient(port='{self.port}', {status})"