Skip to content

Commit e1fa895

Browse files
Implement a pipelining API
This is so that we can send multiple commands to a board while keeping to only one request-response cycle. Co-Authored-By: Jake Howard <[email protected]>
1 parent 062f944 commit e1fa895

File tree

1 file changed

+129
-37
lines changed

1 file changed

+129
-37
lines changed

sbot/serial_wrapper.py

+129-37
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77
from __future__ import annotations
88

9+
import itertools
910
import logging
1011
import sys
1112
import threading
@@ -122,47 +123,102 @@ def stop(self) -> None:
122123
"""
123124
self._disconnect()
124125

126+
def _connect_if_needed(self) -> None:
127+
if not self.serial.is_open:
128+
if not self._connect():
129+
# If the serial port cannot be opened raise an error,
130+
# this will be caught by the retry decorator
131+
raise BoardDisconnectionError((
132+
f'Connection to board {self.identity.board_type}:'
133+
f'{self.identity.asset_tag} could not be established',
134+
))
135+
125136
@retry(times=3, exceptions=(BoardDisconnectionError, UnicodeDecodeError))
126-
def query(self, data: str) -> str:
137+
def query_multi(self, commands: list[str]) -> list[str]:
127138
"""
128139
Send a command to the board and return the response.
129140
130-
This method will automatically reconnect to the board and retry the command
141+
This method will automatically reconnect to the board and retry the commands
131142
up to 3 times on serial errors.
132143
133-
:param data: The data to write to the board.
144+
:param commands: The commands to write to the board.
134145
:raises BoardDisconnectionError: If the serial connection fails during the transaction,
135146
including failing to respond to the command.
136-
:return: The response from the board with the trailing newline removed.
147+
:return: The responses from the board with the trailing newlines removed.
137148
"""
149+
# Verify no command has a newline in it, and build a command `bytes` from the
150+
# list of commands
151+
encoded_commands: list[bytes] = []
152+
invalid_commands: list[tuple[str, str]] = []
153+
154+
for command in commands:
155+
if '\n' in command:
156+
invalid_commands.append(("contains newline", command))
157+
else:
158+
try:
159+
byte_form = command.encode(encoding='utf-8')
160+
except UnicodeEncodeError as e:
161+
invalid_commands.append((str(e), command))
162+
else:
163+
encoded_commands.append(byte_form)
164+
encoded_commands.append(b'\n')
165+
166+
if invalid_commands:
167+
invalid_commands.sort()
168+
169+
invalid_command_groups = dict(itertools.groupby(
170+
invalid_commands,
171+
key=lambda x: x[0],
172+
))
173+
174+
error_message = "\n".join(
175+
["Invalid commands:"] +
176+
[
177+
f" {reason}: " + ", ".join(
178+
repr(command)
179+
for _, command in grouped_commands
180+
)
181+
for reason, grouped_commands in invalid_command_groups.items()
182+
],
183+
)
184+
raise ValueError(error_message)
185+
186+
full_commands = b''.join(encoded_commands)
187+
138188
with self._lock:
139-
if not self.serial.is_open:
140-
if not self._connect():
141-
# If the serial port cannot be opened raise an error,
142-
# this will be caught by the retry decorator
143-
raise BoardDisconnectionError((
144-
f'Connection to board {self.identity.board_type}:'
145-
f'{self.identity.asset_tag} could not be established',
146-
))
189+
# If the serial port is not open, try to connect
190+
self._connect_if_needed() # TODO: Write me
147191

192+
# Contain all the serial IO in a try-catch; on error, disconnect and raise an error
148193
try:
149-
logger.log(TRACE, f'Serial write - {data!r}')
150-
cmd = data + '\n'
151-
self.serial.write(cmd.encode())
152-
153-
response = self.serial.readline()
154-
try:
155-
response_str = response.decode().rstrip('\n')
156-
except UnicodeDecodeError as e:
157-
logger.warning(
158-
f"Board {self.identity.board_type}:{self.identity.asset_tag} "
159-
f"returned invalid characters: {response!r}")
160-
raise e
161-
logger.log(
162-
TRACE, f'Serial read - {response_str!r}')
163-
164-
if b'\n' not in response:
165-
# If readline times out no error is raised, it returns an incomplete string
194+
# Send the commands to the board
195+
self.serial.write(full_commands)
196+
197+
# Log the commands
198+
for command in commands:
199+
logger.log(TRACE, f"Serial write - {command!r}")
200+
201+
# Read as many lines as there are commands
202+
responses_binary = [
203+
self.serial.readline()
204+
for _ in range(len(commands))
205+
]
206+
207+
# Log the responses. For backwards compatibility reasons, we decode
208+
# these separately here before any error processing, so that the
209+
# logs are correct even if an error occurs.
210+
for response_binary in responses_binary:
211+
response_decoded = response_binary.decode(
212+
"utf-8",
213+
errors="replace",
214+
).rstrip('\n')
215+
logger.log(TRACE, f"Serial read - {response_decoded!r}")
216+
217+
# Check all responses have a trailing newline (an incomplete
218+
# response will not).
219+
# This is within the lock and try-catch to ensure the serial port
220+
# is closed on error.
221+
if not all(response.endswith(b'\n') for response in responses_binary):
166222
logger.warning((
167223
f'Connection to board {self.identity.board_type}:'
168224
f'{self.identity.asset_tag} timed out waiting for response'
@@ -176,15 +232,51 @@ def query(self, data: str) -> str:
176232
'disconnected during transaction'
177233
))
178234

179-
if response_str.startswith('NACK'):
180-
_, error_msg = response_str.split(':', maxsplit=1)
181-
logger.error((
182-
f'Board {self.identity.board_type}:{self.identity.asset_tag} '
183-
f'returned NACK on write command: {error_msg}'
184-
))
185-
raise RuntimeError(error_msg)
235+
# Decode all the responses as UTF-8
236+
try:
237+
responses_decoded = [
238+
response.decode("utf-8").rstrip('\n')
239+
for response in responses_binary
240+
]
241+
except UnicodeDecodeError as e:
242+
logger.warning(
243+
f"Board {self.identity.board_type}:{self.identity.asset_tag} "
244+
f"returned invalid characters: {responses_binary!r}")
245+
raise e
246+
247+
# Collect any NACK responses; if any, raise an error
248+
nack_prefix = 'NACK:'
249+
nack_responses = [
250+
response
251+
for response in responses_decoded
252+
if response.startswith(nack_prefix)
253+
]
254+
255+
if nack_responses:
256+
errors = [response[len(nack_prefix):] for response in nack_responses]
257+
# We can't use exception groups due to needing to support Python 3.8
258+
raise (
259+
RuntimeError(errors[0])
260+
if len(errors) == 1
261+
else RuntimeError("Multiple errors: " + ", ".join(errors))
262+
)
263+
264+
# Return the list of responses
265+
return responses_decoded
266+
267+
def query(self, data: str) -> str:
268+
"""
269+
Send a command to the board and return the response.
186270
187-
return response_str
271+
This method will automatically reconnect to the board and retry the command
272+
up to 3 times on serial errors.
273+
274+
:param data: The data to write to the board.
275+
:raises BoardDisconnectionError: If the serial connection fails during the transaction,
276+
including failing to respond to the command.
277+
:return: The response from the board with the trailing newline removed.
278+
"""
279+
return self.query_multi([data])[0]
188280

189281
def write(self, data: str) -> None:
190282
"""

0 commit comments

Comments
 (0)