HEX
Server: Apache/2.4.58 (Ubuntu)
System: Linux ns3133907 6.8.0-86-generic #87-Ubuntu SMP PREEMPT_DYNAMIC Mon Sep 22 18:03:36 UTC 2025 x86_64
User: cssnetorguk (1024)
PHP: 8.2.28
Disabled: NONE
Upload Files
File: //proc/self/root/lib/python3/dist-packages/serial/urlhandler/protocol_cp2110.py
#! python
#
# Backend for Silicon Labs CP2110/4 HID-to-UART devices.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
# (C) 2019 Google LLC
#
# SPDX-License-Identifier:    BSD-3-Clause

# This backend implements support for HID-to-UART devices manufactured
# by Silicon Labs and marketed as CP2110 and CP2114. The
# implementation is (mostly) OS-independent and in userland. It relies
# on cython-hidapi (https://github.com/trezor/cython-hidapi).

# The HID-to-UART protocol implemented by CP2110/4 is described in the
# AN434 document from Silicon Labs:
# https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf

# TODO items:

# - rtscts support is configured for hardware flow control, but the
#   signaling is missing (AN434 suggests this is done through GPIO).
# - Cancelling reads and writes is not supported.
# - Baudrate validation is not implemented, as it depends on model and configuration.

import struct
import threading

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

try:
    import Queue
except ImportError:
    import queue as Queue

import hid  # hidapi

import serial
from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout


# Report IDs and related constant
_REPORT_GETSET_UART_ENABLE = 0x41
_DISABLE_UART = 0x00
_ENABLE_UART = 0x01

_REPORT_SET_PURGE_FIFOS = 0x43
_PURGE_TX_FIFO = 0x01
_PURGE_RX_FIFO = 0x02

_REPORT_GETSET_UART_CONFIG = 0x50

_REPORT_SET_TRANSMIT_LINE_BREAK = 0x51
_REPORT_SET_STOP_LINE_BREAK = 0x52


class Serial(SerialBase):
    # This is not quite correct. AN343 specifies that the minimum
    # baudrate is different between CP2110 and CP2114, and it's halved
    # when using non-8-bit symbols.
    BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200,
                 38400, 57600, 115200, 230400, 460800, 500000, 576000,
                 921600, 1000000)

    def __init__(self, *args, **kwargs):
        self._hid_handle = None
        self._read_buffer = None
        self._thread = None
        super(Serial, self).__init__(*args, **kwargs)

    def open(self):
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")

        self._read_buffer = Queue.Queue()

        self._hid_handle = hid.device()
        try:
            portpath = self.from_url(self.portstr)
            self._hid_handle.open_path(portpath)
        except OSError as msg:
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))

        try:
            self._reconfigure_port()
        except:
            try:
                self._hid_handle.close()
            except:
                pass
            self._hid_handle = None
            raise
        else:
            self.is_open = True
            self._thread = threading.Thread(target=self._hid_read_loop)
            self._thread.setDaemon(True)
            self._thread.setName('pySerial CP2110 reader thread for {}'.format(self._port))
            self._thread.start()

    def from_url(self, url):
        parts = urlparse.urlsplit(url)
        if parts.scheme != "cp2110":
            raise SerialException(
                'expected a string in the forms '
                '"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": '
                'not starting with cp2110:// {{!r}}'.format(parts.scheme))
        if parts.netloc:  # cp2100://BUS:DEVICE:ENDPOINT, for libusb
            return parts.netloc.encode('utf-8')
        return parts.path.encode('utf-8')

    def close(self):
        self.is_open = False
        if self._thread:
            self._thread.join(1)  # read timeout is 0.1
            self._thread = None
        self._hid_handle.close()
        self._hid_handle = None

    def _reconfigure_port(self):
        parity_value = None
        if self._parity == serial.PARITY_NONE:
            parity_value = 0x00
        elif self._parity == serial.PARITY_ODD:
            parity_value = 0x01
        elif self._parity == serial.PARITY_EVEN:
            parity_value = 0x02
        elif self._parity == serial.PARITY_MARK:
            parity_value = 0x03
        elif self._parity == serial.PARITY_SPACE:
            parity_value = 0x04
        else:
            raise ValueError('Invalid parity: {!r}'.format(self._parity))

        if self.rtscts:
            flow_control_value = 0x01
        else:
            flow_control_value = 0x00

        data_bits_value = None
        if self._bytesize == 5:
            data_bits_value = 0x00
        elif self._bytesize == 6:
            data_bits_value = 0x01
        elif self._bytesize == 7:
            data_bits_value = 0x02
        elif self._bytesize == 8:
            data_bits_value = 0x03
        else:
            raise ValueError('Invalid char len: {!r}'.format(self._bytesize))

        stop_bits_value = None
        if self._stopbits == serial.STOPBITS_ONE:
            stop_bits_value = 0x00
        elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
            stop_bits_value = 0x01
        elif self._stopbits == serial.STOPBITS_TWO:
            stop_bits_value = 0x01
        else:
            raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits))

        configuration_report = struct.pack(
            '>BLBBBB',
            _REPORT_GETSET_UART_CONFIG,
            self._baudrate,
            parity_value,
            flow_control_value,
            data_bits_value,
            stop_bits_value)

        self._hid_handle.send_feature_report(configuration_report)

        self._hid_handle.send_feature_report(
            bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART)))
        self._update_break_state()

    @property
    def in_waiting(self):
        return self._read_buffer.qsize()

    def reset_input_buffer(self):
        if not self.is_open:
            raise PortNotOpenError()
        self._hid_handle.send_feature_report(
            bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO)))
        # empty read buffer
        while self._read_buffer.qsize():
            self._read_buffer.get(False)

    def reset_output_buffer(self):
        if not self.is_open:
            raise PortNotOpenError()
        self._hid_handle.send_feature_report(
            bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO)))

    def _update_break_state(self):
        if not self._hid_handle:
            raise PortNotOpenError()

        if self._break_state:
            self._hid_handle.send_feature_report(
                bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0)))
        else:
            # Note that while AN434 states "There are no data bytes in
            # the payload other than the Report ID", either hidapi or
            # Linux does not seem to send the report otherwise.
            self._hid_handle.send_feature_report(
                bytes((_REPORT_SET_STOP_LINE_BREAK, 0)))

    def read(self, size=1):
        if not self.is_open:
            raise PortNotOpenError()

        data = bytearray()
        try:
            timeout = Timeout(self._timeout)
            while len(data) < size:
                if self._thread is None:
                    raise SerialException('connection failed (reader thread died)')
                buf = self._read_buffer.get(True, timeout.time_left())
                if buf is None:
                    return bytes(data)
                data += buf
                if timeout.expired():
                    break
        except Queue.Empty:  # -> timeout
            pass
        return bytes(data)

    def write(self, data):
        if not self.is_open:
            raise PortNotOpenError()
        data = to_bytes(data)
        tx_len = len(data)
        while tx_len > 0:
            to_be_sent = min(tx_len, 0x3F)
            report = to_bytes([to_be_sent]) + data[:to_be_sent]
            self._hid_handle.write(report)

            data = data[to_be_sent:]
            tx_len = len(data)

    def _hid_read_loop(self):
        try:
            while self.is_open:
                data = self._hid_handle.read(64, timeout_ms=100)
                if not data:
                    continue
                data_len = data.pop(0)
                assert data_len == len(data)
                self._read_buffer.put(bytearray(data))
        finally:
            self._thread = None