diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 02de8a77b9..21929e4dd7 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -995,13 +995,25 @@ impl BaseContainer for LinuxContainer { // A reference count used to clean up the term master fd. let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) }); + // Copy from stdin to term_master if let Some(mut stdin_stream) = proc_io.stdin.take() { let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; + let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); + let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); let term_closer = term_closer.clone(); tokio::spawn(async move { - let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; - debug!(logger, "copy from stdin to term_master end: {:?}", res); + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin fifo here when explicit requested. + tokio::select! { + res = tokio::io::copy(&mut stdin_stream, &mut term_master) => { + debug!(logger, "copy from stdin to term_master end: {:?}", res); + } + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + } + } + wgw_input.done(); std::mem::forget(term_master); // Avoid auto closing of term_master drop(term_closer); }); @@ -1022,22 +1034,35 @@ impl BaseContainer for LinuxContainer { } } } else { - // Allow null io in passfd io mode when vsock streams are not provided - child_stdin = if let Some(stdin) = p.stdin { - unsafe { std::process::Stdio::from_raw_fd(stdin) } - } else { - std::process::Stdio::null() - }; - child_stdout = if let Some(stdout) = p.stdout { - unsafe { std::process::Stdio::from_raw_fd(stdout) } - } else { - std::process::Stdio::null() - }; - child_stderr = if let Some(stderr) = p.stderr { - unsafe { std::process::Stdio::from_raw_fd(stderr) } - } else { - std::process::Stdio::null() - }; + let stdin = p.stdin.unwrap(); + let stdout = p.stdout.unwrap(); + let stderr = p.stderr.unwrap(); + child_stdin = unsafe { std::process::Stdio::from_raw_fd(stdin) }; + child_stdout = unsafe { std::process::Stdio::from_raw_fd(stdout) }; + child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; + + if let Some(proc_io) = &mut p.proc_io { + // Copy from stdin to parent_stdin + if let Some(mut stdin_stream) = proc_io.stdin.take() { + let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; + let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); + let wgw_input = proc_io.wg_input.worker(); + let logger = logger.clone(); + tokio::spawn(async move { + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin stream when containerd explicit requested. + tokio::select! { + res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin) => { + debug!(logger, "copy from stdin to parent_stdin end: {:?}", res); + } + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + } + } + wgw_input.done(); + }); + } + } } let pidns = get_pid_namespace(&self.logger, linux)?; diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index a568ed854e..2ce9f37219 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -54,6 +54,11 @@ pub struct ProcessIo { pub stdin: Option, pub stdout: Option, pub stderr: Option, + // used to close stdin stream + pub close_stdin_tx: tokio::sync::watch::Sender, + pub close_stdin_rx: tokio::sync::watch::Receiver, + // wait for stdin copy task to finish + pub wg_input: WaitGroup, // used to wait for all process outputs to be copied to the vsock streams // only used when tty is used. pub wg_output: WaitGroup, @@ -65,10 +70,15 @@ impl ProcessIo { stdout: Option, stderr: Option, ) -> Self { + let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false); + ProcessIo { stdin, stdout, stderr, + close_stdin_tx, + close_stdin_rx, + wg_input: WaitGroup::new(), wg_output: WaitGroup::new(), } } @@ -174,35 +184,22 @@ impl Process { } else { info!(logger, "created console socket!"); - if let Some(proc_io) = p.proc_io.as_mut() { - // In passfd io mode - if let Some(stdin) = proc_io.stdin.take() { - p.stdin = Some(stdin.as_raw_fd()); - std::mem::forget(stdin); - } - // p.stdin can be None if the connection for stdin is not provided. - } else { - let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?; - p.parent_stdin = Some(pstdin); - p.stdin = Some(stdin); - } + let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?; + p.parent_stdin = Some(pstdin); + p.stdin = Some(stdin); - if let Some(proc_io) = p.proc_io.as_mut() { - if let Some(stdout) = proc_io.stdout.take() { - p.stdout = Some(stdout.as_raw_fd()); - std::mem::forget(stdout); - } + if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() { + p.stdout = Some(stdout.as_raw_fd()); + std::mem::forget(stdout); } else { let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stdout = Some(pstdout); p.stdout = Some(stdout); } - if let Some(proc_io) = p.proc_io.as_mut() { - if let Some(stderr) = proc_io.stderr.take() { - p.stderr = Some(stderr.as_raw_fd()); - std::mem::forget(stderr); - } + if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() { + p.stderr = Some(stderr.as_raw_fd()); + std::mem::forget(stderr); } else { let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stderr = Some(pstderr); @@ -218,8 +215,14 @@ impl Process { notify.notify_waiters(); } - /// won't be use in passfd io mode. - pub fn close_stdin(&mut self) { + pub async fn close_stdin(&mut self) { + if let Some(proc_io) = &mut self.proc_io { + // notify io copy task to close stdin stream + let _ = proc_io.close_stdin_tx.send(true); + // wait for io copy task to finish + proc_io.wg_input.wait().await; + } + close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, parent_stdin, ParentStdin); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 3ed56e761b..0068eb9292 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -851,7 +851,7 @@ impl agent_ttrpc::AgentService for AgentService { ) })?; - p.close_stdin(); + p.close_stdin().await; Ok(Empty::new()) } diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs index c3415cf6cb..e2ca7e3339 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs @@ -487,31 +487,6 @@ impl VsockEpollListener for VsockConnection { self.pending_rx.insert(PendingRx::CreditUpdate); } } - - if evset.contains(epoll::Events::EPOLLHUP) - && !evset.contains(epoll::Events::EPOLLIN) - && !evset.contains(epoll::Events::EPOLLOUT) - { - // The host stream has been hung up. We'll kill this connection. - warn!( - "vsock: connection received EPOLLHUP event: lp={}, pp={}", - self.local_port, self.peer_port - ); - self.kill(); - } - - if evset.contains(epoll::Events::EPOLLERR) - && !evset.contains(epoll::Events::EPOLLIN) - && !evset.contains(epoll::Events::EPOLLOUT) - { - // The host stream has encountered an error. We'll kill this - // connection. - warn!( - "vsock: connection received EPOLLERR event: lp={}, pp={}", - self.local_port, self.peer_port - ); - self.kill(); - } } } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 93ca0bd99b..6cc7557702 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -337,15 +337,12 @@ impl Process { /// Close the stdin of the process in container. pub async fn close_io(&mut self, agent: Arc) { - if self.passfd_io.is_some() { - // In passfd io mode, if containerd closes stdin stream, the - // agent can get the close event from the vsock connection. - // so we just return here. - return; + // In passfd io mode, the stdin close and sync logic is handled + // in the agent side. + if self.passfd_io.is_none() { + self.wg_stdin.wait().await; } - self.wg_stdin.wait().await; - let req = agent::CloseStdinRequest { process_id: self.process.clone().into(), };