containerd-shim-kata-v2: add the start service support

Add the Start api support of start a pod or
container created before.

Signed-off-by: fupan <lifupan@gmail.com>
This commit is contained in:
fupan 2018-11-19 10:28:48 +08:00
parent 72fd6e0c7d
commit 4c5b29647b
5 changed files with 280 additions and 0 deletions

View File

@ -283,6 +283,27 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
s.Lock()
defer s.Unlock()
c, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
//start a container
if r.ExecID == "" {
err = startContainer(ctx, s, c)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &taskAPI.StartResponse{
Pid: s.pid,
}, nil
}
//start an exec
return nil, errdefs.ErrNotImplemented
}
@ -382,3 +403,13 @@ func (s *service) checkProcesses(e exit) {
}
return
}
func (s *service) getContainer(id string) (*container, error) {
c := s.containers[id]
if c == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container does not exist %s", id)
}
return c, nil
}

View File

@ -0,0 +1,71 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"github.com/containerd/containerd/api/types/task"
"github.com/kata-containers/runtime/pkg/katautils"
)
func startContainer(ctx context.Context, s *service, c *container) error {
//start a container
if c.cType == "" {
err := fmt.Errorf("Bug, the container %s type is empty", c.id)
return err
}
if s.sandbox == nil {
err := fmt.Errorf("Bug, the sandbox hasn't been created for this container %s", c.id)
return err
}
if c.cType.IsSandbox() {
err := s.sandbox.Start()
if err != nil {
return err
}
} else {
_, err := s.sandbox.StartContainer(c.id)
if err != nil {
return err
}
}
// Run post-start OCI hooks.
err := katautils.EnterNetNS(s.sandbox.GetNetNs(), func() error {
return katautils.PostStartHooks(ctx, *c.spec, s.sandbox.ID(), c.bundle)
})
if err != nil {
return err
}
c.status = task.StatusRunning
stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, c.id)
if err != nil {
return err
}
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil {
return err
}
c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
} else {
//close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch)
}
go wait(s, c, "")
return nil
}

View File

@ -5,7 +5,24 @@
package containerdshim
import (
"context"
"io"
"sync"
"syscall"
"github.com/containerd/fifo"
)
// The buffer size used to specify the buffer for IO streams copy
const bufSize = 32 << 10
var (
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, bufSize)
return &buffer
},
}
)
type ttyIO struct {
@ -13,3 +30,97 @@ type ttyIO struct {
Stdout io.Writer
Stderr io.Writer
}
func (tty *ttyIO) close() {
if tty.Stdin != nil {
tty.Stdin.Close()
}
cf := func(w io.Writer) {
if w == nil {
return
}
if c, ok := w.(io.WriteCloser); ok {
c.Close()
}
}
cf(tty.Stdout)
cf(tty.Stderr)
}
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
var in io.ReadCloser
var outw io.Writer
var errw io.Writer
var err error
if stdin != "" {
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
}
if stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
}
if !console && stderr != "" {
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
}
ttyIO := &ttyIO{
Stdin: in,
Stdout: outw,
Stderr: errw,
}
return ttyIO, nil
}
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
var closeOnce sync.Once
if tty.Stdin != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
wg.Done()
}()
}
if tty.Stdout != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
wg.Done()
closeOnce.Do(tty.close)
}()
}
if tty.Stderr != nil && stderrPipe != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
wg.Done()
}()
}
wg.Wait()
closeOnce.Do(tty.close)
close(exitch)
}

View File

@ -10,6 +10,7 @@ import (
"context"
"fmt"
"os"
"time"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
"github.com/kata-containers/runtime/pkg/katautils"
@ -18,6 +19,16 @@ import (
"github.com/opencontainers/runtime-spec/specs-go"
)
func cReap(s *service, status int, id, execid string, exitat time.Time) {
s.ec <- exit{
timestamp: exitat,
pid: s.pid,
status: status,
id: id,
execid: execid,
}
}
func validBundle(containerID, bundlePath string) (string, error) {
// container ID MUST be provided.
if containerID == "" {

View File

@ -0,0 +1,56 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"time"
"github.com/containerd/containerd/api/types/task"
"github.com/sirupsen/logrus"
)
func wait(s *service, c *container, execID string) (int32, error) {
var execs *exec
var err error
processID := c.id
if execID == "" {
//wait until the io closed, then wait the container
<-c.exitIOch
}
ret, err := s.sandbox.WaitProcess(c.id, processID)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"container": c.id,
"pid": processID,
}).Error("Wait for process failed")
}
if execID == "" {
c.exitCh <- uint32(ret)
} else {
execs.exitCh <- uint32(ret)
}
timeStamp := time.Now()
c.mu.Lock()
if execID == "" {
c.status = task.StatusStopped
c.exit = uint32(ret)
c.time = timeStamp
} else {
execs.status = task.StatusStopped
execs.exitCode = ret
execs.exitTime = timeStamp
}
c.mu.Unlock()
go cReap(s, int(ret), c.id, execID, timeStamp)
return ret, nil
}