From 269c940edc80ba79a32f1a7f4b307c8ac3c08e44 Mon Sep 17 00:00:00 2001 From: fupan Date: Mon, 19 Nov 2018 11:09:49 +0800 Subject: [PATCH] containerd-shim-kata-v2: add the exec service support Add the Exec api support for exec an process in a running container. Signed-off-by: fupan --- containerd-shim-v2/exec.go | 91 +++++++++++++++++++++++++++++++++++ containerd-shim-v2/service.go | 39 ++++++++++++--- containerd-shim-v2/start.go | 44 +++++++++++++++++ containerd-shim-v2/wait.go | 10 ++++ 4 files changed, 176 insertions(+), 8 deletions(-) diff --git a/containerd-shim-v2/exec.go b/containerd-shim-v2/exec.go index a96ec65f2..d730c2666 100644 --- a/containerd-shim-v2/exec.go +++ b/containerd-shim-v2/exec.go @@ -6,10 +6,16 @@ package containerdshim import ( + "encoding/json" + "fmt" + "strings" "time" "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + googleProtobuf "github.com/gogo/protobuf/types" vc "github.com/kata-containers/runtime/virtcontainers" + specs "github.com/opencontainers/runtime-spec/specs-go" ) type exec struct { @@ -37,3 +43,88 @@ type tty struct { width uint32 terminal bool } + +func getEnvs(envs []string) []vc.EnvVar { + var vcEnvs = []vc.EnvVar{} + var env vc.EnvVar + + for _, v := range envs { + pair := strings.SplitN(v, "=", 2) + + if len(pair) == 2 { + env = vc.EnvVar{Var: pair[0], Value: pair[1]} + } else if len(pair) == 1 { + env = vc.EnvVar{Var: pair[0], Value: ""} + } + + vcEnvs = append(vcEnvs, env) + } + + return vcEnvs +} + +func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *googleProtobuf.Any) (*exec, error) { + var height uint32 + var width uint32 + + if jspec == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "googleProtobuf.Any points to nil") + } + + // process exec request + var spec specs.Process + if err := json.Unmarshal(jspec.Value, &spec); err != nil { + return nil, err + } + + if spec.ConsoleSize != nil { + height = uint32(spec.ConsoleSize.Height) + width = uint32(spec.ConsoleSize.Width) + } + + tty := &tty{ + stdin: stdin, + stdout: stdout, + stderr: stderr, + height: height, + width: width, + terminal: terminal, + } + + cmds := &vc.Cmd{ + Args: spec.Args, + Envs: getEnvs(spec.Env), + User: fmt.Sprintf("%d", spec.User.UID), + PrimaryGroup: fmt.Sprintf("%d", spec.User.GID), + WorkDir: spec.Cwd, + Interactive: terminal, + Detach: !terminal, + NoNewPrivileges: spec.NoNewPrivileges, + } + + exec := &exec{ + container: c, + cmds: cmds, + tty: tty, + exitCode: exitCode255, + exitIOch: make(chan struct{}), + exitCh: make(chan uint32, 1), + status: task.StatusCreated, + } + + return exec, nil +} + +func (c *container) getExec(id string) (*exec, error) { + if c.execs == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "exec does not exist %s", id) + } + + exec := c.execs[id] + + if exec == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "exec does not exist %s", id) + } + + return exec, nil +} diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 9786c5303..3087bff22 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -37,7 +37,8 @@ const ( // it to containerd as the containerd event format. bufferSize = 32 - chSize = 128 + chSize = 128 + exitCode255 = 255 ) var ( @@ -297,14 +298,17 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. if err != nil { return nil, errdefs.ToGRPC(err) } - - return &taskAPI.StartResponse{ - Pid: s.pid, - }, nil + } else { + //start an exec + _, err = startExec(ctx, s, r.ID, r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } } - //start an exec - return nil, errdefs.ErrNotImplemented + return &taskAPI.StartResponse{ + Pid: s.pid, + }, nil } // Delete the initial process and container @@ -314,7 +318,26 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP // Exec an additional process inside the container func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { - return nil, errdefs.ErrNotImplemented + s.Lock() + defer s.Unlock() + + c, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + if execs := c.execs[r.ExecID]; execs != nil { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + } + + execs, err := newExec(c, r.Stdin, r.Stdout, r.Stderr, r.Terminal, r.Spec) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + c.execs[r.ExecID] = execs + + return empty, nil } // ResizePty of a process diff --git a/containerd-shim-v2/start.go b/containerd-shim-v2/start.go index 3a6c1ef24..71a90ed13 100644 --- a/containerd-shim-v2/start.go +++ b/containerd-shim-v2/start.go @@ -69,3 +69,47 @@ func startContainer(ctx context.Context, s *service, c *container) error { return nil } + +func startExec(ctx context.Context, s *service, containerID, execID string) (*exec, error) { + //start an exec + c, err := s.getContainer(containerID) + if err != nil { + return nil, err + } + + execs, err := c.getExec(execID) + if err != nil { + return nil, err + } + + _, proc, err := s.sandbox.EnterContainer(containerID, *execs.cmds) + if err != nil { + err := fmt.Errorf("cannot enter container %s, with err %s", containerID, err) + return nil, err + } + execs.id = proc.Token + + execs.status = task.StatusRunning + if execs.tty.height != 0 && execs.tty.width != 0 { + err = s.sandbox.WinsizeProcess(c.id, execs.id, execs.tty.height, execs.tty.width) + if err != nil { + return nil, err + } + } + + stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, execs.id) + if err != nil { + return nil, err + } + tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) + if err != nil { + return nil, err + } + execs.ttyio = tty + + go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr) + + go wait(s, c, execID) + + return execs, nil +} diff --git a/containerd-shim-v2/wait.go b/containerd-shim-v2/wait.go index 438c28624..6b986a72e 100644 --- a/containerd-shim-v2/wait.go +++ b/containerd-shim-v2/wait.go @@ -21,6 +21,16 @@ func wait(s *service, c *container, execID string) (int32, error) { if execID == "" { //wait until the io closed, then wait the container <-c.exitIOch + } else { + execs, err = c.getExec(execID) + if err != nil { + return exitCode255, err + } + <-execs.exitIOch + //This wait could be triggered before exec start which + //will get the exec's id, thus this assignment must after + //the exec exit, to make sure it get the exec's id. + processID = execs.id } ret, err := s.sandbox.WaitProcess(c.id, processID)