diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs index 6cfaef7ff3..c809213360 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs @@ -256,13 +256,13 @@ impl ContainerInner { pub async fn close_io(&mut self, process: &ContainerProcess) -> Result<()> { match process.process_type { - ProcessType::Container => self.init_process.close_io().await, + ProcessType::Container => self.init_process.close_io(self.agent.clone()).await, ProcessType::Exec => { let exec = self .exec_processes .get_mut(&process.exec_id) .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; - exec.process.close_io().await; + exec.process.close_io(self.agent.clone()).await; } }; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 334488453a..559475b74a 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -131,7 +131,7 @@ impl Process { ) -> Result<()> { info!(self.logger, "run io copy for {}", io_name); let io_name = io_name.to_string(); - let logger = self.logger.new(o!("io name" => io_name)); + let logger = self.logger.new(o!("io_name" => io_name)); let _ = tokio::spawn(async move { loop { match tokio::io::copy(&mut reader, &mut writer).await { @@ -141,9 +141,11 @@ impl Process { continue; } } - warn!(logger, "io: failed to copy stream {}", e); + warn!(logger, "run_io_copy: failed to copy stream: {}", e); + } + Ok(length) => { + warn!(logger, "run_io_copy: stop to copy stream length {}", length) } - Ok(length) => warn!(logger, "io: stop to copy stream length {}", length), }; break; } @@ -163,8 +165,8 @@ impl Process { let status = self.status.clone(); let _ = tokio::spawn(async move { - //wait on all of the container's io stream terminated - info!(logger, "begin wait group io",); + // wait on all of the container's io stream terminated + info!(logger, "begin wait group io"); wg.wait().await; info!(logger, "end wait group for io"); @@ -223,8 +225,16 @@ impl Process { *status = ProcessStatus::Stopped; } - pub async fn close_io(&mut self) { + pub async fn close_io(&mut self, agent: Arc) { self.wg_stdin.wait().await; + + let req = agent::CloseStdinRequest { + process_id: self.process.clone().into(), + }; + + if let Err(e) = agent.close_stdin(req).await { + warn!(self.logger, "failed clsoe process io: {:?}", e); + } } pub async fn get_status(&self) -> ProcessStatus { diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index 447207a851..3a98a083e9 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -77,5 +77,6 @@ impl_service!( wait | api::WaitRequest | api::WaitResponse, stats | api::StatsRequest | api::StatsResponse, connect | api::ConnectRequest | api::ConnectResponse, - shutdown | api::ShutdownRequest | api::Empty + shutdown | api::ShutdownRequest | api::Empty, + close_io | api::CloseIORequest | api::Empty );