The Communications System: Part 1
Prelude
The Communications system is arguably one of the most important parts of our simulator. After all, it’s kinda hard to debug a program when you can’t ask it why something isn’t working.
The user will interact with our simulated motion controller via a single serial port, which we’ll model as a simple device that sends and receives bytes. Serial ports are a fairly old technology and have several drawbacks compared to the Ethernet and TCP protocols that most programmers are familiar with:
- There are no “packets” (i.e. bring your own frames)
- There’s no guarantee the other side has received a message (i.e. bring your own ACKs), or even that there’s anyone on the other end!
- If you receive data, there’s no guarantee it wasn’t garbled during transmission (i.e. bring your own error detection and correction)
All of this combines to make serial communication unreliable. Reliable protocols can be built on top of unreliable ones; we just need to be smarter about it.
For simplicity, we’ll design the communications system using request-response pairs. This means:
- For every message sent to the simulator, there will be a corresponding response message
- This implies that no response means the request wasn’t received and should be resent (or ignored if non-critical)
- Responses will always be sent in the order their requests arrived in
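Because responses always arrive in the same order as their requests, the sending side only needs a FIFO queue to pair them up. Here is a minimal sketch, assuming a hypothetical `PendingRequests` type on the frontend (the original code has nothing like it):

```rust
use std::collections::VecDeque;

/// Hypothetical sender-side bookkeeping: requests awaiting a response,
/// oldest first.
#[derive(Debug, Default)]
pub struct PendingRequests {
    queue: VecDeque<u8>, // packet IDs of outstanding requests
}

impl PendingRequests {
    /// Record that a request with this packet ID was sent.
    pub fn sent(&mut self, request_id: u8) {
        self.queue.push_back(request_id);
    }

    /// Pair an incoming response with the oldest outstanding request.
    ///
    /// Responses come back in request order, so the oldest request is the
    /// one being answered. Returns `None` if nothing was outstanding.
    pub fn response_received(&mut self) -> Option<u8> {
        self.queue.pop_front()
    }

    /// Number of requests still waiting (candidates for resending).
    pub fn outstanding(&self) -> usize {
        self.queue.len()
    }
}
```

This pairing assumes no request goes missing. If a request is lost, every later response would be paired with the wrong request, which is one reason the "no response means resend" rule matters.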
Building Reliability
We’ll add reliability to the error-prone stream of bytes coming from the serial connection using a protocol called the Advanced Navigation Packet Protocol (ANPP). This is a handy little protocol published by Advanced Navigation under the MIT license, with an open-source Rust port.
Each message sent using ANPP will be laid out as:
| Offset | Size (bytes) | Description |
|---|---|---|
| 0x0 | 1 | Packet ID |
| 0x1 | 1 | Length |
| 0x2 | 2 | CRC-16 checksum |
| 0x4 | 1 | Header check-byte (XOR of bytes `0..4`) |
| 0x5 | < 256 | Body |
This lets us receive bytes one-by-one over the serial port and periodically scan through them looking for a valid header (a sequence of 5 bytes that equal 0 when XOR’d together). From there we can identify the message body (the next `Length` bytes) and detect transmission errors using the CRC-16 checksum.
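The scan described above can be sketched as follows. It follows the header layout exactly as the table gives it: ID, length, a 2-byte CRC, then an XOR check-byte. The function names are hypothetical. The CRC-16 variant (CCITT, polynomial 0x1021, initial value 0xFFFF) and the little-endian CRC byte order are both assumptions. The real ANPP specification and the `anpp` crate may order and check the header bytes differently.

```rust
/// CRC-16/CCITT-FALSE (poly 0x1021, init 0xFFFF). The variant is an assumption.
pub fn crc16(data: &[u8]) -> u16 {
    let mut crc: u16 = 0xFFFF;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 { (crc << 1) ^ 0x1021 } else { crc << 1 };
        }
    }
    crc
}

/// Build a header laid out as in the table: [id, len, crc_lo, crc_hi, check].
pub fn make_header(id: u8, body: &[u8]) -> [u8; 5] {
    assert!(body.len() < 256, "body must fit in the 1-byte Length field");
    let crc = crc16(body).to_le_bytes(); // byte order is an assumption
    let mut header = [id, body.len() as u8, crc[0], crc[1], 0];
    // The check-byte is chosen so that all five header bytes XOR to zero.
    header[4] = header[..4].iter().fold(0u8, |acc, &b| acc ^ b);
    header
}

/// Does `bytes` start with five bytes that XOR to zero?
fn is_valid_header(bytes: &[u8]) -> bool {
    bytes.len() >= 5 && bytes[..5].iter().fold(0u8, |acc, &b| acc ^ b) == 0
}

/// Scan for the first complete frame whose body matches its CRC.
/// Returns (offset of the header, packet ID, body).
pub fn find_frame(buffer: &[u8]) -> Option<(usize, u8, &[u8])> {
    for start in 0..buffer.len() {
        let candidate = &buffer[start..];
        if !is_valid_header(candidate) {
            continue; // not a header here, slide forward one byte
        }
        let len = candidate[1] as usize;
        if candidate.len() < 5 + len {
            continue; // body incomplete; keep looking in case this was a false match
        }
        let body = &candidate[5..5 + len];
        let expected = u16::from_le_bytes([candidate[2], candidate[3]]);
        if crc16(body) == expected {
            return Some((start, candidate[0], body));
        }
    }
    None
}
```

Sliding forward one byte at a time is what lets the receiver resynchronise after garbage or a dropped byte. The CRC then filters out the occasional run of bytes that happens to look like a header.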
ANPP gives us a nice way of detecting when a message has been received successfully, but we also need a higher-level mechanism for detecting transmission failures and correcting them.
The easiest way to do this is called Automatic Repeat reQuest (ARQ): the receiver tells the sender to resend because an error was detected, and/or the sender automatically resends the previous message if it hasn’t been answered after X seconds.
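The timeout half of ARQ comes down to remembering when a request was sent and resending it if it goes unanswered for too long. This sketch assumes a hypothetical `RetryTimer`. It takes the current time as a parameter, so it works with any clock, and it gives up after a fixed number of attempts:

```rust
use std::time::Duration;

/// What the sender should do with an outstanding request.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum RetryAction {
    /// Keep waiting for a response.
    Wait,
    /// The timeout elapsed; send the request again.
    Resend,
    /// Out of attempts; report the failure (or ignore it if non-critical).
    GiveUp,
}

/// Hypothetical timeout-and-resend bookkeeping for a single request.
#[derive(Debug, Clone)]
pub struct RetryTimer {
    timeout: Duration,
    max_attempts: u32,
    attempts: u32,
    last_sent: Duration,
}

impl RetryTimer {
    /// Start tracking a request that was first sent at `now`.
    pub fn new(now: Duration, timeout: Duration, max_attempts: u32) -> Self {
        RetryTimer { timeout, max_attempts, attempts: 1, last_sent: now }
    }

    /// Decide what to do at time `now`. Returning `Resend` restarts the timer.
    pub fn poll(&mut self, now: Duration) -> RetryAction {
        if now.saturating_sub(self.last_sent) < self.timeout {
            RetryAction::Wait
        } else if self.attempts >= self.max_attempts {
            RetryAction::GiveUp
        } else {
            self.attempts += 1;
            self.last_sent = now;
            RetryAction::Resend
        }
    }
}
```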
Sending Data to the Communications System
On a normal microcontroller, data can arrive at any time. The typical way to handle this is either to frequently poll the pins wired up to our serial port, or to configure the microcontroller to automatically invoke a callback (an interrupt) whenever a byte is received.
In this case the interrupt approach seems quite natural due to JavaScript’s callback-based nature.
All bytes the simulator receives will need to be stored in a buffer until the next tick.
We can model the way data is passed to the `Communications` system by giving the new `comms` crate an `Rx` trait:
```rust
// comms/src/lib.rs

/// The receiving end of a *Serial Connection*.
pub trait Rx {
    /// Get all bytes received by the simulator since the last tick.
    ///
    /// # Note to Implementors
    ///
    /// To prevent reading data twice, this buffer should be cleared after every
    /// tick.
    fn receive(&self) -> &[u8];
}
```
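Because `Rx` is just a trait, tests don’t need a real serial port. Below is a minimal in-memory implementation. `FakeSerial` and its `queue()`/`end_tick()` methods are hypothetical helpers, and the trait is repeated so the snippet stands alone:

```rust
/// The receiving end of a *Serial Connection* (repeated from above).
pub trait Rx {
    fn receive(&self) -> &[u8];
}

/// Hypothetical in-memory serial port for tests.
#[derive(Debug, Default)]
pub struct FakeSerial {
    received: Vec<u8>,
}

impl FakeSerial {
    /// Pretend some bytes arrived over the wire.
    pub fn queue(&mut self, data: &[u8]) {
        self.received.extend_from_slice(data);
    }

    /// Clear the buffer at the end of a tick so nothing is read twice.
    pub fn end_tick(&mut self) {
        self.received.clear();
    }
}

impl Rx for FakeSerial {
    fn receive(&self) -> &[u8] {
        &self.received
    }
}
```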
We’ll also give the WASM code a way to write data to a buffer owned by our `App`.
```rust
// sim/src/app.rs
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
impl App {
    pub fn on_data_received(&mut self, data: &[u8]) {
        self.inputs.on_data_received(data);
    }
}

// sim/src/inputs.rs
use arrayvec::ArrayVec;
use std::cell::Cell;
use std::time::Duration;

#[derive(Debug, Clone, Default)]
pub struct Inputs {
    clock: PerformanceClock,
    last_tick: Cell<Duration>,
    rx_buffer: ArrayVec<[u8; 256]>, // <-- new field!
}

impl Inputs {
    ...

    pub(crate) fn on_data_received(&mut self, data: &[u8]) {
        // writes up to `capacity` bytes to the buffer. Extra items are
        // silently dropped on the floor.
        self.rx_buffer.extend(data.iter().copied());
    }
}
```
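Whether `ArrayVec::extend()` truncates or panics on overflow has varied between `arrayvec` releases, so you may want to make the "drop the extras" policy explicit. Here is a std-only sketch of a fixed-capacity buffer with that truncating behaviour. `RxBuffer` and its methods are hypothetical, and it reports how many bytes were dropped so the caller could track overflows:

```rust
/// Hypothetical fixed-capacity receive buffer that silently drops overflow.
#[derive(Debug, Clone)]
pub struct RxBuffer<const N: usize> {
    data: [u8; N],
    len: usize,
}

impl<const N: usize> RxBuffer<N> {
    pub fn new() -> Self {
        RxBuffer { data: [0; N], len: 0 }
    }

    /// Copy in as many bytes as fit, returning how many were dropped.
    pub fn extend_truncating(&mut self, incoming: &[u8]) -> usize {
        let space = N - self.len;
        let taken = incoming.len().min(space);
        self.data[self.len..self.len + taken].copy_from_slice(&incoming[..taken]);
        self.len += taken;
        incoming.len() - taken
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }

    /// Called once per tick so the same data is never read twice.
    pub fn clear(&mut self) {
        self.len = 0;
    }
}
```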
In most microcontrollers an Interrupt Service Routine (ISR) is a function that takes no arguments and returns nothing (`fn()`), meaning the only way to send data from the ISR to the main application is via `static` memory.
This is more of an implementation detail than anything else. For our purposes, using a method on `App` makes things simpler and easier to test, so we’ll do that. At the end of the day, thanks to the `Rx` trait our `Communications` system doesn’t really care where bytes come from, just that we can give it a buffer of recently received data.
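For comparison, here is roughly what the `static` approach looks like. This is a hosted-Rust sketch. `DATA_REGISTER` is a hypothetical stand-in for the UART’s data register, and real `no_std` firmware would use its HAL’s critical-section primitives rather than `std::sync::Mutex`:

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the hardware register holding the received byte.
pub static DATA_REGISTER: AtomicU8 = AtomicU8::new(0);

/// Bytes received since the main loop last looked.
pub static RX_BUFFER: Mutex<Vec<u8>> = Mutex::new(Vec::new());

/// ISR-shaped function: no arguments and no return value, so the only way to
/// hand data to the rest of the program is through `static` memory.
pub fn uart_rx_isr() {
    let byte = DATA_REGISTER.load(Ordering::Relaxed);
    RX_BUFFER.lock().unwrap().push(byte);
}

/// The main loop takes everything received so far, leaving the buffer empty.
pub fn take_received() -> Vec<u8> {
    std::mem::take(&mut *RX_BUFFER.lock().unwrap())
}
```

Global state like this is exactly what makes the ISR version awkward to unit test, which is the trade-off the `App` method avoids.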
Decoding Received Data
Now that we’ve got a way to send data between the frontend and the backend, let’s start coding the `Communications` system, which is in charge of decoding packets.
```rust
// comms/src/lib.rs
use aimc_hal::System;
use anpp::Decoder;

#[derive(Debug, Default, Clone, PartialEq)]
pub struct Communications {
    decoder: Decoder,
}

impl<I: Rx, O> System<I, O> for Communications {
    fn poll(&mut self, inputs: &I, outputs: &mut O) {
        unimplemented!();
    }
}
```
The `poll()` method for `Communications` is really simple. You copy data from `inputs.receive()` into `self.decoder` (using `Decoder::push_data()`), then keep calling `Decoder::decode()` to read packets until it returns a `DecodeError::RequiresMoreData`.
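The push-then-drain shape of that loop doesn’t depend on ANPP itself. Here is a self-contained sketch with a deliberately simplistic stand-in decoder. `ToyDecoder` frames each packet as a length byte followed by that many bytes, with no CRC, and is not the `anpp` API:

```rust
/// Stand-in for `DecodeError`, with only the variant this loop needs.
#[derive(Debug, PartialEq)]
pub enum DecodeError {
    RequiresMoreData,
}

/// Hypothetical decoder: each packet is `[len, body...]`.
#[derive(Debug, Default)]
pub struct ToyDecoder {
    buffer: Vec<u8>,
}

impl ToyDecoder {
    pub fn push_data(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    pub fn decode(&mut self) -> Result<Vec<u8>, DecodeError> {
        let len = *self.buffer.first().ok_or(DecodeError::RequiresMoreData)? as usize;
        if self.buffer.len() < 1 + len {
            return Err(DecodeError::RequiresMoreData);
        }
        let body = self.buffer[1..1 + len].to_vec();
        self.buffer.drain(..1 + len); // consume the packet we just decoded
        Ok(body)
    }
}

/// One tick: feed in new bytes, then decode until more data is needed.
/// A partial packet stays in the decoder until the next tick.
pub fn poll(decoder: &mut ToyDecoder, received: &[u8]) -> Vec<Vec<u8>> {
    decoder.push_data(received);
    let mut packets = Vec::new();
    loop {
        match decoder.decode() {
            Ok(pkt) => packets.push(pkt),
            Err(DecodeError::RequiresMoreData) => break,
        }
    }
    packets
}
```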
```rust
// comms/src/lib.rs
impl<I: Rx, O> System<I, O> for Communications {
    fn poll(&mut self, inputs: &I, _outputs: &mut O) {
        // A: how do we want to handle overflows?
        let _ = self.decoder.push_data(inputs.receive());

        loop {
            match self.decoder.decode() {
                Ok(pkt) => unimplemented!("B: What do we do now?"),
                Err(DecodeError::InvalidCRC) => {
                    unimplemented!("C: How do we handle corrupted packets?")
                },
                Err(DecodeError::RequiresMoreData) => break,
            }
        }
    }
}
```
This looks fairly straightforward, but it’s raised three questions:
- A: how do we want to handle decoder buffer overflows? If we’re receiving more data than we can process and can’t increase the buffer size (buffers have a size defined at compile-time) then we need to drop data. The question then becomes whether to drop data already in the buffer, or drop data we haven’t had a chance to look at yet?
- B: We’ve got a valid packet… now what?
- C: Invalid CRCs indicate that a message was garbled in transit. Should we just ignore the error, or do we want to keep track of how many CRC errors we’ve had and report it to the frontend at some point?
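The two options in A are easy to compare on a small bounded buffer. Here is a sketch, assuming a hypothetical `Bounded` wrapper around a `VecDeque`. Drop-newest keeps what’s already buffered and discards incoming bytes, while drop-oldest evicts from the front to make room:

```rust
use std::collections::VecDeque;

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum OverflowPolicy {
    /// Keep buffered bytes; discard incoming ones that don't fit.
    DropNewest,
    /// Evict the oldest buffered bytes to make room for new ones.
    DropOldest,
}

/// Hypothetical fixed-capacity byte buffer with a configurable overflow policy.
#[derive(Debug)]
pub struct Bounded {
    bytes: VecDeque<u8>,
    capacity: usize,
    policy: OverflowPolicy,
}

impl Bounded {
    pub fn new(capacity: usize, policy: OverflowPolicy) -> Self {
        Bounded { bytes: VecDeque::with_capacity(capacity), capacity, policy }
    }

    /// Push `data`, returning how many bytes (old or new) were dropped.
    pub fn push(&mut self, data: &[u8]) -> usize {
        let mut dropped = 0;
        for &byte in data {
            if self.bytes.len() < self.capacity {
                self.bytes.push_back(byte);
            } else if self.policy == OverflowPolicy::DropOldest && self.capacity > 0 {
                self.bytes.pop_front(); // evict the oldest byte
                self.bytes.push_back(byte);
                dropped += 1;
            } else {
                dropped += 1; // discard the incoming byte
            }
        }
        dropped
    }

    pub fn contents(&self) -> Vec<u8> {
        self.bytes.iter().copied().collect()
    }
}
```

Either policy can leave a partial frame behind, which is why scanning for the next valid header matters. The fix below is closest to drop-oldest, except that it discards the whole backlog at once.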
For now, let’s handle A by clearing the `Decoder` buffer. This lets us get rid of garbled data left over from previous `poll()`s and start with a clean slate.
```rust
// comms/src/lib.rs
impl<I: Rx, O> System<I, O> for Communications {
    fn poll(&mut self, inputs: &I, _outputs: &mut O) {
        let received = inputs.receive();

        if self.decoder.push_data(received).is_err() {
            // we've run out of space in the decoder buffer, clear out leftovers
            // from previous runs and copy in as much new data as possible
            self.decoder.clear();
            let len = core::cmp::min(
                received.len(),
                self.decoder.remaining_capacity(),
            );
            let _ = self.decoder.push_data(&received[..len]);
        }

        ...
    }
}
```
Either way, this situation isn’t ideal. We don’t want to drop data at all, so ideally the frontend wouldn’t send more data than the `Communications` system can handle.
This gives us an effective limit of `anpp::Decoder::DEFAULT_DECODER_BUFFER_SIZE` (512 bytes) per `poll()` of the `Communications` system. Since we’ll be polling the simulator from `requestAnimationFrame()`, and `requestAnimationFrame()` only fires when the browser redraws (about 60Hz, or every 16ms), the entire application is limited to a maximum transfer rate of 512 × 60 = 30,720 bytes per second (30 KiB/s).
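Here is the same back-of-the-envelope calculation as code, which makes it easy to try a different buffer size or frame rate. Both values are plain parameters here, not read from the `anpp` crate:

```rust
/// Upper bound on bytes per second when at most `bytes_per_poll` bytes are
/// consumed per poll and the simulator is polled `polls_per_second` times.
pub const fn max_throughput(bytes_per_poll: u32, polls_per_second: u32) -> u32 {
    bytes_per_poll * polls_per_second
}

/// Milliseconds between polls at a given rate (integer division rounds down).
pub const fn poll_interval_ms(polls_per_second: u32) -> u32 {
    1000 / polls_per_second
}
```

Browsers generally fire `requestAnimationFrame()` at the display’s refresh rate. On a 144Hz monitor the same 512-byte limit therefore works out to `max_throughput(512, 144)`, or 73,728 bytes per second.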
B is easy enough to solve. The `Communications` system is only concerned with receiving and transmitting messages, so it should let the rest of the application decide how a message should be handled and what to reply with.
```rust
// comms/src/lib.rs
pub trait MessageHandler {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError>;
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum CommsError {
    /// The [`MessageHandler`] doesn't know how to handle the message.
    UnknownMessageType,
}
```
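To make the trait concrete, here is a sketch of a handler that answers one kind of request. `Packet` is a minimal stand-in (the real one comes from `anpp`), and the `PING`/`PONG` IDs are invented for illustration:

```rust
/// Minimal stand-in for `anpp::Packet`.
#[derive(Debug, Clone, PartialEq)]
pub struct Packet {
    pub id: u8,
    pub contents: Vec<u8>,
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum CommsError {
    UnknownMessageType,
}

pub trait MessageHandler {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError>;
}

/// Hypothetical packet IDs.
const PING: u8 = 1;
const PONG: u8 = 2;

/// Replies to pings by echoing their payload; anything else is unknown.
#[derive(Debug, Default)]
pub struct PingHandler {
    pings_seen: u32,
}

impl MessageHandler for PingHandler {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError> {
        match msg.id {
            PING => {
                self.pings_seen += 1;
                Ok(Packet { id: PONG, contents: msg.contents.clone() })
            }
            _ => Err(CommsError::UnknownMessageType),
        }
    }
}
```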
Now we can handle the message and send back a response.
```rust
// comms/src/lib.rs
impl<I, T, M> System<I, Outputs<T, M>> for Communications
where
    I: Rx,
    T: Tx,
    M: MessageHandler,
{
    fn poll(&mut self, inputs: &I, outputs: &mut Outputs<T, M>) {
        ...

        loop {
            match self.decoder.decode() {
                Ok(request) => {
                    let response = outputs
                        .message_handler
                        .handle_message(&request)
                        .expect("Unhandled message");
                    // `Outputs::send()` takes the packet by reference
                    outputs.send(&response);
                },
                ...
            }
        }
    }
}

/// The receiving end of a *Serial Connection*.
pub trait Rx {
    /// Get all bytes received by the simulator since the last tick.
    ///
    /// # Note to Implementors
    ///
    /// To prevent reading data twice, this buffer should be cleared after every
    /// tick.
    fn receive(&self) -> &[u8];
}

/// The transmitting end of a *Serial Connection*.
pub trait Tx {
    /// Queue some data to be sent to the frontend.
    ///
    /// There is no guarantee that the data will all be sent. This may happen if
    /// the receiver isn't listening or they aren't able to receive at this
    /// time.
    fn send(&mut self, data: &[u8]);
}

pub struct Outputs<T, M> {
    message_handler: M,
    tx: T,
}

impl<T: Tx, M> Outputs<T, M> {
    fn send(&mut self, packet: &Packet) {
        // room for the packet plus its 5-byte header
        let mut buffer = [0; Packet::MAX_PACKET_SIZE + 5];
        debug_assert!(buffer.len() >= packet.total_length());

        let bytes_written = packet
            .write_to_buffer(&mut buffer)
            .expect("our buffer should always be big enough");
        self.tx.send(&buffer[..bytes_written]);
    }
}
```
Later on we may want to deal with a `CommsError::UnknownMessageType` by sending back some sort of “Not Acknowledged” message, but for now we’ll panic.
To represent the transmitting side of a serial port we’ll introduce a `Tx` trait.
We could have merged `handle_message()` and `send()` into a single trait, but that wouldn’t make logical sense. The `Tx` trait is implemented by some bit of hardware that connects to the outside world, while a `MessageHandler` is used to communicate with the rest of the application.
To make things more ergonomic, `Tx` and `MessageHandler` are implemented for mutable references. That lets a caller pass in a mutable reference to an existing value, e.g. `Outputs::new(&mut some_tx, &mut some_handler)`.
```rust
// comms/src/lib.rs
impl<'a, T: Tx> Tx for &'a mut T {
    fn send(&mut self, data: &[u8]) { (*self).send(data); }
}

impl<'a, M: MessageHandler> MessageHandler for &'a mut M {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError> {
        (*self).handle_message(msg)
    }
}
```
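These blanket impls pay off in tests. You can lend a recording `Tx` to code that takes a `T: Tx` by value, then inspect what was sent afterwards. The sketch below repeats the trait and blanket impl, and adds a hypothetical `RecordingTx` and `send_twice()`:

```rust
pub trait Tx {
    fn send(&mut self, data: &[u8]);
}

/// Forward through the mutable reference to the underlying `Tx`.
impl<'a, T: Tx> Tx for &'a mut T {
    fn send(&mut self, data: &[u8]) {
        (**self).send(data);
    }
}

/// Hypothetical `Tx` that remembers everything sent through it.
#[derive(Debug, Default)]
pub struct RecordingTx {
    pub sent: Vec<u8>,
}

impl Tx for RecordingTx {
    fn send(&mut self, data: &[u8]) {
        self.sent.extend_from_slice(data);
    }
}

/// Takes ownership of its `Tx`, the way `Outputs` stores one.
pub fn send_twice<T: Tx>(mut tx: T, data: &[u8]) {
    tx.send(data);
    tx.send(data);
}
```

Passing `&mut recorder` instead of `recorder` means the caller still owns the recorder once `send_twice()` returns.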
To handle C (CRC errors), we’ll give the `MessageHandler` a method that’ll be called whenever a CRC error occurs. That way the component in charge of routing messages can note down how many errors have occurred within a single run.
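```rust
// comms/src/lib.rs
impl<I, T, M> System<I, Outputs<T, M>> for Communications
where ...
{
    fn poll(&mut self, inputs: &I, outputs: &mut Outputs<T, M>) {
        ...
        loop {
            match self.decoder.decode() {
                ...
                Err(DecodeError::InvalidCRC) => {
                    outputs.message_handler.on_crc_error()
                },
                ...
            }
        }
    }
}

pub trait MessageHandler {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError>;

    /// Callback used to notify the application whenever a CRC error occurs.
    fn on_crc_error(&mut self) {}
}

// The blanket impl for `&mut M` must forward the new method too, otherwise a
// handler passed by reference would silently use the empty default.
impl<'a, M: MessageHandler> MessageHandler for &'a mut M {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError> {
        (*self).handle_message(msg)
    }

    fn on_crc_error(&mut self) { (*self).on_crc_error() }
}
```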
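Here is a sketch of a handler that counts CRC errors for a single run. `Packet` is again a minimal stand-in, and `CrcCounter` and `report_crc_errors()` are hypothetical. The `&mut M` impl forwards `on_crc_error()` as well, so the count isn’t lost when the handler is lent by reference:

```rust
/// Minimal stand-in for `anpp::Packet`.
#[derive(Debug, Clone, PartialEq)]
pub struct Packet {
    pub id: u8,
    pub contents: Vec<u8>,
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum CommsError {
    UnknownMessageType,
}

pub trait MessageHandler {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError>;
    /// Callback used to notify the application whenever a CRC error occurs.
    fn on_crc_error(&mut self) {}
}

impl<'a, M: MessageHandler> MessageHandler for &'a mut M {
    fn handle_message(&mut self, msg: &Packet) -> Result<Packet, CommsError> {
        (**self).handle_message(msg)
    }
    fn on_crc_error(&mut self) {
        (**self).on_crc_error()
    }
}

/// Hypothetical handler that only tracks CRC errors.
#[derive(Debug, Default)]
pub struct CrcCounter {
    pub crc_errors: u32,
}

impl MessageHandler for CrcCounter {
    fn handle_message(&mut self, _msg: &Packet) -> Result<Packet, CommsError> {
        Err(CommsError::UnknownMessageType)
    }

    fn on_crc_error(&mut self) {
        self.crc_errors += 1;
    }
}

/// Simulates a poll that hit `n` corrupted packets.
pub fn report_crc_errors<M: MessageHandler>(mut handler: M, n: u32) {
    for _ in 0..n {
        handler.on_crc_error();
    }
}
```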
The Next Step
We’ve now set things up so we can receive input from the frontend and decode the data into raw `Packet`s. The next step is to start defining the various messages our system will use and wire up a `MessageHandler`.