agent,runtime-rs,runk: fix fmt and clippy warnings

Fix rustfmt and clippy warnings detected by CI.

Fixes: #6714
Signed-off-by: Zixuan Tan <tanzixuan.me@gmail.com>
This commit is contained in:
Zixuan Tan 2023-11-14 19:07:02 +08:00
parent 89be42a177
commit f6710610d1
8 changed files with 31 additions and 37 deletions

View File

@ -30,7 +30,7 @@ cgroups = { package = "cgroups-rs", version = "0.3.3" }
rlimit = "0.5.3" rlimit = "0.5.3"
cfg-if = "0.1.0" cfg-if = "0.1.0"
tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] } tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt", "fs"] }
tokio-vsock = "0.3.1" tokio-vsock = "0.3.1"
futures = "0.3.17" futures = "0.3.17"
async-trait = "0.1.31" async-trait = "0.1.31"

View File

@ -1015,7 +1015,7 @@ impl BaseContainer for LinuxContainer {
break; break;
} }
Ok(n) => { Ok(n) => {
if let Err(_) = term_master.write_all(&buf[..n]).await { if term_master.write_all(&buf[..n]).await.is_err() {
break; break;
} }
} }
@ -1035,11 +1035,12 @@ impl BaseContainer for LinuxContainer {
}); });
} }
// Copy from term_master to stdout
if let Some(mut stdout_stream) = proc_io.stdout.take() { if let Some(mut stdout_stream) = proc_io.stdout.take() {
let wgw_output = proc_io.wg_output.worker(); let wgw_output = proc_io.wg_output.worker();
let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let mut term_master = unsafe { File::from_raw_fd(pseudo.master) };
let logger = logger.clone(); let logger = logger.clone();
let term_closer = term_closer.clone(); let term_closer = term_closer;
tokio::spawn(async move { tokio::spawn(async move {
let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await; let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await;
debug!(logger, "copy from term_master to stdout end: {:?}", res); debug!(logger, "copy from term_master to stdout end: {:?}", res);
@ -1050,6 +1051,7 @@ impl BaseContainer for LinuxContainer {
} }
} }
} else { } else {
// not using a terminal
let stdin = p.stdin.unwrap(); let stdin = p.stdin.unwrap();
let stdout = p.stdout.unwrap(); let stdout = p.stdout.unwrap();
let stderr = p.stderr.unwrap(); let stderr = p.stderr.unwrap();
@ -1058,8 +1060,11 @@ impl BaseContainer for LinuxContainer {
child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) };
if let Some(proc_io) = &mut p.proc_io { if let Some(proc_io) = &mut p.proc_io {
// Copy from stdin to parent_stdin // Here we copy from vsock stdin stream to parent_stdin manually.
// This is because we need to close the stdin fifo when the stdin stream
// is drained.
if let Some(mut stdin_stream) = proc_io.stdin.take() { if let Some(mut stdin_stream) = proc_io.stdin.take() {
info!(logger, "copy from stdin to parent_stdin");
let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker(); let wgw_input = proc_io.wg_input.worker();
@ -1073,11 +1078,11 @@ impl BaseContainer for LinuxContainer {
res = stdin_stream.read(&mut buf) => { res = stdin_stream.read(&mut buf) => {
match res { match res {
Err(_) | Ok(0) => { Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res); info!(logger, "copy from stdin to term_master end: {:?}", res);
break; break;
} }
Ok(n) => { Ok(n) => {
if let Err(_) = parent_stdin.write_all(&buf[..n]).await { if parent_stdin.write_all(&buf[..n]).await.is_err() {
break; break;
} }
} }
@ -1086,7 +1091,7 @@ impl BaseContainer for LinuxContainer {
// As the stdin fifo is opened in RW mode in the shim, which will never // As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested. // read EOF, we close the stdin fifo here when explicit requested.
_ = close_stdin_rx.changed() => { _ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested"); info!(logger, "copy ends as requested");
break break
} }
} }

View File

@ -5,7 +5,7 @@
use libc::pid_t; use libc::pid_t;
use std::fs::File; use std::fs::File;
use std::os::unix::io::{AsRawFd, RawFd, IntoRawFd}; use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio_vsock::VsockStream; use tokio_vsock::VsockStream;
@ -195,7 +195,7 @@ impl Process {
p.parent_stdin = Some(pstdin); p.parent_stdin = Some(pstdin);
p.stdin = Some(stdin); p.stdin = Some(stdin);
if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() { if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) {
let fd = stdout.into_raw_fd(); let fd = stdout.into_raw_fd();
// The stdout/stderr of the process should be blocking, otherwise // The stdout/stderr of the process should be blocking, otherwise
// the process may encounter EAGAIN error when writing to stdout/stderr. // the process may encounter EAGAIN error when writing to stdout/stderr.
@ -207,7 +207,7 @@ impl Process {
p.stdout = Some(stdout); p.stdout = Some(stdout);
} }
if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() { if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) {
let fd = stderr.into_raw_fd(); let fd = stderr.into_raw_fd();
set_blocking(fd)?; set_blocking(fd)?;
p.stderr = Some(fd); p.stderr = Some(fd);
@ -241,8 +241,8 @@ impl Process {
} }
pub fn cleanup_process_stream(&mut self) { pub fn cleanup_process_stream(&mut self) {
if let Some(_) = self.proc_io.take() { if let Some(proc_io) = self.proc_io.take() {
// taken drop(proc_io);
return; return;
} }

View File

@ -22,12 +22,10 @@ pub(crate) async fn start_listen(port: u32) -> Result<()> {
let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?; let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?;
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if let Ok((stream, peer_addr)) = listener.accept().await { if let Ok((stream, Vsock(addr))) = listener.accept().await {
if let Vsock(addr) = peer_addr { let port = addr.port();
let port = addr.port(); info!(sl(), "accept connection from peer port {}", port);
info!(sl(), "accept connection from peer port {}", port); HVSOCK_STREAMS.lock().await.insert(port, stream);
HVSOCK_STREAMS.lock().await.insert(port, stream);
}
} }
} }
}); });

View File

@ -210,20 +210,17 @@ impl Container {
.init_process .init_process
.passfd_io .passfd_io
.as_ref() .as_ref()
.map(|io| io.stdin_port) .and_then(|io| io.stdin_port),
.flatten(),
stdout_port: inner stdout_port: inner
.init_process .init_process
.passfd_io .passfd_io
.as_ref() .as_ref()
.map(|io| io.stdout_port) .and_then(|io| io.stdout_port),
.flatten(),
stderr_port: inner stderr_port: inner
.init_process .init_process
.passfd_io .passfd_io
.as_ref() .as_ref()
.map(|io| io.stderr_port) .and_then(|io| io.stderr_port),
.flatten(),
..Default::default() ..Default::default()
}; };

View File

@ -87,24 +87,17 @@ impl ContainerInner {
process_id: process.clone().into(), process_id: process.clone().into(),
string_user: None, string_user: None,
process: Some(exec.oci_process.clone()), process: Some(exec.oci_process.clone()),
stdin_port: exec stdin_port: exec.process.passfd_io.as_ref().and_then(|io| io.stdin_port),
.process
.passfd_io
.as_ref()
.map(|io| io.stdin_port)
.flatten(),
stdout_port: exec stdout_port: exec
.process .process
.passfd_io .passfd_io
.as_ref() .as_ref()
.map(|io| io.stdout_port) .and_then(|io| io.stdout_port),
.flatten(),
stderr_port: exec stderr_port: exec
.process .process
.passfd_io .passfd_io
.as_ref() .as_ref()
.map(|io| io.stderr_port) .and_then(|io| io.stderr_port),
.flatten(),
}) })
.await .await
.context("exec process")?; .context("exec process")?;

View File

@ -85,7 +85,7 @@ impl PassfdIo {
.read(true) .read(true)
.write(true) .write(true)
.custom_flags(libc::O_NONBLOCK) .custom_flags(libc::O_NONBLOCK)
.open(&stdin) .open(stdin)
.context("open stdin")?; .context("open stdin")?;
let hostport = passfd_connect(uds_path, passfd_port, fin.into()) let hostport = passfd_connect(uds_path, passfd_port, fin.into())
@ -99,7 +99,7 @@ impl PassfdIo {
let fout = OpenOptions::new() let fout = OpenOptions::new()
.write(true) .write(true)
.custom_flags(libc::O_NONBLOCK) .custom_flags(libc::O_NONBLOCK)
.open(&stdout) .open(stdout)
.context("open stdout")?; .context("open stdout")?;
let hostport = passfd_connect(uds_path, passfd_port, fout.into()) let hostport = passfd_connect(uds_path, passfd_port, fout.into())
@ -115,7 +115,7 @@ impl PassfdIo {
let ferr = OpenOptions::new() let ferr = OpenOptions::new()
.write(true) .write(true)
.custom_flags(libc::O_NONBLOCK) .custom_flags(libc::O_NONBLOCK)
.open(&stderr) .open(stderr)
.context("open stderr")?; .context("open stderr")?;
let hostport = passfd_connect(uds_path, passfd_port, ferr.into()) let hostport = passfd_connect(uds_path, passfd_port, ferr.into())

View File

@ -294,6 +294,7 @@ impl ContainerLauncher {
&self.id, &self.id,
self.init, self.init,
0, 0,
None,
)?) )?)
} else { } else {
Err(anyhow!("no process configuration")) Err(anyhow!("no process configuration"))