"""This module provides implementations for sending values through a communication port."""
import logging
from typing import Union, Literal
from ..utils.decorators import register
# Public names exported by ``from <module> import *``.
__all__ = ["get_port", "BasePort", "SerialPort", "ParallelPort", "DummyPort"]
# Module-level logger shared by every port class in this file.
logger = logging.getLogger(__name__)
# Registry consulted by ``get_port``: maps a port name to its implementation.
# The port classes pass this dict to ``@register(name, PORTS)``; presumably that
# decorator (from ``..utils.decorators``) inserts the class here -- confirm there.
PORTS = {}
# Type accepted by ``get_port``: one of the names used in this file, or any
# other string.
PortLiteral = Union[Literal["serial"], Literal["parallel"], Literal["dummy"], str]
[docs]
def get_port(name: PortLiteral):
    """Get a port implementation by name (e.g. serial, parallel, dummy).

    Parameters
    ----------
    name : str
        Key under which the implementation is stored in ``PORTS``.

    Returns
    -------
    object
        Whatever is stored in ``PORTS`` under ``name``.

    Raises
    ------
    ValueError
        If ``name`` is not a key of ``PORTS`` (or is mapped to ``None``).
    """
    port = PORTS.get(name)
    if port is None:
        raise ValueError(
            f"Unknown port: {name}. Available port implementations: {list(PORTS)}"
        )
    # Hand the looked-up implementation back; without this the function
    # validated the name but always returned None.
    return port
[docs]
class BasePort:
    """Base class for a port.

    Subclasses are expected to override ``send``, ``reset`` and ``close``;
    the versions defined here raise ``NotImplementedError``. ``encode`` is an
    optional hook that returns its argument unchanged by default.
    """

    def __init__(self, address, log: bool = True):
        """Store the port address and the logging switch.

        Parameters
        ----------
        address
            Identifier of the port; only stored and shown in ``repr``.
        log : bool, optional
            When true, ``_log`` forwards messages to the module logger.
        """
        self.address = address
        self.log = log

    def send(self, value):
        """Send a trigger code through the port."""
        raise NotImplementedError("Subclasses must implement this method.")

    def reset(self):
        """Reset the port (e.g., set trigger to 0)."""
        raise NotImplementedError("Subclasses must implement this method.")

    def close(self):
        """Close the port connection."""
        raise NotImplementedError("Subclasses must implement this method.")

    def encode(self, value):
        """Encode a value to send through the port (identity by default)."""
        return value

    def __repr__(self):
        return f"{self.__class__.__name__}({self.address})"

    def _log(self, message):
        """Log ``message`` at INFO level, prefixed with ``repr(self)``, if enabled."""
        if self.log:
            logger.info(f"{self}: {message}")

    def __del__(self):
        """Close the port on finalization, on a best-effort basis."""
        # A finalizer must never raise: ``close`` is unimplemented on the base
        # class, and a subclass whose ``__init__`` failed part-way may lack the
        # attributes its ``close`` reads. Python would only print an
        # "Exception ignored" message for these, so suppress them here.
        try:
            self.close()
        except Exception:
            pass
