// container is the shim's in-memory record for a single container.
//
// NOTE(review): nothing visible here reads or writes these fields yet, so the
// per-field notes below are inferred from names and types only — confirm them
// against the code that populates the struct.
type container struct {
	// s points at the shim service; presumably the service that owns this
	// container.
	s *service

	// ttyio holds the stdin/stdout/stderr stream handles (a ttyIO).
	ttyio *ttyIO

	// spec is an OCI spec; presumably the one parsed from the bundle.
	spec *oci.CompatOCISpec

	// time is a timestamp; which event it marks is not visible here.
	// TODO confirm.
	time time.Time

	// execs maps a string key (presumably the exec ID) to its exec record.
	execs map[string]*exec

	// exitIOch and exitCh are presumably used to signal IO completion and to
	// deliver the exit status, respectively — TODO confirm.
	exitIOch chan struct{}
	exitCh   chan uint32

	id string

	// stdin, stdout and stderr are strings, presumably the stdio paths handed
	// over by containerd — TODO confirm.
	stdin  string
	stdout string
	stderr string

	// bundle is presumably the OCI bundle path.
	bundle string

	// cType is the virtcontainers container type.
	cType vc.ContainerType

	// mu presumably guards the mutable fields of this struct.
	mu sync.Mutex

	exit     uint32
	status   task.Status
	terminal bool
}
// exec is the shim's in-memory record for one additional process run inside
// a container.
//
// NOTE(review): nothing visible here reads or writes these fields yet, so the
// per-field notes below are inferred from names and types only — confirm them
// against the code that populates the struct.
type exec struct {
	// container presumably points at the container the process runs in.
	container *container

	// cmds is the virtcontainers command description for the process.
	cmds *vc.Cmd

	// tty carries the stdio strings and terminal geometry (see type tty);
	// ttyio carries the corresponding stream handles.
	tty   *tty
	ttyio *ttyIO

	id string

	// exitCode is signed (int32) while exitCh carries uint32 values.
	// NOTE(review): confirm how the two are converted.
	exitCode int32

	status task.Status

	// exitIOch and exitCh are presumably used to signal IO completion and to
	// deliver the exit status, respectively — TODO confirm.
	exitIOch chan struct{}
	exitCh   chan uint32

	exitTime time.Time
}
+ bufferSize = 32 + + chSize = 128 +) + +var ( + empty = &ptypes.Empty{} + _ taskAPI.TaskService = (taskAPI.TaskService)(&service{}) +) + +// concrete virtcontainer implementation +var vci vc.VC = &vc.VCImpl{} + +// New returns a new shim service that can be used via GRPC +func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shim, error) { + logger := logrus.WithField("ID", id) + vci.SetLogger(ctx, logger) + katautils.SetLogger(ctx, logger, logger.Logger.Level) + _, runtimeConfig, err := katautils.LoadConfiguration("", true, true) + if err != nil { + return nil, err + } + + s := &service{ + id: id, + pid: uint32(os.Getpid()), + context: ctx, + config: &runtimeConfig, + containers: make(map[string]*container), + events: make(chan interface{}, chSize), + ec: make(chan exit, bufferSize), + } + + go s.processExits() + + go s.forward(publisher) + + return s, nil +} + +type exit struct { + id string + execid string + pid uint32 + status int + timestamp time.Time +} + +// service is the shim implementation of a remote shim over GRPC +type service struct { + sync.Mutex + + // pid Since this shimv2 cannot get the container processes pid from VM, + // thus for the returned values needed pid, just return this shim's + // pid directly. + pid uint32 + + context context.Context + sandbox vc.VCSandbox + containers map[string]*container + config *oci.RuntimeConfig + events chan interface{} + + ec chan exit + id string +} + +func newCommand(ctx context.Context, containerdBinary, id, containerdAddress string) (*sysexec.Cmd, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + self, err := os.Executable() + if err != nil { + return nil, err + } + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + args := []string{ + "-namespace", ns, + "-address", containerdAddress, + "-publish-binary", containerdBinary, + "-id", id, + "-debug", + } + cmd := sysexec.Command(self, args...) 
+ cmd.Dir = cwd + + // Set the go max process to 2 in case the shim forks too much process + cmd.Env = append(os.Environ(), "GOMAXPROCS=2") + + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + return cmd, nil +} + +// StartShim willl start a kata shimv2 daemon which will implemented the +// ShimV2 APIs such as create/start/update etc containers. +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { + bundlePath, err := os.Getwd() + if err != nil { + return "", err + } + + address, err := getAddress(ctx, bundlePath, id) + if err != nil { + return "", err + } + if address != "" { + if err := cdshim.WriteAddress("address", address); err != nil { + return "", err + } + return address, nil + } + + cmd, err := newCommand(ctx, containerdBinary, id, containerdAddress) + if err != nil { + return "", err + } + + address, err = cdshim.SocketAddress(ctx, id) + if err != nil { + return "", err + } + + socket, err := cdshim.NewSocket(address) + if err != nil { + return "", err + } + defer socket.Close() + f, err := socket.File() + if err != nil { + return "", err + } + defer f.Close() + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + if err := cmd.Start(); err != nil { + return "", err + } + defer func() { + if err != nil { + cmd.Process.Kill() + } + }() + + // make sure to wait after start + go cmd.Wait() + if err := cdshim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil { + return "", err + } + if err := cdshim.WriteAddress("address", address); err != nil { + return "", err + } + return address, nil +} + +func (s *service) forward(publisher events.Publisher) { + for e := range s.events { + if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { + logrus.WithError(err).Error("post event") + } + } +} + +func getTopic(ctx context.Context, e interface{}) string { + switch e.(type) { + case *eventstypes.TaskCreate: + return cdruntime.TaskCreateEventTopic + case 
// Create a new sandbox or container with the underlying OCI runtime.
//
// Not implemented yet: ctx and r are ignored and the call always returns a
// nil response together with errdefs.ErrNotImplemented. The named err result
// is not used by the current body.
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	return nil, errdefs.ErrNotImplemented
}
// Checkpoint the container.
//
// Not implemented: ctx and r are ignored and the call always returns a nil
// response together with errdefs.ErrNotImplemented passed through
// errdefs.ToGRPCf with the message "service Checkpoint". The other
// unimplemented service methods return errdefs.ErrNotImplemented without
// that conversion.
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "service Checkpoint")
}
error) { + return nil, errdefs.ErrNotImplemented +} + +func (s *service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *service) checkProcesses(e exit) { + s.Lock() + defer s.Unlock() + + id := e.execid + if id == "" { + id = e.id + } + s.events <- &eventstypes.TaskExit{ + ContainerID: e.id, + ID: id, + Pid: e.pid, + ExitStatus: uint32(e.status), + ExitedAt: e.timestamp, + } + return +} diff --git a/containerd-shim-v2/stream.go b/containerd-shim-v2/stream.go new file mode 100644 index 0000000000..f5489c6002 --- /dev/null +++ b/containerd-shim-v2/stream.go @@ -0,0 +1,15 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +package containerdshim + +import ( + "io" +) + +type ttyIO struct { + Stdin io.ReadCloser + Stdout io.Writer + Stderr io.Writer +} diff --git a/containerd-shim-v2/utils.go b/containerd-shim-v2/utils.go new file mode 100644 index 0000000000..c59c07d26e --- /dev/null +++ b/containerd-shim-v2/utils.go @@ -0,0 +1,79 @@ +// Copyright (c) 2017 Intel Corporation +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "fmt" + "os" + + cdshim "github.com/containerd/containerd/runtime/v2/shim" + "github.com/kata-containers/runtime/pkg/katautils" + vc "github.com/kata-containers/runtime/virtcontainers" + "github.com/kata-containers/runtime/virtcontainers/pkg/oci" +) + +func validCreateParams(containerID, bundlePath string) (string, error) { + // container ID MUST be provided. + if containerID == "" { + return "", fmt.Errorf("Missing container ID") + } + + // bundle path MUST be provided. + if bundlePath == "" { + return "", fmt.Errorf("Missing bundle path") + } + + // bundle path MUST be valid. 
+ fileInfo, err := os.Stat(bundlePath) + if err != nil { + return "", fmt.Errorf("Invalid bundle path '%s': %s", bundlePath, err) + } + if fileInfo.IsDir() == false { + return "", fmt.Errorf("Invalid bundle path '%s', it should be a directory", bundlePath) + } + + resolved, err := katautils.ResolvePath(bundlePath) + if err != nil { + return "", err + } + + return resolved, nil +} + +func getAddress(ctx context.Context, bundlePath, id string) (string, error) { + var err error + + // Checks the MUST and MUST NOT from OCI runtime specification + if bundlePath, err = validCreateParams(id, bundlePath); err != nil { + return "", err + } + + ociSpec, err := oci.ParseConfigJSON(bundlePath) + if err != nil { + return "", err + } + + containerType, err := ociSpec.ContainerType() + if err != nil { + return "", err + } + + if containerType == vc.PodContainer { + sandboxID, err := ociSpec.SandboxID() + if err != nil { + return "", err + } + address, err := cdshim.SocketAddress(ctx, sandboxID) + if err != nil { + return "", err + } + return address, nil + } + + return "", nil +}