Merge pull request #5271 from liubin/fix/4729-add-close-io-for-kubectl-cp

runtime-rs: fix shim close_io call to support kubectl cp
This commit is contained in:
Bin Liu 2022-09-29 13:10:49 +08:00 committed by GitHub
commit abbdf89a06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 9 deletions

View File

@ -256,13 +256,13 @@ impl ContainerInner {
pub async fn close_io(&mut self, process: &ContainerProcess) -> Result<()> { pub async fn close_io(&mut self, process: &ContainerProcess) -> Result<()> {
match process.process_type { match process.process_type {
ProcessType::Container => self.init_process.close_io().await, ProcessType::Container => self.init_process.close_io(self.agent.clone()).await,
ProcessType::Exec => { ProcessType::Exec => {
let exec = self let exec = self
.exec_processes .exec_processes
.get_mut(&process.exec_id) .get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?; .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process.close_io().await; exec.process.close_io(self.agent.clone()).await;
} }
}; };

View File

@ -131,7 +131,7 @@ impl Process {
) -> Result<()> { ) -> Result<()> {
info!(self.logger, "run io copy for {}", io_name); info!(self.logger, "run io copy for {}", io_name);
let io_name = io_name.to_string(); let io_name = io_name.to_string();
let logger = self.logger.new(o!("io name" => io_name)); let logger = self.logger.new(o!("io_name" => io_name));
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
loop { loop {
match tokio::io::copy(&mut reader, &mut writer).await { match tokio::io::copy(&mut reader, &mut writer).await {
@ -141,9 +141,11 @@ impl Process {
continue; continue;
} }
} }
warn!(logger, "io: failed to copy stream {}", e); warn!(logger, "run_io_copy: failed to copy stream: {}", e);
}
Ok(length) => {
warn!(logger, "run_io_copy: stop to copy stream length {}", length)
} }
Ok(length) => warn!(logger, "io: stop to copy stream length {}", length),
}; };
break; break;
} }
@ -163,8 +165,8 @@ impl Process {
let status = self.status.clone(); let status = self.status.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
//wait on all of the container's io stream terminated // wait on all of the container's io stream terminated
info!(logger, "begin wait group io",); info!(logger, "begin wait group io");
wg.wait().await; wg.wait().await;
info!(logger, "end wait group for io"); info!(logger, "end wait group for io");
@ -223,8 +225,16 @@ impl Process {
*status = ProcessStatus::Stopped; *status = ProcessStatus::Stopped;
} }
pub async fn close_io(&mut self) { pub async fn close_io(&mut self, agent: Arc<dyn Agent>) {
self.wg_stdin.wait().await; self.wg_stdin.wait().await;
let req = agent::CloseStdinRequest {
process_id: self.process.clone().into(),
};
if let Err(e) = agent.close_stdin(req).await {
warn!(self.logger, "failed clsoe process io: {:?}", e);
}
} }
pub async fn get_status(&self) -> ProcessStatus { pub async fn get_status(&self) -> ProcessStatus {

View File

@ -77,5 +77,6 @@ impl_service!(
wait | api::WaitRequest | api::WaitResponse, wait | api::WaitRequest | api::WaitResponse,
stats | api::StatsRequest | api::StatsResponse, stats | api::StatsRequest | api::StatsResponse,
connect | api::ConnectRequest | api::ConnectResponse, connect | api::ConnectRequest | api::ConnectResponse,
shutdown | api::ShutdownRequest | api::Empty shutdown | api::ShutdownRequest | api::Empty,
close_io | api::CloseIORequest | api::Empty
); );