ZMODEM: input/output split in API; over+out

This change makes the ZMODEM API use separate references for the input
and output communications channels, which is better adapted to the
established pattern in kata-shell.

Consuming up to the "OO" (over-and-out) bytes from the sender is also
included as a small throw-in. Preliminary dev runs have revealed that
the sz utility from lrzsz always sends "OO", preceded by 2 bytes of
unclear purpose, even after getting the ZFIN from the receiver.

Change-Id: Ib25261ad4c9054a3403ed5910aeacd62fbd6b93c
GitOrigin-RevId: 452cdaa41f473c0c3781faaf3bd1958b9803bb3a
This commit is contained in:
Matt Harvey
2021-09-21 21:41:33 -07:00
committed by Sam Leffler
parent 5c3a68f7ac
commit 929966d376
7 changed files with 148 additions and 189 deletions

View File

@@ -81,7 +81,7 @@ impl Read for &[u8] {
}
/// Forwarding implementation of Read for &mut
impl<'a, T> Read for &'a mut T
impl<'a, T: ?Sized> Read for &'a mut T
where
T: Read,
{
@@ -91,7 +91,7 @@ where
}
/// Forwarding implementation of Write for &mut
impl<'a, T> Write for &'a mut T
impl<'a, T: ?Sized> Write for &'a mut T
where
T: Write,
{

View File

@@ -37,3 +37,5 @@ pub const ZCRCQ: u8 = b'j'; /* CRC next, frame continues, ZACK expected */
pub const ZCRCW: u8 = b'k'; /* CRC next, ZACK expected, end of frame */
pub const XON: u8 = 0x11;
pub const OO: &str = "OO";

View File

@@ -165,33 +165,35 @@ where
Ok(buf.pop()) // pop ZCRC* byte
}
pub fn recv_data<RW, OUT>(
pub fn recv_data<CI, CO, DO>(
header: u8,
count: &mut u32,
rw: &mut RW,
out: &mut OUT,
channel_in: &mut CI,
channel_out: &mut CO,
data_out: &mut DO,
) -> io::Result<bool>
where
RW: io::Write + io::Read,
OUT: io::Write,
CI: io::Read,
CO: io::Write,
DO: io::Write,
{
let mut buf = Vec::new();
loop {
buf.clear();
let zcrc = match recv_zlde_frame(header, rw, &mut buf)? {
let zcrc = match recv_zlde_frame(header, channel_in, &mut buf)? {
Some(x) => x,
None => return Ok(false),
};
out.write_all(&buf)?;
data_out.write_all(&buf)?;
*count += buf.len() as u32;
match zcrc {
ZCRCW => {
debug!("ZCRCW: CRC next, ZACK expected, end of frame");
write_zack(rw, *count)?;
write_zack(channel_out, *count)?;
return Ok(true);
}
ZCRCE => {
@@ -200,7 +202,7 @@ where
}
ZCRCQ => {
debug!("ZCRCQ: CRC next, frame continues, ZACK expected");
write_zack(rw, *count)?
write_zack(channel_out, *count)?
}
ZCRCG => {
debug!("CCRCG: CRC next, frame continues nonstop");
@@ -377,7 +379,7 @@ pub fn write_over_and_out<W>(w: &mut W) -> io::Result<()>
where
W: io::Write,
{
w.write_all("OO".as_bytes())
w.write_all(OO.as_bytes())
}
pub fn escape_buf(src: &[u8], dst: &mut Vec<u8>) {

View File

@@ -53,28 +53,31 @@ impl State {
}
/// Receives data by Z-Modem protocol
pub fn recv<RW, W>(rw: RW, mut w: W) -> io::Result<usize>
pub fn recv<CI, CO, DO>(
mut channel_in: CI,
mut channel_out: CO,
mut data_out: DO,
) -> io::Result<usize>
where
RW: io::Read + io::Write,
W: io::Write,
CI: io::Read,
CO: io::Write,
DO: io::Write,
{
let mut rw_log = rw;
let mut count = 0;
let mut state = State::new();
write_zrinit(&mut rw_log)?;
write_zrinit(&mut channel_out)?;
while state != State::Done {
if !find_zpad(&mut rw_log)? {
if !find_zpad(&mut channel_in)? {
continue;
}
let frame = match parse_header(&mut rw_log)? {
let frame = match parse_header(&mut channel_in)? {
Some(x) => x,
None => {
recv_error(&mut rw_log, &state, count)?;
recv_error(&mut channel_out, &state, count)?;
continue;
}
};
@@ -85,15 +88,15 @@ where
// do things according new state
match state {
State::SendingZRINIT => {
write_zrinit(&mut rw_log)?;
write_zrinit(&mut channel_out)?;
}
State::ProcessingZFILE => {
let mut buf = Vec::new();
if recv_zlde_frame(frame.get_header(), &mut rw_log, &mut buf)?.is_none() {
write_znak(&mut rw_log)?;
if recv_zlde_frame(frame.get_header(), &mut channel_in, &mut buf)?.is_none() {
write_znak(&mut channel_out)?;
} else {
write_zrpos(&mut rw_log, count)?;
write_zrpos(&mut channel_out, count)?;
// TODO: process supplied data
if let Ok(s) = from_utf8(&buf) {
@@ -103,9 +106,15 @@ where
}
State::ReceivingData => {
if frame.get_count() != count
|| !recv_data(frame.get_header(), &mut count, &mut rw_log, &mut w)?
|| !recv_data(
frame.get_header(),
&mut count,
&mut channel_in,
&mut channel_out,
&mut data_out,
)?
{
write_zrpos(&mut rw_log, count)?;
write_zrpos(&mut channel_out, count)?;
}
}
State::CheckingData => {
@@ -117,16 +126,20 @@ where
);
// receiver ignores the ZEOF because a new zdata is coming
} else {
write_zrinit(&mut rw_log)?;
write_zrinit(&mut channel_out)?;
}
}
State::Done => {
write_zfin(&mut rw_log)?;
// NB: lexxvir/zmodem had a 10ms sleep here that has been removed
// due to no_std. Here we change it to flush() so that a return
// from this function does really indicate all the bytes have
// been written.
rw_log.flush()?;
write_zfin(&mut channel_out)?;
// lexxvir/zmodem had a 30ms sleep here, maybe for the
// following behavior from the ZMODEM spec: "The receiver
// waits briefly for the "O" characters, then exits whether
// they were received or not."
//
// sz does send these characters, and 2 more bytes before them.
// If we don't consume them here, they will become garbage on
// input after returning.
read_until_match(OO.as_bytes(), &mut channel_in)?;
}
}
}
@@ -145,3 +158,15 @@ where
_ => write_znak(w),
}
}
fn read_until_match<R: io::Read>(pattern: &[u8], mut r: R) -> io::Result<()> {
let mut remainder = pattern;
let mut b = [0u8; 1];
while !remainder.is_empty() {
r.read(&mut b)?;
if b[0] == remainder[0] {
remainder = &remainder[1..];
}
}
Ok(())
}

View File

@@ -1,66 +0,0 @@
use hexdump::*;
use log::LogLevel::*;
use std::io::*;
pub struct ReadWriteLog<RW> {
inner: BufReader<RW>,
}
impl<RW: Read + Write> ReadWriteLog<RW> {
pub fn new(rw: RW) -> ReadWriteLog<RW> {
ReadWriteLog {
inner: BufReader::new(rw),
}
}
}
impl<R: Read> Read for ReadWriteLog<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let r = self.inner.read(buf)?;
if log_enabled!(Debug) {
debug!("In:");
for x in hexdump_iter(&buf[..r]) {
debug!("{}", x);
}
}
Ok(r)
}
}
impl<R: Read> BufRead for ReadWriteLog<R> {
fn fill_buf(&mut self) -> Result<&[u8]> {
let r = self.inner.fill_buf()?;
if log_enabled!(Debug) {
debug!("In:");
for x in hexdump_iter(r) {
debug!("{}", x);
}
}
Ok(r)
}
fn consume(&mut self, amt: usize) {
self.inner.consume(amt)
}
}
impl<RW: Write + Read> Write for ReadWriteLog<RW> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
if log_enabled!(Debug) {
debug!("Out:");
for x in hexdump_iter(buf) {
debug!("{}", x);
}
}
self.inner.get_mut().write(buf)
}
fn flush(&mut self) -> Result<()> {
self.inner.get_mut().flush()
}
}

View File

@@ -62,31 +62,36 @@ impl State {
}
}
pub fn send<RW, R>(rw: RW, r: &mut R, filename: &str, filesize: Option<u32>) -> io::Result<()>
pub fn send<CI, CO, DI>(
mut channel_in: CI,
mut channel_out: CO,
mut r: DI,
filename: &str,
filesize: Option<u32>,
) -> io::Result<()>
where
RW: io::Read + io::Write,
R: io::Read + io::Seek,
CI: io::Read,
CO: io::Write,
DI: io::Read + io::Seek,
{
let mut rw_log = rw;
let mut data = [0; SUBPACKET_SIZE];
let mut offset: u32;
write_zrqinit(&mut rw_log)?;
write_zrqinit(&mut channel_out)?;
let mut state = State::new();
while state != State::Done {
rw_log.flush()?;
channel_out.flush()?;
if !find_zpad(&mut rw_log)? {
if !find_zpad(&mut channel_in)? {
continue;
}
let frame = match parse_header(&mut rw_log)? {
let frame = match parse_header(&mut channel_in)? {
Some(x) => x,
None => {
write_znak(&mut rw_log)?;
write_znak(&mut channel_out)?;
continue;
}
};
@@ -97,10 +102,10 @@ where
// do things according new state
match state {
State::SendingZRQINIT => {
write_zrqinit(&mut rw_log)?;
write_zrqinit(&mut channel_out)?;
}
State::SendingZFILE => {
write_zfile(&mut rw_log, filename, filesize)?;
write_zfile(&mut channel_out, filename, filesize)?;
}
State::SendingData => {
offset = frame.get_count();
@@ -109,35 +114,35 @@ where
let num = r.read(&mut data)?;
if num == 0 {
write_zeof(&mut rw_log, offset)?;
write_zeof(&mut channel_out, offset)?;
} else {
// ZBIN32|ZDATA
// ZCRCG - best perf
// ZCRCQ - mid perf
// ZCRCW - worst perf
// ZCRCE - send at end
write_zdata(&mut rw_log, offset)?;
write_zdata(&mut channel_out, offset)?;
let mut i = 0;
loop {
i += 1;
write_zlde_data(&mut rw_log, ZCRCG, &data[..num])?;
write_zlde_data(&mut channel_out, ZCRCG, &data[..num])?;
offset += num as u32;
let num = r.read(&mut data)?;
if num < data.len() || i >= SUBPACKET_PER_ACK {
write_zlde_data(&mut rw_log, ZCRCW, &data[..num])?;
write_zlde_data(&mut channel_out, ZCRCW, &data[..num])?;
break;
}
}
}
}
State::SendingZFIN => {
write_zfin(&mut rw_log)?;
write_zfin(&mut channel_out)?;
}
State::Done => {
write_over_and_out(&mut rw_log)?;
write_over_and_out(&mut channel_out)?;
}
_ => (),
}

View File

@@ -18,24 +18,21 @@ fn forget_err(_e: std::io::Error) -> kata_io::Error {
kata_io::Error {}
}
struct InOut<R: Read, W: Write> {
struct ReadWrapper<R: std::io::Read> {
r: R,
w: W,
}
impl<R: Read, W: Write> InOut<R, W> {
pub fn new(r: R, w: W) -> InOut<R, W> {
InOut { r, w }
}
}
impl<R: Read, W: Write> kata_io::Read for InOut<R, W> {
impl<R: std::io::Read> kata_io::Read for ReadWrapper<R> {
fn read(&mut self, buf: &mut [u8]) -> kata_io::Result<usize> {
self.r.read(buf).map_err(forget_err)
}
}
impl<R: Read, W: Write> kata_io::Write for InOut<R, W> {
struct WriteWrapper<W: std::io::Write> {
w: W,
}
impl<W: std::io::Write> kata_io::Write for WriteWrapper<W> {
fn write(&mut self, buf: &[u8]) -> kata_io::Result<usize> {
self.w.write(buf).map_err(forget_err)
}
@@ -45,45 +42,24 @@ impl<R: Read, W: Write> kata_io::Write for InOut<R, W> {
}
}
/// Minimal vector-backed kata_io::Read and kata_io::Write
///
/// This is a workaround to not being able to implement the kata_io traits for
/// std::io::Cursor in this file, since none of those come from the current
/// crate.
struct WrappedCursor<T> {
c: std::io::Cursor<T>,
struct SendInput<T: std::io::Read + std::io::Seek> {
t: T,
}
impl<T> WrappedCursor<T> {
pub fn into_inner(self) -> T {
self.c.into_inner()
}
}
impl kata_io::Read for WrappedCursor<&Vec<u8>> {
impl<T: std::io::Read + std::io::Seek> kata_io::Read for SendInput<T> {
fn read(&mut self, buf: &mut [u8]) -> kata_io::Result<usize> {
self.c.read(buf).map_err(forget_err)
self.t.read(buf).map_err(forget_err)
}
}
impl kata_io::Write for WrappedCursor<Vec<u8>> {
fn write(&mut self, buf: &[u8]) -> kata_io::Result<usize> {
self.c.write(buf).map_err(forget_err)
}
fn flush(&mut self) -> kata_io::Result<()> {
self.c.flush().map_err(forget_err)
}
}
impl kata_io::Seek for WrappedCursor<&Vec<u8>> {
impl<T: std::io::Read + std::io::Seek> kata_io::Seek for SendInput<T> {
fn seek(&mut self, pos: kata_io::SeekFrom) -> kata_io::Result<u64> {
let std_pos = match pos {
kata_io::SeekFrom::Start(n) => std::io::SeekFrom::Start(n),
kata_io::SeekFrom::End(n) => std::io::SeekFrom::End(n),
kata_io::SeekFrom::Current(n) => std::io::SeekFrom::Current(n),
};
self.c.seek(std_pos).map_err(forget_err)
self.t.seek(std_pos).map_err(forget_err)
}
}
@@ -113,14 +89,18 @@ fn recv_from_sz() {
.spawn()
.expect("sz failed to run");
let child_stdin = sz.stdin.unwrap();
let child_stdout = sz.stdout.unwrap();
let mut inout = InOut::new(child_stdout, child_stdin);
let mut c = Cursor::new(Vec::new());
let mut c = WrappedCursor {
c: Cursor::new(Vec::new()),
};
zmodem::recv::recv(&mut inout, &mut c).unwrap();
zmodem::recv::recv(
ReadWrapper {
r: sz.stdout.unwrap(),
},
WriteWrapper {
w: sz.stdin.unwrap(),
},
WriteWrapper { w: &mut c },
)
.unwrap();
sleep(Duration::from_millis(300));
remove_file("recv_from_sz").unwrap();
@@ -135,26 +115,31 @@ fn send_to_rz() {
let _ = remove_file("send_to_rz");
let sz = Command::new("rz")
let rz = Command::new("rz")
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.spawn()
.expect("rz failed to run");
let child_stdin = sz.stdin.unwrap();
let child_stdout = sz.stdout.unwrap();
let mut inout = InOut::new(child_stdout, child_stdin);
let len = RND_VALUES.len() as u32;
let copy = RND_VALUES.clone();
let mut cur = WrappedCursor {
c: Cursor::new(&copy),
};
sleep(Duration::from_millis(300));
zmodem::send::send(&mut inout, &mut cur, "send_to_rz", Some(len)).unwrap();
zmodem::send::send(
ReadWrapper {
r: rz.stdout.unwrap(),
},
WriteWrapper {
w: rz.stdin.unwrap(),
},
SendInput {
t: Cursor::new(&copy),
},
"send_to_rz",
Some(len),
)
.unwrap();
sleep(Duration::from_millis(300));
let mut f = File::open("send_to_rz").expect("open 'send_to_rz'");
@@ -189,26 +174,32 @@ fn lib_send_recv() {
spawn(move || {
let outf = OpenOptions::new().write(true).open("test-fifo1").unwrap();
let inf = File::open("test-fifo2").unwrap();
let mut inout = InOut::new(inf, outf);
let origin = RND_VALUES.clone();
let mut c = WrappedCursor {
c: Cursor::new(&origin),
};
zmodem::send::send(&mut inout, &mut c, "test", None).unwrap();
zmodem::send::send(
ReadWrapper {
r: File::open("test-fifo2").unwrap(),
},
WriteWrapper { w: outf },
SendInput {
t: Cursor::new(&RND_VALUES.clone()),
},
"test",
None,
)
.unwrap();
});
let mut c = WrappedCursor {
c: Cursor::new(Vec::new()),
};
let mut c = Cursor::new(Vec::new());
let inf = File::open("test-fifo1").unwrap();
let outf = OpenOptions::new().write(true).open("test-fifo2").unwrap();
let mut inout = InOut::new(inf, outf);
zmodem::recv::recv(&mut inout, &mut c).unwrap();
zmodem::recv::recv(
ReadWrapper {
r: File::open("test-fifo1").unwrap(),
},
WriteWrapper {
w: OpenOptions::new().write(true).open("test-fifo2").unwrap(),
},
WriteWrapper { w: &mut c },
)
.unwrap();
let _ = remove_file("test-fifo1");
let _ = remove_file("test-fifo2");