mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 04:04:45 +00:00
shim: support shim v2 logging plugin
Now kata shim only supports stdout/stderr of fifo from containerd/CRI-O, but shim v2 supports logging plugins, and nerdctl default will use the binary schema for logs. This commit will add the others type of log plugins: - file - binary In case of binary, kata shim will receive a stdout/stderr like: binary:///nerdctl?_NERDCTL_INTERNAL_LOGGING=/var/lib/nerdctl/1935db59 That means the nerdctl process will handle the logs(stdout/stderr) Fixes: #4420 Signed-off-by: Bin Liu <bin@hyper.sh>
This commit is contained in:
parent
a238d8c6bd
commit
4e30e11b31
@ -7,6 +7,7 @@ package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
sysexec "os/exec"
|
||||
@ -83,6 +84,11 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
|
||||
vci.SetLogger(ctx, shimLog)
|
||||
katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level)
|
||||
|
||||
ns, found := namespaces.Namespace(ctx)
|
||||
if !found {
|
||||
return nil, fmt.Errorf("shim namespace cannot be empty")
|
||||
}
|
||||
|
||||
s := &service{
|
||||
id: id,
|
||||
pid: uint32(os.Getpid()),
|
||||
@ -91,6 +97,7 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
|
||||
events: make(chan interface{}, chSize),
|
||||
ec: make(chan exit, bufferSize),
|
||||
cancel: shutdown,
|
||||
namespace: ns,
|
||||
}
|
||||
|
||||
go s.processExits()
|
||||
@ -129,6 +136,9 @@ type service struct {
|
||||
|
||||
id string
|
||||
|
||||
// Namespace from upper container engine
|
||||
namespace string
|
||||
|
||||
mu sync.Mutex
|
||||
eventSendMu sync.Mutex
|
||||
|
||||
|
216
src/runtime/pkg/containerd-shim-v2/shim_io_binary.go
Normal file
216
src/runtime/pkg/containerd-shim-v2/shim_io_binary.go
Normal file
@ -0,0 +1,216 @@
|
||||
// Copyright (c) 2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/execabs"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
)
|
||||
|
||||
const (
|
||||
binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
|
||||
)
|
||||
|
||||
var (
|
||||
_ IO = &binaryIO{}
|
||||
)
|
||||
|
||||
// binaryIO related code is from https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L311
|
||||
type binaryIO struct {
|
||||
cmd *execabs.Cmd
|
||||
out, err *pipe
|
||||
}
|
||||
|
||||
// https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L248
|
||||
func newBinaryIO(ctx context.Context, ns, id string, uri *url.URL) (bio *binaryIO, err error) {
|
||||
var closers []func() error
|
||||
defer func() {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
result := multierror.Append(err)
|
||||
for _, fn := range closers {
|
||||
result = multierror.Append(result, fn())
|
||||
}
|
||||
err = multierror.Flatten(result)
|
||||
}()
|
||||
|
||||
out, err := newPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
|
||||
}
|
||||
closers = append(closers, out.Close)
|
||||
|
||||
serr, err := newPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
|
||||
}
|
||||
closers = append(closers, serr.Close)
|
||||
|
||||
r, w, err := os.Pipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
closers = append(closers, r.Close, w.Close)
|
||||
|
||||
cmd := newBinaryCmd(uri, id, ns)
|
||||
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
|
||||
// don't need to register this with the reaper or wait when
|
||||
// running inside a shim
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("failed to start binary process: %w", err)
|
||||
}
|
||||
closers = append(closers, func() error { return cmd.Process.Kill() })
|
||||
|
||||
// close our side of the pipe after start
|
||||
if err := w.Close(); err != nil {
|
||||
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
|
||||
}
|
||||
|
||||
// wait for the logging binary to be ready
|
||||
b := make([]byte, 1)
|
||||
if _, err := r.Read(b); err != nil && err != io.EOF {
|
||||
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
|
||||
}
|
||||
|
||||
return &binaryIO{
|
||||
cmd: cmd,
|
||||
out: out,
|
||||
err: serr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// newBinaryCmd returns a Cmd to be used to start a logging binary.
|
||||
// The Cmd is generated from the provided uri, and the container ID and
|
||||
// namespace are appended to the Cmd environment.
|
||||
func newBinaryCmd(binaryURI *url.URL, id, ns string) *execabs.Cmd {
|
||||
var args []string
|
||||
for k, vs := range binaryURI.Query() {
|
||||
args = append(args, k)
|
||||
if len(vs) > 0 {
|
||||
args = append(args, vs[0])
|
||||
}
|
||||
}
|
||||
|
||||
cmd := execabs.Command(binaryURI.Path, args...)
|
||||
|
||||
cmd.Env = append(cmd.Env,
|
||||
"CONTAINER_ID="+id,
|
||||
"CONTAINER_NAMESPACE="+ns,
|
||||
)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (bi *binaryIO) Stdin() io.ReadCloser {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bi *binaryIO) Stdout() io.Writer {
|
||||
return bi.out.w
|
||||
}
|
||||
|
||||
func (bi *binaryIO) Stderr() io.Writer {
|
||||
return bi.err.w
|
||||
}
|
||||
|
||||
func (bi *binaryIO) Close() error {
|
||||
var (
|
||||
result *multierror.Error
|
||||
)
|
||||
|
||||
for _, v := range []*pipe{bi.out, bi.err} {
|
||||
if v != nil {
|
||||
if err := v.Close(); err != nil {
|
||||
result = multierror.Append(result, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := bi.cancel(); err != nil {
|
||||
result = multierror.Append(result, err)
|
||||
}
|
||||
|
||||
return result.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (bi *binaryIO) cancel() error {
|
||||
if bi.cmd == nil || bi.cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send SIGTERM first, so logger process has a chance to flush and exit properly
|
||||
if err := bi.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
result := multierror.Append(fmt.Errorf("failed to send SIGTERM: %w", err))
|
||||
|
||||
shimLog.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim")
|
||||
|
||||
if err := bi.cmd.Process.Kill(); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to kill process after faulty SIGTERM: %w", err))
|
||||
}
|
||||
|
||||
return result.ErrorOrNil()
|
||||
}
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- bi.cmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
return err
|
||||
case <-time.After(binaryIOProcTermTimeout):
|
||||
shimLog.Warn("failed to wait for shim logger process to exit, killing")
|
||||
|
||||
err := bi.cmd.Process.Kill()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to kill shim logger process: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func newPipe() (*pipe, error) {
|
||||
r, w, err := os.Pipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pipe{
|
||||
r: r,
|
||||
w: w,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type pipe struct {
|
||||
r *os.File
|
||||
w *os.File
|
||||
}
|
||||
|
||||
// https://github.com/containerd/containerd/blob/v1.6.6/vendor/github.com/containerd/go-runc/io.go#L71
|
||||
func (p *pipe) Close() error {
|
||||
var result *multierror.Error
|
||||
|
||||
if err := p.w.Close(); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to close write pipe: %w", err))
|
||||
}
|
||||
|
||||
if err := p.r.Close(); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to close read pipe: %w", err))
|
||||
}
|
||||
|
||||
return multierror.Prefix(result.ErrorOrNil(), "pipe:")
|
||||
}
|
80
src/runtime/pkg/containerd-shim-v2/shim_io_file.go
Normal file
80
src/runtime/pkg/containerd-shim-v2/shim_io_file.go
Normal file
@ -0,0 +1,80 @@
|
||||
// Copyright (c) 2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
cioutil "github.com/containerd/containerd/pkg/ioutil"
|
||||
)
|
||||
|
||||
var (
|
||||
_ IO = &fileIO{}
|
||||
)
|
||||
|
||||
// fileIO only support write both stdout/stderr to the same file
|
||||
type fileIO struct {
|
||||
outw io.WriteCloser
|
||||
errw io.WriteCloser
|
||||
path string
|
||||
}
|
||||
|
||||
// openLogFile opens/creates a container log file with its directory.
|
||||
func openLogFile(path string) (*os.File, error) {
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
|
||||
}
|
||||
|
||||
func newFileIO(ctx context.Context, stdio *stdio, uri *url.URL) (*fileIO, error) {
|
||||
var outw, errw, f io.WriteCloser
|
||||
var err error
|
||||
|
||||
logFile := uri.Path
|
||||
if f, err = openLogFile(logFile); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if stdio.Stdout != "" {
|
||||
outw = cioutil.NewSerialWriteCloser(f)
|
||||
}
|
||||
|
||||
if !stdio.Console && stdio.Stderr != "" {
|
||||
errw = cioutil.NewSerialWriteCloser(f)
|
||||
}
|
||||
|
||||
return &fileIO{
|
||||
path: logFile,
|
||||
outw: outw,
|
||||
errw: errw,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fi *fileIO) Close() error {
|
||||
if fi.outw != nil {
|
||||
return wc(fi.outw)
|
||||
} else if fi.errw != nil {
|
||||
return wc(fi.errw)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fi *fileIO) Stdin() io.ReadCloser {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fi *fileIO) Stdout() io.Writer {
|
||||
return fi.outw
|
||||
}
|
||||
|
||||
func (fi *fileIO) Stderr() io.Writer {
|
||||
return fi.errw
|
||||
}
|
95
src/runtime/pkg/containerd-shim-v2/shim_io_pipe.go
Normal file
95
src/runtime/pkg/containerd-shim-v2/shim_io_pipe.go
Normal file
@ -0,0 +1,95 @@
|
||||
// Copyright (c) 2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"syscall"
|
||||
|
||||
"github.com/containerd/fifo"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
)
|
||||
|
||||
var (
|
||||
_ IO = &pipeIO{}
|
||||
)
|
||||
|
||||
type pipeIO struct {
|
||||
in io.ReadCloser
|
||||
outw io.WriteCloser
|
||||
errw io.WriteCloser
|
||||
}
|
||||
|
||||
func newPipeIO(ctx context.Context, stdio *stdio) (*pipeIO, error) {
|
||||
var in io.ReadCloser
|
||||
var outw io.WriteCloser
|
||||
var errw io.WriteCloser
|
||||
var err error
|
||||
|
||||
if stdio.Stdin != "" {
|
||||
in, err = fifo.OpenFifo(ctx, stdio.Stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if stdio.Stdout != "" {
|
||||
outw, err = fifo.OpenFifo(ctx, stdio.Stdout, syscall.O_RDWR, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if !stdio.Console && stdio.Stderr != "" {
|
||||
errw, err = fifo.OpenFifo(ctx, stdio.Stderr, syscall.O_RDWR, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
pipeIO := &pipeIO{
|
||||
in: in,
|
||||
outw: outw,
|
||||
errw: errw,
|
||||
}
|
||||
|
||||
return pipeIO, nil
|
||||
}
|
||||
|
||||
func (pi *pipeIO) Stdin() io.ReadCloser {
|
||||
return pi.in
|
||||
}
|
||||
|
||||
func (pi *pipeIO) Stdout() io.Writer {
|
||||
return pi.outw
|
||||
}
|
||||
|
||||
func (pi *pipeIO) Stderr() io.Writer {
|
||||
return pi.errw
|
||||
}
|
||||
|
||||
func (pi *pipeIO) Close() error {
|
||||
var result *multierror.Error
|
||||
|
||||
if pi.in != nil {
|
||||
if err := pi.in.Close(); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to close stdin: %w", err))
|
||||
}
|
||||
pi.in = nil
|
||||
}
|
||||
|
||||
if err := wc(pi.outw); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to close stdout: %w", err))
|
||||
}
|
||||
|
||||
if err := wc(pi.errw); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to close stderr: %w", err))
|
||||
}
|
||||
|
||||
return result.ErrorOrNil()
|
||||
}
|
@ -8,6 +8,7 @@ package containerdshim
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
@ -75,7 +76,7 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
|
||||
c.stdinPipe = stdin
|
||||
|
||||
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
||||
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
||||
tty, err := newTtyIO(ctx, s.namespace, c.id, c.stdin, c.stdout, c.stderr, c.terminal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -141,7 +142,7 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e *
|
||||
|
||||
execs.stdinPipe = stdin
|
||||
|
||||
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
||||
tty, err := newTtyIO(ctx, s.namespace, execs.id, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -7,16 +7,22 @@ package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/containerd/fifo"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// The buffer size used to specify the buffer for IO streams copy
|
||||
const bufSize = 32 << 10
|
||||
const (
|
||||
// The buffer size used to specify the buffer for IO streams copy
|
||||
bufSize = 32 << 10
|
||||
|
||||
shimLogPluginBinary = "binary"
|
||||
shimLogPluginFifo = "fifo"
|
||||
shimLogPluginFile = "file"
|
||||
)
|
||||
|
||||
var (
|
||||
bufPool = sync.Pool{
|
||||
@ -27,76 +33,84 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
type stdio struct {
|
||||
Stdin string
|
||||
Stdout string
|
||||
Stderr string
|
||||
Console bool
|
||||
}
|
||||
type IO interface {
|
||||
io.Closer
|
||||
Stdin() io.ReadCloser
|
||||
Stdout() io.Writer
|
||||
Stderr() io.Writer
|
||||
}
|
||||
|
||||
type ttyIO struct {
|
||||
Stdin io.ReadCloser
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
io IO
|
||||
raw *stdio
|
||||
}
|
||||
|
||||
func (tty *ttyIO) close() {
|
||||
|
||||
if tty.Stdin != nil {
|
||||
tty.Stdin.Close()
|
||||
tty.Stdin = nil
|
||||
}
|
||||
cf := func(w io.Writer) {
|
||||
if w == nil {
|
||||
return
|
||||
}
|
||||
if c, ok := w.(io.WriteCloser); ok {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
cf(tty.Stdout)
|
||||
cf(tty.Stderr)
|
||||
tty.io.Close()
|
||||
}
|
||||
|
||||
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
|
||||
var in io.ReadCloser
|
||||
var outw io.Writer
|
||||
var errw io.Writer
|
||||
// newTtyIO creates a new ttyIO struct.
|
||||
// ns(namespace)/id(container ID) are used for containerd binary IO.
|
||||
// containerd will pass the ns/id as ENV to the binary log driver,
|
||||
// and the binary log driver will use ns/id to get the log options config file.
|
||||
// for example nerdctl: https://github.com/containerd/nerdctl/blob/v0.21.0/pkg/logging/logging.go#L102
|
||||
func newTtyIO(ctx context.Context, ns, id, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
|
||||
var err error
|
||||
var io IO
|
||||
|
||||
if stdin != "" {
|
||||
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
|
||||
raw := &stdio{
|
||||
Stdin: stdin,
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
Console: console,
|
||||
}
|
||||
|
||||
uri, err := url.Parse(stdout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
|
||||
}
|
||||
|
||||
if uri.Scheme == "" {
|
||||
uri.Scheme = "fifo"
|
||||
}
|
||||
|
||||
switch uri.Scheme {
|
||||
case shimLogPluginFifo:
|
||||
io, err = newPipeIO(ctx, raw)
|
||||
case shimLogPluginBinary:
|
||||
io, err = newBinaryIO(ctx, ns, id, uri)
|
||||
case shimLogPluginFile:
|
||||
io, err = newFileIO(ctx, raw, uri)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown STDIO scheme %s", uri.Scheme)
|
||||
}
|
||||
|
||||
if stdout != "" {
|
||||
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDWR, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("failed to creat io stream: %w", err)
|
||||
}
|
||||
|
||||
if !console && stderr != "" {
|
||||
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDWR, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ttyIO := &ttyIO{
|
||||
Stdin: in,
|
||||
Stdout: outw,
|
||||
Stderr: errw,
|
||||
}
|
||||
|
||||
return ttyIO, nil
|
||||
return &ttyIO{
|
||||
io: io,
|
||||
raw: raw,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if tty.Stdin != nil {
|
||||
if tty.io.Stdin() != nil {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
shimLog.Debug("stdin io stream copy started")
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
||||
io.CopyBuffer(stdinPipe, tty.io.Stdin(), *p)
|
||||
// notify that we can close process's io safely.
|
||||
close(stdinCloser)
|
||||
wg.Done()
|
||||
@ -104,30 +118,30 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO
|
||||
}()
|
||||
}
|
||||
|
||||
if tty.Stdout != nil {
|
||||
if tty.io.Stdout() != nil {
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
shimLog.Debug("stdout io stream copy started")
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
|
||||
io.CopyBuffer(tty.io.Stdout(), stdoutPipe, *p)
|
||||
wg.Done()
|
||||
if tty.Stdin != nil {
|
||||
if tty.io.Stdin() != nil {
|
||||
// close stdin to make the other routine stop
|
||||
tty.Stdin.Close()
|
||||
tty.io.Stdin().Close()
|
||||
}
|
||||
shimLog.Debug("stdout io stream copy exited")
|
||||
}()
|
||||
}
|
||||
|
||||
if tty.Stderr != nil && stderrPipe != nil {
|
||||
if tty.io.Stderr() != nil && stderrPipe != nil {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
shimLog.Debug("stderr io stream copy started")
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
|
||||
io.CopyBuffer(tty.io.Stderr(), stderrPipe, *p)
|
||||
wg.Done()
|
||||
shimLog.Debug("stderr io stream copy exited")
|
||||
}()
|
||||
@ -138,3 +152,10 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO
|
||||
close(exitch)
|
||||
shimLog.Debug("all io stream copy goroutines exited")
|
||||
}
|
||||
|
||||
func wc(w io.WriteCloser) error {
|
||||
if w == nil {
|
||||
return nil
|
||||
}
|
||||
return w.Close()
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/sirupsen/logrus"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -15,6 +14,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/containerd/fifo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -45,7 +46,7 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
|
||||
defer outr.Close()
|
||||
errr = createReadFifo(stderr)
|
||||
defer errr.Close()
|
||||
tty, err = newTtyIO(ctx, "", stdout, stderr, false)
|
||||
tty, err = newTtyIO(ctx, "", "", "", stdout, stderr, false)
|
||||
assert.NoError(err)
|
||||
defer tty.close()
|
||||
|
||||
@ -72,9 +73,9 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
checkFifoWrite(tty.Stdout)
|
||||
checkFifoWrite(tty.io.Stdout())
|
||||
checkFifoRead(outr)
|
||||
checkFifoWrite(tty.Stderr)
|
||||
checkFifoWrite(tty.io.Stderr())
|
||||
checkFifoRead(errr)
|
||||
|
||||
err = outr.Close()
|
||||
@ -84,8 +85,8 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
|
||||
|
||||
// Make sure that writing to tty fifo will not get `EPIPE`
|
||||
// when the read side is closed
|
||||
checkFifoWrite(tty.Stdout)
|
||||
checkFifoWrite(tty.Stderr)
|
||||
checkFifoWrite(tty.io.Stdout())
|
||||
checkFifoWrite(tty.io.Stderr())
|
||||
|
||||
// Reopen the fifo
|
||||
outr = createReadFifo(stdout)
|
||||
@ -171,7 +172,7 @@ func TestIoCopy(t *testing.T) {
|
||||
defer srcInW.Close()
|
||||
}
|
||||
|
||||
tty, err := newTtyIO(ctx, srcStdinPath, dstStdoutPath, dstStderrPath, false)
|
||||
tty, err := newTtyIO(ctx, "", "", srcStdinPath, dstStdoutPath, dstStderrPath, false)
|
||||
assert.NoError(err)
|
||||
defer tty.close()
|
||||
|
||||
|
57
src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/read_closer.go
generated
vendored
Normal file
57
src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/read_closer.go
generated
vendored
Normal file
@ -0,0 +1,57 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ioutil
|
||||
|
||||
import "io"
|
||||
|
||||
// writeCloseInformer wraps a reader with a close function.
|
||||
type wrapReadCloser struct {
|
||||
reader *io.PipeReader
|
||||
writer *io.PipeWriter
|
||||
}
|
||||
|
||||
// NewWrapReadCloser creates a wrapReadCloser from a reader.
|
||||
// NOTE(random-liu): To avoid goroutine leakage, the reader passed in
|
||||
// must be eventually closed by the caller.
|
||||
func NewWrapReadCloser(r io.Reader) io.ReadCloser {
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
_, _ = io.Copy(pw, r)
|
||||
pr.Close()
|
||||
pw.Close()
|
||||
}()
|
||||
return &wrapReadCloser{
|
||||
reader: pr,
|
||||
writer: pw,
|
||||
}
|
||||
}
|
||||
|
||||
// Read reads up to len(p) bytes into p.
|
||||
func (w *wrapReadCloser) Read(p []byte) (int, error) {
|
||||
n, err := w.reader.Read(p)
|
||||
if err == io.ErrClosedPipe {
|
||||
return n, io.EOF
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Close closes read closer.
|
||||
func (w *wrapReadCloser) Close() error {
|
||||
w.reader.Close()
|
||||
w.writer.Close()
|
||||
return nil
|
||||
}
|
102
src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/write_closer.go
generated
vendored
Normal file
102
src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/write_closer.go
generated
vendored
Normal file
@ -0,0 +1,102 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ioutil
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// writeCloseInformer wraps passed in write closer with a close channel.
|
||||
// Caller could wait on the close channel for the write closer to be
|
||||
// closed.
|
||||
type writeCloseInformer struct {
|
||||
close chan struct{}
|
||||
wc io.WriteCloser
|
||||
}
|
||||
|
||||
// NewWriteCloseInformer creates the writeCloseInformer from a write closer.
|
||||
func NewWriteCloseInformer(wc io.WriteCloser) (io.WriteCloser, <-chan struct{}) {
|
||||
close := make(chan struct{})
|
||||
return &writeCloseInformer{
|
||||
close: close,
|
||||
wc: wc,
|
||||
}, close
|
||||
}
|
||||
|
||||
// Write passes through the data into the internal write closer.
|
||||
func (w *writeCloseInformer) Write(p []byte) (int, error) {
|
||||
return w.wc.Write(p)
|
||||
}
|
||||
|
||||
// Close closes the internal write closer and inform the close channel.
|
||||
func (w *writeCloseInformer) Close() error {
|
||||
err := w.wc.Close()
|
||||
close(w.close)
|
||||
return err
|
||||
}
|
||||
|
||||
// nopWriteCloser wraps passed in writer with a nop close function.
|
||||
type nopWriteCloser struct {
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
// NewNopWriteCloser creates the nopWriteCloser from a writer.
|
||||
func NewNopWriteCloser(w io.Writer) io.WriteCloser {
|
||||
return &nopWriteCloser{w: w}
|
||||
}
|
||||
|
||||
// Write passes through the data into the internal writer.
|
||||
func (n *nopWriteCloser) Write(p []byte) (int, error) {
|
||||
return n.w.Write(p)
|
||||
}
|
||||
|
||||
// Close is a nop close function.
|
||||
func (n *nopWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// serialWriteCloser wraps a write closer and makes sure all writes
|
||||
// are done in serial.
|
||||
// Parallel write won't intersect with each other. Use case:
|
||||
// 1) Pipe: Write content longer than PIPE_BUF.
|
||||
// See http://man7.org/linux/man-pages/man7/pipe.7.html
|
||||
// 2) <3.14 Linux Kernel: write is not atomic
|
||||
// See http://man7.org/linux/man-pages/man2/write.2.html
|
||||
type serialWriteCloser struct {
|
||||
mu sync.Mutex
|
||||
wc io.WriteCloser
|
||||
}
|
||||
|
||||
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
|
||||
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
|
||||
return &serialWriteCloser{wc: wc}
|
||||
}
|
||||
|
||||
// Write writes a group of byte arrays in order atomically.
|
||||
func (s *serialWriteCloser) Write(data []byte) (int, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.wc.Write(data)
|
||||
}
|
||||
|
||||
// Close closes the write closer.
|
||||
func (s *serialWriteCloser) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.wc.Close()
|
||||
}
|
105
src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/writer_group.go
generated
vendored
Normal file
105
src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/writer_group.go
generated
vendored
Normal file
@ -0,0 +1,105 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ioutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// WriterGroup is a group of writers. Writer could be dynamically
|
||||
// added and removed.
|
||||
type WriterGroup struct {
|
||||
mu sync.Mutex
|
||||
writers map[string]io.WriteCloser
|
||||
closed bool
|
||||
}
|
||||
|
||||
var _ io.Writer = &WriterGroup{}
|
||||
|
||||
// NewWriterGroup creates an empty writer group.
|
||||
func NewWriterGroup() *WriterGroup {
|
||||
return &WriterGroup{
|
||||
writers: make(map[string]io.WriteCloser),
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a writer into the group. The writer will be closed
|
||||
// if the writer group is closed.
|
||||
func (g *WriterGroup) Add(key string, w io.WriteCloser) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
if g.closed {
|
||||
w.Close()
|
||||
return
|
||||
}
|
||||
g.writers[key] = w
|
||||
}
|
||||
|
||||
// Get gets a writer from the group, returns nil if the writer
|
||||
// doesn't exist.
|
||||
func (g *WriterGroup) Get(key string) io.WriteCloser {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
return g.writers[key]
|
||||
}
|
||||
|
||||
// Remove removes a writer from the group.
|
||||
func (g *WriterGroup) Remove(key string) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
w, ok := g.writers[key]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
w.Close()
|
||||
delete(g.writers, key)
|
||||
}
|
||||
|
||||
// Write writes data into each writer. If a writer returns error,
|
||||
// it will be closed and removed from the writer group. It returns
|
||||
// error if writer group is empty.
|
||||
func (g *WriterGroup) Write(p []byte) (int, error) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
for k, w := range g.writers {
|
||||
n, err := w.Write(p)
|
||||
if err == nil && len(p) == n {
|
||||
continue
|
||||
}
|
||||
// The writer is closed or in bad state, remove it.
|
||||
w.Close()
|
||||
delete(g.writers, k)
|
||||
}
|
||||
if len(g.writers) == 0 {
|
||||
return 0, errors.New("writer group is empty")
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Close closes the writer group. Write will return error after
|
||||
// closed.
|
||||
func (g *WriterGroup) Close() {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
for _, w := range g.writers {
|
||||
w.Close()
|
||||
}
|
||||
g.writers = nil
|
||||
g.closed = true
|
||||
}
|
1
src/runtime/vendor/modules.txt
vendored
1
src/runtime/vendor/modules.txt
vendored
@ -79,6 +79,7 @@ github.com/containerd/containerd/mount
|
||||
github.com/containerd/containerd/namespaces
|
||||
github.com/containerd/containerd/pkg/cri/annotations
|
||||
github.com/containerd/containerd/pkg/dialer
|
||||
github.com/containerd/containerd/pkg/ioutil
|
||||
github.com/containerd/containerd/pkg/runtimeoptions/v1
|
||||
github.com/containerd/containerd/pkg/shutdown
|
||||
github.com/containerd/containerd/pkg/ttrpcutil
|
||||
|
Loading…
Reference in New Issue
Block a user