agent,runtime-rs: fix container io detach and attach

Partially fix some issues related to container io detach and attach.

Fixes: #6714
Signed-off-by: Zixuan Tan <tanzixuan.me@gmail.com>
This commit is contained in:
Zixuan Tan 2023-10-30 01:21:32 +08:00
parent 657b17a86f
commit 5536743361
5 changed files with 75 additions and 75 deletions

View File

@ -995,13 +995,25 @@ impl BaseContainer for LinuxContainer {
// A reference count used to clean up the term master fd. // A reference count used to clean up the term master fd.
let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) }); let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) });
// Copy from stdin to term_master
if let Some(mut stdin_stream) = proc_io.stdin.take() { if let Some(mut stdin_stream) = proc_io.stdin.take() {
let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let mut term_master = unsafe { File::from_raw_fd(pseudo.master) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker();
let logger = logger.clone(); let logger = logger.clone();
let term_closer = term_closer.clone(); let term_closer = term_closer.clone();
tokio::spawn(async move { tokio::spawn(async move {
let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; // As the stdin fifo is opened in RW mode in the shim, which will never
debug!(logger, "copy from stdin to term_master end: {:?}", res); // read EOF, we close the stdin fifo here when explicit requested.
tokio::select! {
res = tokio::io::copy(&mut stdin_stream, &mut term_master) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
}
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
}
}
wgw_input.done();
std::mem::forget(term_master); // Avoid auto closing of term_master std::mem::forget(term_master); // Avoid auto closing of term_master
drop(term_closer); drop(term_closer);
}); });
@ -1022,22 +1034,35 @@ impl BaseContainer for LinuxContainer {
} }
} }
} else { } else {
// Allow null io in passfd io mode when vsock streams are not provided let stdin = p.stdin.unwrap();
child_stdin = if let Some(stdin) = p.stdin { let stdout = p.stdout.unwrap();
unsafe { std::process::Stdio::from_raw_fd(stdin) } let stderr = p.stderr.unwrap();
} else { child_stdin = unsafe { std::process::Stdio::from_raw_fd(stdin) };
std::process::Stdio::null() child_stdout = unsafe { std::process::Stdio::from_raw_fd(stdout) };
}; child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) };
child_stdout = if let Some(stdout) = p.stdout {
unsafe { std::process::Stdio::from_raw_fd(stdout) } if let Some(proc_io) = &mut p.proc_io {
} else { // Copy from stdin to parent_stdin
std::process::Stdio::null() if let Some(mut stdin_stream) = proc_io.stdin.take() {
}; let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) };
child_stderr = if let Some(stderr) = p.stderr { let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
unsafe { std::process::Stdio::from_raw_fd(stderr) } let wgw_input = proc_io.wg_input.worker();
} else { let logger = logger.clone();
std::process::Stdio::null() tokio::spawn(async move {
}; // As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin stream when containerd explicit requested.
tokio::select! {
res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin) => {
debug!(logger, "copy from stdin to parent_stdin end: {:?}", res);
}
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
}
}
wgw_input.done();
});
}
}
} }
let pidns = get_pid_namespace(&self.logger, linux)?; let pidns = get_pid_namespace(&self.logger, linux)?;

View File

