diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 0dc6fa6dfe..9786c53036 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -283,6 +283,27 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + s.Lock() + defer s.Unlock() + + c, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + //start a container + if r.ExecID == "" { + err = startContainer(ctx, s, c) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &taskAPI.StartResponse{ + Pid: s.pid, + }, nil + } + + //start an exec return nil, errdefs.ErrNotImplemented } @@ -382,3 +403,13 @@ func (s *service) checkProcesses(e exit) { } return } + +func (s *service) getContainer(id string) (*container, error) { + c := s.containers[id] + + if c == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container does not exist %s", id) + } + + return c, nil +} diff --git a/containerd-shim-v2/start.go b/containerd-shim-v2/start.go new file mode 100644 index 0000000000..3a6c1ef24c --- /dev/null +++ b/containerd-shim-v2/start.go @@ -0,0 +1,71 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "fmt" + + "github.com/containerd/containerd/api/types/task" + "github.com/kata-containers/runtime/pkg/katautils" +) + +func startContainer(ctx context.Context, s *service, c *container) error { + //start a container + if c.cType == "" { + err := fmt.Errorf("Bug, the container %s type is empty", c.id) + return err + } + + if s.sandbox == nil { + err := fmt.Errorf("Bug, the sandbox hasn't been created for this container %s", c.id) + return err + } + + if c.cType.IsSandbox() { + err := s.sandbox.Start() + if err != nil { + return err + } + } else { + _, err := s.sandbox.StartContainer(c.id) + if err != nil { + return err + } + } + + // Run post-start OCI hooks. + err := katautils.EnterNetNS(s.sandbox.GetNetNs(), func() error { + return katautils.PostStartHooks(ctx, *c.spec, s.sandbox.ID(), c.bundle) + }) + if err != nil { + return err + } + + c.status = task.StatusRunning + + stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, c.id) + if err != nil { + return err + } + + if c.stdin != "" || c.stdout != "" || c.stderr != "" { + tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) + if err != nil { + return err + } + c.ttyio = tty + go ioCopy(c.exitIOch, tty, stdin, stdout, stderr) + } else { + //close the io exit channel, since there is no io for this container, + //otherwise the following wait goroutine will hang on this channel. + close(c.exitIOch) + } + + go wait(s, c, "") + + return nil +} diff --git a/containerd-shim-v2/stream.go b/containerd-shim-v2/stream.go index f5489c6002..a551686309 100644 --- a/containerd-shim-v2/stream.go +++ b/containerd-shim-v2/stream.go @@ -5,7 +5,24 @@ package containerdshim import ( + "context" "io" + "sync" + "syscall" + + "github.com/containerd/fifo" +) + +// The buffer size used to specify the buffer for IO streams copy +const bufSize = 32 << 10 + +var ( + bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, bufSize) + return &buffer + }, + } ) type ttyIO struct { @@ -13,3 +30,97 @@ type ttyIO struct { Stdout io.Writer Stderr io.Writer } + +func (tty *ttyIO) close() { + + if tty.Stdin != nil { + tty.Stdin.Close() + } + cf := func(w io.Writer) { + if w == nil { + return + } + if c, ok := w.(io.WriteCloser); ok { + c.Close() + } + } + cf(tty.Stdout) + cf(tty.Stderr) +} + +func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) { + var in io.ReadCloser + var outw io.Writer + var errw io.Writer + var err error + + if stdin != "" { + in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + } + + if stdout != "" { + outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + } + + if !console && stderr != "" { + errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + } + + ttyIO := &ttyIO{ + Stdin: in, + Stdout: outw, + Stderr: errw, + } + + return ttyIO, nil +} + +func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { + var wg sync.WaitGroup + var closeOnce sync.Once + + if tty.Stdin != nil { + wg.Add(1) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(stdinPipe, tty.Stdin, *p) + wg.Done() + }() + } + + if tty.Stdout != nil { + wg.Add(1) + + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(tty.Stdout, stdoutPipe, *p) + wg.Done() + closeOnce.Do(tty.close) + }() + } + + if tty.Stderr != nil && stderrPipe != nil { + wg.Add(1) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(tty.Stderr, stderrPipe, *p) + wg.Done() + }() + } + + wg.Wait() + closeOnce.Do(tty.close) + close(exitch) +} diff --git a/containerd-shim-v2/utils.go b/containerd-shim-v2/utils.go index 959db7d28f..13cb323c3e 100644 --- a/containerd-shim-v2/utils.go +++ b/containerd-shim-v2/utils.go @@ -10,6 +10,7 @@ import ( "context" "fmt" "os" + "time" cdshim "github.com/containerd/containerd/runtime/v2/shim" "github.com/kata-containers/runtime/pkg/katautils" @@ -18,6 +19,16 @@ import ( "github.com/opencontainers/runtime-spec/specs-go" ) +func cReap(s *service, status int, id, execid string, exitat time.Time) { + s.ec <- exit{ + timestamp: exitat, + pid: s.pid, + status: status, + id: id, + execid: execid, + } +} + func validBundle(containerID, bundlePath string) (string, error) { // container ID MUST be provided. if containerID == "" { diff --git a/containerd-shim-v2/wait.go b/containerd-shim-v2/wait.go new file mode 100644 index 0000000000..438c28624f --- /dev/null +++ b/containerd-shim-v2/wait.go @@ -0,0 +1,56 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "time" + + "github.com/containerd/containerd/api/types/task" + "github.com/sirupsen/logrus" +) + +func wait(s *service, c *container, execID string) (int32, error) { + var execs *exec + var err error + + processID := c.id + + if execID == "" { + //wait until the io closed, then wait the container + <-c.exitIOch + } + + ret, err := s.sandbox.WaitProcess(c.id, processID) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "container": c.id, + "pid": processID, + }).Error("Wait for process failed") + } + + if execID == "" { + c.exitCh <- uint32(ret) + } else { + execs.exitCh <- uint32(ret) + } + + timeStamp := time.Now() + c.mu.Lock() + if execID == "" { + c.status = task.StatusStopped + c.exit = uint32(ret) + c.time = timeStamp + } else { + execs.status = task.StatusStopped + execs.exitCode = ret + execs.exitTime = timeStamp + } + c.mu.Unlock() + + go cReap(s, int(ret), c.id, execID, timeStamp) + + return ret, nil +}