mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-04 11:06:21 +00:00
containerd-shim-kata-v2: add the start service support
Add the Start api support of start a pod or container created before. Signed-off-by: fupan <lifupan@gmail.com>
This commit is contained in:
parent
72fd6e0c7d
commit
4c5b29647b
@ -283,6 +283,27 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
||||
|
||||
// Start a process
|
||||
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
c, err := s.getContainer(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//start a container
|
||||
if r.ExecID == "" {
|
||||
err = startContainer(ctx, s, c)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
return &taskAPI.StartResponse{
|
||||
Pid: s.pid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
//start an exec
|
||||
return nil, errdefs.ErrNotImplemented
|
||||
}
|
||||
|
||||
@ -382,3 +403,13 @@ func (s *service) checkProcesses(e exit) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *service) getContainer(id string) (*container, error) {
|
||||
c := s.containers[id]
|
||||
|
||||
if c == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container does not exist %s", id)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
71
containerd-shim-v2/start.go
Normal file
71
containerd-shim-v2/start.go
Normal file
@ -0,0 +1,71 @@
|
||||
// Copyright (c) 2018 HyperHQ Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
"github.com/kata-containers/runtime/pkg/katautils"
|
||||
)
|
||||
|
||||
func startContainer(ctx context.Context, s *service, c *container) error {
|
||||
//start a container
|
||||
if c.cType == "" {
|
||||
err := fmt.Errorf("Bug, the container %s type is empty", c.id)
|
||||
return err
|
||||
}
|
||||
|
||||
if s.sandbox == nil {
|
||||
err := fmt.Errorf("Bug, the sandbox hasn't been created for this container %s", c.id)
|
||||
return err
|
||||
}
|
||||
|
||||
if c.cType.IsSandbox() {
|
||||
err := s.sandbox.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
_, err := s.sandbox.StartContainer(c.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Run post-start OCI hooks.
|
||||
err := katautils.EnterNetNS(s.sandbox.GetNetNs(), func() error {
|
||||
return katautils.PostStartHooks(ctx, *c.spec, s.sandbox.ID(), c.bundle)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.status = task.StatusRunning
|
||||
|
||||
stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, c.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
||||
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.ttyio = tty
|
||||
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
|
||||
} else {
|
||||
//close the io exit channel, since there is no io for this container,
|
||||
//otherwise the following wait goroutine will hang on this channel.
|
||||
close(c.exitIOch)
|
||||
}
|
||||
|
||||
go wait(s, c, "")
|
||||
|
||||
return nil
|
||||
}
|
@ -5,7 +5,24 @@
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/containerd/fifo"
|
||||
)
|
||||
|
||||
// The buffer size used to specify the buffer for IO streams copy
|
||||
const bufSize = 32 << 10
|
||||
|
||||
var (
|
||||
bufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
buffer := make([]byte, bufSize)
|
||||
return &buffer
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
type ttyIO struct {
|
||||
@ -13,3 +30,97 @@ type ttyIO struct {
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
}
|
||||
|
||||
func (tty *ttyIO) close() {
|
||||
|
||||
if tty.Stdin != nil {
|
||||
tty.Stdin.Close()
|
||||
}
|
||||
cf := func(w io.Writer) {
|
||||
if w == nil {
|
||||
return
|
||||
}
|
||||
if c, ok := w.(io.WriteCloser); ok {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
cf(tty.Stdout)
|
||||
cf(tty.Stderr)
|
||||
}
|
||||
|
||||
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
|
||||
var in io.ReadCloser
|
||||
var outw io.Writer
|
||||
var errw io.Writer
|
||||
var err error
|
||||
|
||||
if stdin != "" {
|
||||
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if stdout != "" {
|
||||
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if !console && stderr != "" {
|
||||
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_WRONLY, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ttyIO := &ttyIO{
|
||||
Stdin: in,
|
||||
Stdout: outw,
|
||||
Stderr: errw,
|
||||
}
|
||||
|
||||
return ttyIO, nil
|
||||
}
|
||||
|
||||
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
||||
var wg sync.WaitGroup
|
||||
var closeOnce sync.Once
|
||||
|
||||
if tty.Stdin != nil {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
if tty.Stdout != nil {
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
|
||||
wg.Done()
|
||||
closeOnce.Do(tty.close)
|
||||
}()
|
||||
}
|
||||
|
||||
if tty.Stderr != nil && stderrPipe != nil {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
closeOnce.Do(tty.close)
|
||||
close(exitch)
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||
"github.com/kata-containers/runtime/pkg/katautils"
|
||||
@ -18,6 +19,16 @@ import (
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
)
|
||||
|
||||
func cReap(s *service, status int, id, execid string, exitat time.Time) {
|
||||
s.ec <- exit{
|
||||
timestamp: exitat,
|
||||
pid: s.pid,
|
||||
status: status,
|
||||
id: id,
|
||||
execid: execid,
|
||||
}
|
||||
}
|
||||
|
||||
func validBundle(containerID, bundlePath string) (string, error) {
|
||||
// container ID MUST be provided.
|
||||
if containerID == "" {
|
||||
|
56
containerd-shim-v2/wait.go
Normal file
56
containerd-shim-v2/wait.go
Normal file
@ -0,0 +1,56 @@
|
||||
// Copyright (c) 2018 HyperHQ Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func wait(s *service, c *container, execID string) (int32, error) {
|
||||
var execs *exec
|
||||
var err error
|
||||
|
||||
processID := c.id
|
||||
|
||||
if execID == "" {
|
||||
//wait until the io closed, then wait the container
|
||||
<-c.exitIOch
|
||||
}
|
||||
|
||||
ret, err := s.sandbox.WaitProcess(c.id, processID)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{
|
||||
"container": c.id,
|
||||
"pid": processID,
|
||||
}).Error("Wait for process failed")
|
||||
}
|
||||
|
||||
if execID == "" {
|
||||
c.exitCh <- uint32(ret)
|
||||
} else {
|
||||
execs.exitCh <- uint32(ret)
|
||||
}
|
||||
|
||||
timeStamp := time.Now()
|
||||
c.mu.Lock()
|
||||
if execID == "" {
|
||||
c.status = task.StatusStopped
|
||||
c.exit = uint32(ret)
|
||||
c.time = timeStamp
|
||||
} else {
|
||||
execs.status = task.StatusStopped
|
||||
execs.exitCode = ret
|
||||
execs.exitTime = timeStamp
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
go cReap(s, int(ret), c.id, execID, timeStamp)
|
||||
|
||||
return ret, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user