[docs]
@register("serial", PORTS)
class SerialPort(BasePort):
    r"""
    Serial port implementation using pyserial.

    This class provides an implementation of a serial communication port using the
    pyserial library. It supports sending data as either integers (converted to a
    single byte) or as bytes, resetting the port to a predefined state, and closing
    the connection.

    Note
    ----
    This class is based on `pyserial` and requires it to be installed. You can install
    it via pip.

    Examples
    --------
    Using a Windows COM port:

    >>> sp = SerialPort(address='COM3', baudrate=9600, timeout=1)
    >>> sp.send(255)  # Sends the byte corresponding to 255.
    >>> sp.send(b'\xff')  # Sends the byte directly.
    >>> sp.reset()  # Resets the port.
    >>> sp.close()  # Closes the connection.

    Using a Unix-like system port:

    >>> sp = SerialPort(address='/dev/ttyUSB0', baudrate=115200)
    >>> sp.send(128)  # Sends the byte corresponding to 128.
    >>> sp.send(b'\x80')  # Sends the byte directly.
    """

    def __init__(
        self,
        address: str,
        baudrate: int = 115200,
        reset_value: bytes = b"\x00",
        log: bool = True,
        **kwargs,
    ):
        r"""
        Initialize the SerialPort instance.

        Parameters
        ----------
        address : str
            The serial port address (e.g., 'COM3' on Windows or '/dev/ttyUSB0' on
            Unix-like systems).
        baudrate : int, optional
            The baud rate for the serial connection. Defaults to 115200.
        reset_value : bytes, optional
            The byte sequence used to reset the port. Defaults to b"\x00".
        log : bool, optional
            Whether sent values and state changes are logged. Defaults to True.
        **kwargs
            Additional keyword arguments to pass to the serial.Serial constructor
            (e.g., timeout).

        Raises
        ------
        ImportError
            If the pyserial library is not installed.

        Examples
        --------
        >>> sp = SerialPort(address='COM3', baudrate=9600, timeout=1)
        >>> sp = SerialPort(address='/dev/ttyUSB0', baudrate=115200)
        """
        # Define the attribute before anything can fail so that close() (also
        # invoked on finalization) always finds it.
        self.connection = None
        try:
            import serial
        except ImportError as err:
            raise ImportError("pyserial is required for SerialPort.") from err
        super().__init__(address, log=log)
        self.reset_value = reset_value
        self.connection = serial.Serial(address, baudrate=baudrate, **kwargs)

    def send(self, value: Union[int, bytes]):
        r"""
        Send a value over the serial port.

        If an integer is provided, it is converted to a single byte.
        If a multi-byte conversion is required, convert the value to bytes before
        calling this method.

        Parameters
        ----------
        value : Union[int, bytes]
            The value to send. If an integer is provided, it will be converted to a
            single byte. For other sizes or multi-byte values, pass the value as bytes.

        Raises
        ------
        RuntimeError
            If the port has been closed.
        ValueError
            If an integer outside 0-255 is provided.

        Examples
        --------
        >>> sp = SerialPort(address='COM3')
        >>> sp.send(255)  # Converts 255 to a single byte and sends it.
        >>> sp.send(b'\xff')  # Sends the byte directly.
        """
        if self.connection is None:
            raise RuntimeError("The port is closed.")
        value = self.encode(value)
        self.connection.write(value)
        self._log(f"Sent value: {value}")

    def encode(self, value: Union[int, bytes]) -> bytes:
        """
        Encode a value as bytes.

        Integers are turned into exactly one raw byte; anything else is returned
        unchanged.

        Raises
        ------
        ValueError
            If an integer does not fit in one byte (0-255).
        """
        if isinstance(value, int):
            if not 0 <= value <= 0xFF:
                raise ValueError(
                    f"Integer values must fit in a single byte (0-255), got {value}."
                )
            # Emit the raw byte, as documented. Encoding the text of
            # ``hex(value)`` would transmit ASCII digits (255 -> b"ff") and
            # silently truncate larger numbers.
            return value.to_bytes(1, "big")
        return value

    def reset(self):
        """
        Reset the serial port by sending the predefined reset value.

        This is typically used to return the port to a known, idle state.

        Raises
        ------
        RuntimeError
            If the port has been closed.
        """
        if self.connection is None:
            raise RuntimeError("The port is closed.")
        self.connection.write(self.reset_value)
        self._log(f"Reset port to {self.reset_value}")

    def close(self):
        """
        Close the serial port connection.

        This method should be called to properly release the serial port when it is
        no longer needed. Calling it again, or on an instance whose constructor
        failed, does nothing.
        """
        # getattr: the attribute may be missing on a partially built instance.
        connection = getattr(self, "connection", None)
        if connection is None:
            return
        connection.close()
        self.connection = None
        self._log("Closed port connection")
