Merge pull request #3975 from bergwolf/github/backport-stable-2.4

agent: Signal the whole process group
Commit 565efd1bf2, authored by Fupan Li on 2022-03-28 18:26:12 +08:00, committed by GitHub
6 changed files with 118 additions and 17 deletions

@@ -19,6 +19,7 @@ use ttrpc::{
 };

 use anyhow::{anyhow, Context, Result};
+use cgroups::freezer::FreezerState;
 use oci::{LinuxNamespace, Root, Spec};
 use protobuf::{Message, RepeatedField, SingularPtrField};
 use protocols::agent::{
@@ -40,8 +41,9 @@ use rustjail::specconv::CreateOpts;
 use nix::errno::Errno;
 use nix::mount::MsFlags;
 use nix::sys::signal::Signal;
-use nix::sys::stat;
+use nix::sys::{signal, stat};
 use nix::unistd::{self, Pid};
+use rustjail::cgroups::Manager;
 use rustjail::process::ProcessOperations;
 use sysinfo::{DiskExt, System, SystemExt};
@@ -389,7 +391,6 @@ impl AgentService {
         let cid = req.container_id.clone();
         let eid = req.exec_id.clone();
         let s = self.sandbox.clone();
-        let mut sandbox = s.lock().await;

         info!(
             sl!(),
@@ -398,27 +399,97 @@ impl AgentService {
             "exec-id" => eid.clone(),
         );

-        let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
-        let mut signal = Signal::try_from(req.signal as i32).map_err(|e| {
+        let mut sig = Signal::try_from(req.signal as i32).map_err(|e| {
             anyhow!(e).context(format!(
                 "failed to convert {:?} to signal (container-id: {}, exec-id: {})",
                 req.signal, cid, eid
             ))
         })?;

-        // For container initProcess, if it hasn't installed handler for "SIGTERM" signal,
-        // it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal
-        // instead of "SIGTERM" to terminate it.
-        if p.init && signal == Signal::SIGTERM && !is_signal_handled(p.pid, req.signal) {
-            signal = Signal::SIGKILL;
+        {
+            let mut sandbox = s.lock().await;
+            let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
+
+            // For container initProcess, if it hasn't installed handler for "SIGTERM" signal,
+            // it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal
+            // instead of "SIGTERM" to terminate it.
+            if p.init && sig == Signal::SIGTERM && !is_signal_handled(p.pid, sig as u32) {
+                sig = Signal::SIGKILL;
+            }
+
+            p.signal(sig)?;
         }

-        p.signal(signal)?;
+        if eid.is_empty() {
+            // eid is empty, signal all the remaining processes in the container cgroup
+            info!(
+                sl!(),
+                "signal all the remaining processes";
+                "container-id" => cid.clone(),
+                "exec-id" => eid.clone(),
+            );
+
+            if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await {
+                warn!(
+                    sl!(),
+                    "freeze cgroup failed";
+                    "container-id" => cid.clone(),
+                    "exec-id" => eid.clone(),
+                    "error" => format!("{:?}", err),
+                );
+            }
+
+            let pids = self.get_pids(&cid).await?;
+            for pid in pids.iter() {
+                if let Err(err) = signal::kill(Pid::from_raw(*pid), Some(sig)) {
+                    warn!(
+                        sl!(),
+                        "signal failed";
+                        "container-id" => cid.clone(),
+                        "exec-id" => eid.clone(),
+                        "pid" => pid,
+                        "error" => format!("{:?}", err),
+                    );
+                }
+            }
+
+            if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Thawed).await {
+                warn!(
+                    sl!(),
+                    "unfreeze cgroup failed";
+                    "container-id" => cid.clone(),
+                    "exec-id" => eid.clone(),
+                    "error" => format!("{:?}", err),
+                );
+            }
+        }
+
         Ok(())
     }

+    async fn freeze_cgroup(&self, cid: &str, state: FreezerState) -> Result<()> {
+        let s = self.sandbox.clone();
+        let mut sandbox = s.lock().await;
+        let ctr = sandbox
+            .get_container(cid)
+            .ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
+        let cm = ctr
+            .cgroup_manager
+            .as_ref()
+            .ok_or_else(|| anyhow!("cgroup manager not exist"))?;
+        cm.freeze(state)?;
+        Ok(())
+    }
+
+    async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> {
+        let s = self.sandbox.clone();
+        let mut sandbox = s.lock().await;
+        let ctr = sandbox
+            .get_container(cid)
+            .ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
+        let cm = ctr
+            .cgroup_manager
+            .as_ref()
+            .ok_or_else(|| anyhow!("cgroup manager not exist"))?;
+        let pids = cm.get_pids()?;
+        Ok(pids)
+    }
+
     #[instrument]
     async fn do_wait_process(
         &self,
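
The SIGTERM-to-SIGKILL fallback above depends on is_signal_handled(), which is not part of this hunk. As a rough sketch (not necessarily the agent's exact implementation), such a check can parse the SigCgt bitmask from /proc/<pid>/status, where bit (signum - 1) is set for every signal the process has installed a handler for:

use std::fs;

// Sketch only: report whether the process has a handler installed for `signum`.
fn is_signal_handled(pid: i32, signum: u32) -> bool {
    let status = match fs::read_to_string(format!("/proc/{}/status", pid)) {
        Ok(s) => s,
        // The process may already have exited; treat that as "no handler".
        Err(_) => return false,
    };

    for line in status.lines() {
        if let Some(mask) = line.strip_prefix("SigCgt:") {
            if let Ok(mask) = u64::from_str_radix(mask.trim(), 16) {
                // SigCgt is a hex bitmask of caught signals; bit (signum - 1)
                // being set means a handler is registered for that signal.
                return signum > 0 && (mask & (1u64 << (signum - 1))) != 0;
            }
        }
    }
    false
}

With a check like this, do_signal_process() only escalates SIGTERM to SIGKILL when the container's init process would otherwise ignore the signal.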

@@ -776,6 +776,8 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E
			return empty, errors.New("The exec process does not exist")
		}
		processStatus = execs.status
+	} else {
+		r.All = true
	}

	// According to CRI specs, kubelet will call StopPodSandbox()
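
With r.All set for a container-level Kill (no exec ID), the request reaches the agent with an empty exec-id, which is exactly the case the new do_signal_process() code above handles by signalling every remaining process in the container's cgroup. The sketch below shows that freeze/signal/thaw sequence against the cgroup v1 freezer interface directly; the /sys/fs/cgroup/freezer/kata/<cid> path and the helper names are illustrative assumptions, since the agent really goes through rustjail's cgroup manager (freeze_cgroup() and get_pids()):

use std::{fs, io};

use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;

// Hypothetical per-container freezer path, assuming cgroup v1 with the
// freezer controller mounted at /sys/fs/cgroup/freezer.
fn freezer_path(cid: &str) -> String {
    format!("/sys/fs/cgroup/freezer/kata/{}", cid)
}

fn signal_container_group(cid: &str, sig: Signal) -> io::Result<()> {
    let base = freezer_path(cid);

    // Freeze first so no process can fork new children between listing
    // the pids and signalling them.
    fs::write(format!("{}/freezer.state", base), "FROZEN")?;

    // cgroup.procs lists every pid currently in the container's cgroup.
    let procs = fs::read_to_string(format!("{}/cgroup.procs", base))?;
    for line in procs.lines() {
        if let Ok(pid) = line.trim().parse::<i32>() {
            // Ignore per-pid errors: a process may already have exited.
            let _ = signal::kill(Pid::from_raw(pid), Some(sig));
        }
    }

    // Thaw so the signalled processes can run their handlers and exit.
    fs::write(format!("{}/freezer.state", base), "THAWED")?;
    Ok(())
}

Freezing before signalling closes the race with processes that fork while they are being killed, which is why the agent freezes the cgroup before walking its pid list.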

@@ -8,12 +8,14 @@ package containerdshim
 import (
	"context"
	"fmt"

+	"github.com/sirupsen/logrus"
	"github.com/containerd/containerd/api/types/task"
	"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
 )

 func startContainer(ctx context.Context, s *service, c *container) (retErr error) {
+	shimLog.WithField("container", c.id).Debug("start container")
	defer func() {
		if retErr != nil {
			// notify the wait goroutine to continue
@@ -78,7 +80,8 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
			return err
		}
		c.ttyio = tty
-		go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
+		go ioCopy(shimLog.WithField("container", c.id), c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
	} else {
		// close the io exit channel, since there is no io for this container,
		// otherwise the following wait goroutine will hang on this channel.
@@ -94,6 +97,10 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
 }

 func startExec(ctx context.Context, s *service, containerID, execID string) (e *exec, retErr error) {
+	shimLog.WithFields(logrus.Fields{
+		"container": containerID,
+		"exec": execID,
+	}).Debug("start container execution")
	// start an exec
	c, err := s.getContainer(containerID)
	if err != nil {
@@ -140,7 +147,10 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e *
	}
	execs.ttyio = tty
-	go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
+	go ioCopy(shimLog.WithFields(logrus.Fields{
+		"container": c.id,
+		"exec": execID,
+	}), execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
	go wait(ctx, s, c, execID)

@@ -12,6 +12,7 @@ import (
	"syscall"

	"github.com/containerd/fifo"
+	"github.com/sirupsen/logrus"
 )

 // The buffer size used to specify the buffer for IO streams copy
@@ -86,18 +87,20 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
	return ttyIO, nil
 }

-func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
+func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
	var wg sync.WaitGroup

	if tty.Stdin != nil {
		wg.Add(1)
		go func() {
+			shimLog.Debug("stdin io stream copy started")
			p := bufPool.Get().(*[]byte)
			defer bufPool.Put(p)
			io.CopyBuffer(stdinPipe, tty.Stdin, *p)
			// notify that we can close process's io safely.
			close(stdinCloser)
			wg.Done()
+			shimLog.Debug("stdin io stream copy exited")
		}()
	}
@@ -105,6 +108,7 @@ func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteClo
		wg.Add(1)
		go func() {
+			shimLog.Debug("stdout io stream copy started")
			p := bufPool.Get().(*[]byte)
			defer bufPool.Put(p)
			io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
@@ -113,20 +117,24 @@ func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteClo
				// close stdin to make the other routine stop
				tty.Stdin.Close()
			}
+			shimLog.Debug("stdout io stream copy exited")
		}()
	}

	if tty.Stderr != nil && stderrPipe != nil {
		wg.Add(1)
		go func() {
+			shimLog.Debug("stderr io stream copy started")
			p := bufPool.Get().(*[]byte)
			defer bufPool.Put(p)
			io.CopyBuffer(tty.Stderr, stderrPipe, *p)
			wg.Done()
+			shimLog.Debug("stderr io stream copy exited")
		}()
	}

	wg.Wait()
	tty.close()
	close(exitch)
+	shimLog.Debug("all io stream copy goroutines exited")
 }

@@ -7,6 +7,7 @@ package containerdshim
 import (
	"context"
+	"github.com/sirupsen/logrus"
	"io"
	"os"
	"path/filepath"
@@ -179,7 +180,7 @@ func TestIoCopy(t *testing.T) {
	defer tty.close()
	// start the ioCopy threads : copy from src to dst
-	go ioCopy(exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR)
+	go ioCopy(logrus.WithContext(context.Background()), exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR)
	var firstW, secondW, thirdW io.WriteCloser
	var firstR, secondR, thirdR io.Reader

@@ -31,12 +31,17 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32,
	if execID == "" {
		//wait until the io closed, then wait the container
		<-c.exitIOch
+		shimLog.WithField("container", c.id).Debug("The container io streams closed")
	} else {
		execs, err = c.getExec(execID)
		if err != nil {
			return exitCode255, err
		}
		<-execs.exitIOch
+		shimLog.WithFields(logrus.Fields{
+			"container": c.id,
+			"exec": execID,
+		}).Debug("The container process io streams closed")
		//This wait could be triggered before exec start which
		//will get the exec's id, thus this assignment must after
		//the exec exit, to make sure it get the exec's id.
@@ -82,13 +87,17 @@ func wait(ctx context.Context, s *service, c *container, execID string) (int32,
		c.exitTime = timeStamp
		c.exitCh <- uint32(ret)
+		shimLog.WithField("container", c.id).Debug("The container status is StatusStopped")
	} else {
		execs.status = task.StatusStopped
		execs.exitCode = ret
		execs.exitTime = timeStamp
		execs.exitCh <- uint32(ret)
+		shimLog.WithFields(logrus.Fields{
+			"container": c.id,
+			"exec": execID,
+		}).Debug("The container exec status is StatusStopped")
	}
	s.mu.Unlock()