@ -54,6 +54,11 @@ pub struct ProcessIo {
pub stdin: Option<VsockStream>, pub stdin: Option<VsockStream>,
pub stdout: Option<VsockStream>, pub stdout: Option<VsockStream>,
pub stderr: Option<VsockStream>, pub stderr: Option<VsockStream>,
// used to close stdin stream
pub close_stdin_tx: tokio::sync::watch::Sender<bool>,
pub close_stdin_rx: tokio::sync::watch::Receiver<bool>,
// wait for stdin copy task to finish
pub wg_input: WaitGroup,
// used to wait for all process outputs to be copied to the vsock streams // used to wait for all process outputs to be copied to the vsock streams
// only used when tty is used. // only used when tty is used.
pub wg_output: WaitGroup, pub wg_output: WaitGroup,
@ -65,10 +70,15 @@ impl ProcessIo {
stdout: Option<VsockStream>, stdout: Option<VsockStream>,
stderr: Option<VsockStream>, stderr: Option<VsockStream>,
) -> Self { ) -> Self {
let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false);
ProcessIo { ProcessIo {
stdin, stdin,
stdout, stdout,
stderr, stderr,
close_stdin_tx,
close_stdin_rx,
wg_input: WaitGroup::new(),
wg_output: WaitGroup::new(), wg_output: WaitGroup::new(),
} }
} }
@ -174,35 +184,22 @@ impl Process {
} else { } else {
info!(logger, "created console socket!"); info!(logger, "created console socket!");
if let Some(proc_io) = p.proc_io.as_mut() { let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?;
// In passfd io mode p.parent_stdin = Some(pstdin);
if let Some(stdin) = proc_io.stdin.take() { p.stdin = Some(stdin);
p.stdin = Some(stdin.as_raw_fd());
std::mem::forget(stdin);
}
// p.stdin can be None if the connection for stdin is not provided.
} else {
let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?;
p.parent_stdin = Some(pstdin);
p.stdin = Some(stdin);
}
if let Some(proc_io) = p.proc_io.as_mut() { if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() {
if let Some(stdout) = proc_io.stdout.take() { p.stdout = Some(stdout.as_raw_fd());
p.stdout = Some(stdout.as_raw_fd()); std::mem::forget(stdout);
std::mem::forget(stdout);
}
} else { } else {
let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?;
p.parent_stdout = Some(pstdout); p.parent_stdout = Some(pstdout);
p.stdout = Some(stdout); p.stdout = Some(stdout);
} }
if let Some(proc_io) = p.proc_io.as_mut() { if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() {
if let Some(stderr) = proc_io.stderr.take() { p.stderr = Some(stderr.as_raw_fd());
p.stderr = Some(stderr.as_raw_fd()); std::mem::forget(stderr);
std::mem::forget(stderr);
}
} else { } else {
let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?;
p.parent_stderr = Some(pstderr); p.parent_stderr = Some(pstderr);
@ -218,8 +215,14 @@ impl Process {
notify.notify_waiters(); notify.notify_waiters();
} }
/// won't be use in passfd io mode. pub async fn close_stdin(&mut self) {
pub fn close_stdin(&mut self) { if let Some(proc_io) = &mut self.proc_io {
// notify io copy task to close stdin stream
let _ = proc_io.close_stdin_tx.send(true);
// wait for io copy task to finish
proc_io.wg_input.wait().await;
}
close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, term_master, TermMaster);
close_process_stream!(self, parent_stdin, ParentStdin); close_process_stream!(self, parent_stdin, ParentStdin);

View File

@ -851,7 +851,7 @@ impl agent_ttrpc::AgentService for AgentService {
) )
})?; })?;
p.close_stdin(); p.close_stdin().await;
Ok(Empty::new()) Ok(Empty::new())
} }

View File

@ -487,31 +487,6 @@ impl VsockEpollListener for VsockConnection {
self.pending_rx.insert(PendingRx::CreditUpdate); self.pending_rx.insert(PendingRx::CreditUpdate);
} }
} }
if evset.contains(epoll::Events::EPOLLHUP)
&& !evset.contains(epoll::Events::EPOLLIN)
&& !evset.contains(epoll::Events::EPOLLOUT)
{
// The host stream has been hung up. We'll kill this connection.
warn!(
"vsock: connection received EPOLLHUP event: lp={}, pp={}",
self.local_port, self.peer_port
);
self.kill();
}
if evset.contains(epoll::Events::EPOLLERR)
&& !evset.contains(epoll::Events::EPOLLIN)
&& !evset.contains(epoll::Events::EPOLLOUT)
{
// The host stream has encountered an error. We'll kill this
// connection.
warn!(
"vsock: connection received EPOLLERR event: lp={}, pp={}",
self.local_port, self.peer_port
);
self.kill();
}
} }
} }

View File

@ -337,15 +337,12 @@ impl Process {
/// Close the stdin of the process in container. /// Close the stdin of the process in container.
pub async fn close_io(&mut self, agent: Arc<dyn Agent>) { pub async fn close_io(&mut self, agent: Arc<dyn Agent>) {
if self.passfd_io.is_some() { // In passfd io mode, the stdin close and sync logic is handled
// In passfd io mode, if containerd closes stdin stream, the // in the agent side.
// agent can get the close event from the vsock connection. if self.passfd_io.is_none() {
// so we just return here. self.wg_stdin.wait().await;
return;
} }
self.wg_stdin.wait().await;
let req = agent::CloseStdinRequest { let req = agent::CloseStdinRequest {
process_id: self.process.clone().into(), process_id: self.process.clone().into(),
}; };