[docs]
@register("parallel", PORTS)
class ParallelPort(BasePort):
    r"""
    Parallel port implementation using pyparallel.

    This class provides an implementation for sending triggers via a parallel port
    using the pyparallel library. It supports sending data as either integers or as
    bytes. Integers are written to the port as given; bytes are converted to the
    integer they represent (big-endian) before being written.

    Note
    ----
    This class is based on pyparallel and requires it to be installed. You can
    install it via pip:

        pip install pyparallel

    Examples
    --------
    Using a typical PC parallel port address:

    >>> pp = ParallelPort(address='0x378')
    >>> pp.send(128)  # Writes 128 to the port.
    >>> pp.send(b'\x80')  # Same value, given as a byte.
    >>> pp.reset()  # Resets the port (writes the reset value, 0 by default).

    Using an alternative port address:

    >>> pp2 = ParallelPort(address='0x3BC')
    >>> pp2.send(255)
    """

    def __init__(self, address: str, reset_value: int = 0, log: bool = True, **kwargs):
        """
        Initialize the ParallelPort instance.

        Parameters
        ----------
        address : str
            The address of the parallel port (e.g., '0x378' is a common base address
            on x86 systems). Hexadecimal ('0x...') and decimal strings are converted
            to integers; anything else is passed to pyparallel unchanged.
        reset_value : int, optional
            The value written by ``reset``. Defaults to 0.
        log : bool, optional
            Whether sent values and state changes are logged. Defaults to True.
        **kwargs
            Additional keyword arguments forwarded to ``parallel.Parallel``.

        Raises
        ------
        ImportError
            If the pyparallel library is not installed.
        """
        # Define the attribute before anything can fail so that close() (also
        # invoked on finalization) always finds it.
        self._port = None
        try:
            import parallel
        except ImportError as err:
            raise ImportError(
                "pyparallel is required for ParallelPort. Install it via 'pip install pyparallel'."
            ) from err
        super().__init__(address, log=log)
        self.reset_value = reset_value
        # Convert hexadecimal string addresses to integer, if applicable.
        try:
            if isinstance(address, str) and address.lower().startswith("0x"):
                addr_int = int(address, 16)
            else:
                addr_int = int(address)
        except (ValueError, TypeError):
            addr_int = address  # If conversion fails, use address as provided.
        self._port = parallel.Parallel(addr_int, **kwargs)

    def send(self, value: Union[int, bytes]):
        r"""
        Send a value over the parallel port.

        Parameters
        ----------
        value : Union[int, bytes]
            The value to send. Integers are written as given; bytes are converted
            to an integer first (see ``encode``).

        Raises
        ------
        RuntimeError
            If the port has been closed.

        Examples
        --------
        >>> pp = ParallelPort(address='0x378')
        >>> pp.send(128)  # Writes 128 to the port.
        >>> pp.send(b'\x80')  # Same value, given as a byte.
        """
        if self._port is None:
            raise RuntimeError("The port is closed.")
        data = self.encode(value)
        self._port.setData(data)
        self._log(f"Set value: {value}")

    def encode(self, value: Union[int, bytes]):
        """
        Encode a value as the integer handed to ``setData``.

        Bytes are interpreted as a big-endian unsigned integer; any other value is
        returned unchanged.
        """
        # NOTE(review): pyparallel's ``setData`` is expected to take an integer,
        # so a bytes payload cannot be forwarded as-is -- confirm against the
        # pyparallel documentation.
        if isinstance(value, (bytes, bytearray)):
            return int.from_bytes(value, "big")
        return value

    def reset(self):
        """
        Reset the parallel port by writing ``reset_value`` to it.

        This method returns the port to a known, idle state.

        Raises
        ------
        RuntimeError
            If the port has been closed.
        """
        if self._port is None:
            raise RuntimeError("The port is closed.")
        self._port.setData(self.reset_value)
        self._log(f"Reset port to {self.reset_value}")

    def close(self):
        """
        Close the parallel port connection.

        No close method of the underlying object is called; the reference to it is
        dropped. Calling this again, or on an instance whose constructor failed,
        does nothing.
        """
        # getattr: the attribute may be missing on a partially built instance.
        if getattr(self, "_port", None) is None:
            return
        self._port = None
        self._log("Closed port connection")
[docs]
@register("dummy", PORTS)
class DummyPort(BasePort):
    r"""
    Dummy port implementation for testing and debugging.

    This class simulates a port without requiring actual hardware.
    When logging is enabled it logs all values that are sent, reset actions, and
    close operations.

    Examples
    --------
    >>> dummy = DummyPort(address='dummy')
    >>> dummy.send(128)  # Logs the sending of integer 128.
    >>> dummy.send(b'\x80')  # Logs the sending of the byte b'\x80'.
    >>> dummy.reset()  # Logs the reset action.
    >>> dummy.close()  # Logs the close action.
    """

    def __init__(self, log: bool = True, address=None, **kwargs):
        """
        Initialize the DummyPort instance.

        Parameters
        ----------
        log : bool, optional
            Whether simulated actions are logged. Defaults to True.
        address : optional
            A dummy address identifier; only stored and shown in ``repr``.
            Defaults to None.
        **kwargs
            Additional keyword arguments (ignored), accepted so the dummy can be
            constructed with the same keywords as the hardware-backed ports.
        """
        # ``log`` stays the first positional parameter so existing positional
        # calls keep their meaning.
        super().__init__(address, log=log)
        self._open = True

    def send(self, value: Union[int, bytes]):
        """
        Log the value that would be sent over the port.

        Nothing is transmitted and the value is neither validated nor converted.

        Parameters
        ----------
        value : Union[int, bytes]
            The value to log, formatted as given.
        """
        self._log(f"Sent value: {value} (simulated)")

    def reset(self):
        """
        Log the action of resetting the port.

        This method simulates resetting the port to a known, idle state.
        """
        # Honor the ``log`` switch, as ``send`` does.
        if self.log:
            logger.info("DummyPort: Resetting port (simulated)")

    def close(self):
        """
        Log the action of closing the port.

        This method simulates closing the port connection. Only the first call has
        an effect.
        """
        # getattr: the flag may be missing on a partially built instance.
        if getattr(self, "_open", False):
            self._open = False
            if self.log:
                logger.info("DummyPort: Closing port (simulated)")