diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 2325347160..e0c205cbe0 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -19,6 +19,7 @@ use ttrpc::{ }; use anyhow::{anyhow, Context, Result}; +use cgroups::freezer::FreezerState; use oci::{LinuxNamespace, Root, Spec}; use protobuf::{Message, RepeatedField, SingularPtrField}; use protocols::agent::{ @@ -40,8 +41,9 @@ use rustjail::specconv::CreateOpts; use nix::errno::Errno; use nix::mount::MsFlags; use nix::sys::signal::Signal; -use nix::sys::stat; +use nix::sys::{signal, stat}; use nix::unistd::{self, Pid}; +use rustjail::cgroups::Manager; use rustjail::process::ProcessOperations; use sysinfo::{DiskExt, System, SystemExt}; @@ -391,7 +393,6 @@ impl AgentService { let cid = req.container_id.clone(); let eid = req.exec_id.clone(); let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; info!( sl!(), @@ -400,27 +401,97 @@ impl AgentService { "exec-id" => eid.clone(), ); - let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; - - let mut signal = Signal::try_from(req.signal as i32).map_err(|e| { + let mut sig = Signal::try_from(req.signal as i32).map_err(|e| { anyhow!(e).context(format!( "failed to convert {:?} to signal (container-id: {}, exec-id: {})", req.signal, cid, eid )) })?; - - // For container initProcess, if it hasn't installed handler for "SIGTERM" signal, - // it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal - // instead of "SIGTERM" to terminate it. - if p.init && signal == Signal::SIGTERM && !is_signal_handled(p.pid, req.signal) { - signal = Signal::SIGKILL; + { + let mut sandbox = s.lock().await; + let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; + // For container initProcess, if it hasn't installed handler for "SIGTERM" signal, + // it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal + // instead of "SIGTERM" to terminate it. + if p.init && sig == Signal::SIGTERM && !is_signal_handled(p.pid, sig as u32) { + sig = Signal::SIGKILL; + } + p.signal(sig)?; } - p.signal(signal)?; + if eid.is_empty() { + // eid is empty, signal all the remaining processes in the container cgroup + info!( + sl!(), + "signal all the remaining processes"; + "container-id" => cid.clone(), + "exec-id" => eid.clone(), + ); + if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await { + warn!( + sl!(), + "freeze cgroup failed"; + "container-id" => cid.clone(), + "exec-id" => eid.clone(), + "error" => format!("{:?}", err), + ); + } + + let pids = self.get_pids(&cid).await?; + for pid in pids.iter() { + if let Err(err) = signal::kill(Pid::from_raw(*pid), Some(sig)) { + warn!( + sl!(), + "signal failed"; + "container-id" => cid.clone(), + "exec-id" => eid.clone(), + "pid" => pid, + "error" => format!("{:?}", err), + ); + } + } + if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Thawed).await { + warn!( + sl!(), + "unfreeze cgroup failed"; + "container-id" => cid.clone(), + "exec-id" => eid.clone(), + "error" => format!("{:?}", err), + ); + } + } Ok(()) } + async fn freeze_cgroup(&self, cid: &str, state: FreezerState) -> Result<()> { + let s = self.sandbox.clone(); + let mut sandbox = s.lock().await; + let ctr = sandbox + .get_container(cid) + .ok_or_else(|| anyhow!("Invalid container id {}", cid))?; + let cm = ctr + .cgroup_manager + .as_ref() + .ok_or_else(|| anyhow!("cgroup manager not exist"))?; + cm.freeze(state)?; + Ok(()) + } + + async fn get_pids(&self, cid: &str) -> Result> { + let s = self.sandbox.clone(); + let mut sandbox = s.lock().await; + let ctr = sandbox + .get_container(cid) + .ok_or_else(|| anyhow!("Invalid container id {}", cid))?; + let cm = ctr + .cgroup_manager + .as_ref() + .ok_or_else(|| anyhow!("cgroup manager not exist"))?; + let pids = cm.get_pids()?; + Ok(pids) + } + #[instrument] async fn do_wait_process( &self, diff --git a/src/runtime/pkg/containerd-shim-v2/service.go b/src/runtime/pkg/containerd-shim-v2/service.go index 8e20ae82fb..72f3f14a04 100644 --- a/src/runtime/pkg/containerd-shim-v2/service.go +++ b/src/runtime/pkg/containerd-shim-v2/service.go @@ -776,6 +776,8 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E return empty, errors.New("The exec process does not exist") } processStatus = execs.status + } else { + r.All = true } // According to CRI specs, kubelet will call StopPodSandbox() diff --git a/src/runtime/pkg/containerd-shim-v2/start.go b/src/runtime/pkg/containerd-shim-v2/start.go index 5eede489f3..65bfe6d9a1 100644 --- a/src/runtime/pkg/containerd-shim-v2/start.go +++ b/src/runtime/pkg/containerd-shim-v2/start.go @@ -8,12 +8,14 @@ package containerdshim import ( "context" "fmt" + "github.com/sirupsen/logrus" "github.com/containerd/containerd/api/types/task" "github.com/kata-containers/kata-containers/src/runtime/pkg/katautils" ) func startContainer(ctx context.Context, s *service, c *container) (retErr error) { + shimLog.WithField("container", c.id).Debug("start container") defer func() { if retErr != nil { // notify the wait goroutine to continue @@ -78,7 +80,8 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error return err } c.ttyio = tty - go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr) + + go ioCopy(shimLog.WithField("container", c.id), c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr) } else { // close the io exit channel, since there is no io for this container, // otherwise the following wait goroutine will hang on this channel. @@ -94,6 +97,10 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error } func startExec(ctx context.Context, s *service, containerID, execID string) (e *exec, retErr error) { + shimLog.WithFields(logrus.Fields{ + "container": containerID, + "exec": execID, + }).Debug("start container execution") // start an exec c, err := s.getContainer(containerID) if err != nil { @@ -140,7 +147,10 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e * } execs.ttyio = tty - go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr) + go ioCopy(shimLog.WithFields(logrus.Fields{ + "container": c.id, + "exec": execID, + }), execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr) go wait(ctx, s, c, execID) diff --git a/src/runtime/pkg/containerd-shim-v2/stream.go b/src/runtime/pkg/containerd-shim-v2/stream.go index f976c49ef4..58045359b3 100644 --- a/src/runtime/pkg/containerd-shim-v2/stream.go +++ b/src/runtime/pkg/containerd-shim-v2/stream.go @@ -12,6 +12,7 @@ import ( "syscall" "github.com/containerd/fifo" + "github.com/sirupsen/logrus" ) // The buffer size used to specify the buffer for IO streams copy @@ -86,18 +87,20 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) ( return ttyIO, nil } -func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { +func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { var wg sync.WaitGroup if tty.Stdin != nil { wg.Add(1) go func() { + shimLog.Debug("stdin io stream copy started") p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(stdinPipe, tty.Stdin, *p) // notify that we can close process's io safely. close(stdinCloser) wg.Done() + shimLog.Debug("stdin io stream copy exited") }() } @@ -105,6 +108,7 @@ func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteClo wg.Add(1) go func() { + shimLog.Debug("stdout io stream copy started") p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(tty.Stdout, stdoutPipe, *p) @@ -113,20 +117,24 @@ func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteClo // close stdin to make the other routine stop tty.Stdin.Close() } + shimLog.Debug("stdout io stream copy exited") }() } if tty.Stderr != nil && stderrPipe != nil { wg.Add(1) go func() { + shimLog.Debug("stderr io stream copy started") p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(tty.Stderr, stderrPipe, *p) wg.Done() + shimLog.Debug("stderr io stream copy exited") }() } wg.Wait() tty.close() close(exitch) + shimLog.Debug("all io stream copy goroutines exited") } diff --git a/src/runtime/pkg/containerd-shim-v2/stream_test.go b/src/runtime/pkg/containerd-shim-v2/stream_test.go index 01b988b1a7..9b848e52bd 100644 --- a/src/runtime/pkg/containerd-shim-v2/stream_test.go +++ b/src/runtime/pkg/containerd-shim-v2/stream_test.go @@ -7,6 +7,7 @@ package containerdshim import ( "context" + "github.com/sirupsen/logrus" "io" "os" "path/filepath" @@ -179,7 +180,7 @@ func TestIoCopy(t *testing.T) { defer tty.close() // start the ioCopy threads : copy from src to dst - go ioCopy(exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR) + go ioCopy(logrus.WithContext(context.Background()), exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR) var firstW, secondW, thirdW io.WriteCloser var firstR, secondR, thirdR io.Reader diff --git a/src/runtime/pkg/containerd-shim-v2/wait.go b/src/runtime/pkg/containerd-shim-v2/wait.go index c6587c16cd..0eeac976e0 100644 --- a/src/runtime/pkg/containerd-shim-v2/wait.go +++ b/src/runtime/pkg/containerd-shim-v2/wait.go @@ -31,12 +31,17 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32, if execID == "" { //wait until the io closed, then wait the container <-c.exitIOch + shimLog.WithField("container", c.id).Debug("The container io streams closed") } else { execs, err = c.getExec(execID) if err != nil { return exitCode255, err } <-execs.exitIOch + shimLog.WithFields(logrus.Fields{ + "container": c.id, + "exec": execID, + }).Debug("The container process io streams closed") //This wait could be triggered before exec start which //will get the exec's id, thus this assignment must after //the exec exit, to make sure it get the exec's id. @@ -82,13 +87,17 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32, c.exitTime = timeStamp c.exitCh <- uint32(ret) - + shimLog.WithField("container", c.id).Debug("The container status is StatusStopped") } else { execs.status = task.StatusStopped execs.exitCode = ret execs.exitTime = timeStamp execs.exitCh <- uint32(ret) + shimLog.WithFields(logrus.Fields{ + "container": c.id, + "exec": execID, + }).Debug("The container exec status is StatusStopped") } s.mu.Unlock()