diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 853123cbcf..4b24a7fee6 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -200,15 +200,8 @@ impl Process { } pub async fn close_stdin(&mut self) { - // stdin will be closed automatically in passfd-io senario - if self.proc_io.is_some() { - return; - } - close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, parent_stdin, ParentStdin); - - self.notify_term_close(); } pub fn cleanup_process_stream(&mut self) { diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index aa598bc7ca..70111be921 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -584,25 +584,32 @@ impl AgentService { let cid = req.container_id; let eid = req.exec_id; - let writer = { - let mut sandbox = self.sandbox.lock().await; - let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; - - // use ptmx io - if p.term_master.is_some() { - p.get_writer(StreamType::TermMaster) - } else { - // use piped io - p.get_writer(StreamType::ParentStdin) - } - }; - - let writer = writer.ok_or_else(|| anyhow!(ERR_CANNOT_GET_WRITER))?; - writer.lock().await.write_all(req.data.as_slice()).await?; - let mut resp = WriteStreamResponse::new(); resp.set_len(req.data.len() as u32); + // EOF of stdin + if req.data.is_empty() { + let mut sandbox = self.sandbox.lock().await; + let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; + p.close_stdin().await; + } else { + let writer = { + let mut sandbox = self.sandbox.lock().await; + let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; + + // use ptmx io + if p.term_master.is_some() { + p.get_writer(StreamType::TermMaster) + } else { + // use piped io + p.get_writer(StreamType::ParentStdin) + } + }; + + let writer = writer.ok_or_else(|| anyhow!(ERR_CANNOT_GET_WRITER))?; + writer.lock().await.write_all(req.data.as_slice()).await?; + } + Ok(resp) } @@ -645,6 +652,7 @@ impl AgentService { biased; v = read_stream(&reader, req.len as usize) => { let vector = v?; + let mut resp = ReadStreamResponse::new(); resp.set_data(vector); @@ -845,6 +853,9 @@ impl agent_ttrpc::AgentService for AgentService { ctx: &TtrpcContext, req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { + // The stdin will be closed when EOF is got in rpc `write_stdin`[runtime-rs] + // so this rpc will not be called anymore by runtime-rs. + trace_rpc_call!(ctx, "close_stdin", req); is_allowed(&req).await?;