diff --git a/src/runtime/pkg/containerd-shim-v2/service.go b/src/runtime/pkg/containerd-shim-v2/service.go index 27ebe19268..9e703c9e21 100644 --- a/src/runtime/pkg/containerd-shim-v2/service.go +++ b/src/runtime/pkg/containerd-shim-v2/service.go @@ -7,6 +7,7 @@ package containerdshim import ( "context" + "fmt" "io" "os" sysexec "os/exec" @@ -85,6 +86,11 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu vci.SetLogger(ctx, shimLog) katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level) + ns, found := namespaces.Namespace(ctx) + if !found { + return nil, fmt.Errorf("shim namespace cannot be empty") + } + s := &service{ id: id, pid: uint32(os.Getpid()), @@ -93,6 +99,7 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu events: make(chan interface{}, chSize), ec: make(chan exit, bufferSize), cancel: shutdown, + namespace: ns, } go s.processExits() @@ -131,6 +138,9 @@ type service struct { id string + // Namespace from upper container engine + namespace string + mu sync.Mutex eventSendMu sync.Mutex diff --git a/src/runtime/pkg/containerd-shim-v2/shim_io_binary.go b/src/runtime/pkg/containerd-shim-v2/shim_io_binary.go new file mode 100644 index 0000000000..39a72ec115 --- /dev/null +++ b/src/runtime/pkg/containerd-shim-v2/shim_io_binary.go @@ -0,0 +1,216 @@ +// Copyright (c) 2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + "syscall" + "time" + + "golang.org/x/sys/execabs" + + "github.com/hashicorp/go-multierror" +) + +const ( + binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup +) + +var ( + _ IO = &binaryIO{} +) + +// binaryIO related code is from https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L311 +type binaryIO struct { + cmd *execabs.Cmd + out, err *pipe +} + +// https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L248 +func newBinaryIO(ctx context.Context, ns, id string, uri *url.URL) (bio *binaryIO, err error) { + var closers []func() error + defer func() { + if err == nil { + return + } + result := multierror.Append(err) + for _, fn := range closers { + result = multierror.Append(result, fn()) + } + err = multierror.Flatten(result) + }() + + out, err := newPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdout pipes: %w", err) + } + closers = append(closers, out.Close) + + serr, err := newPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stderr pipes: %w", err) + } + closers = append(closers, serr.Close) + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + closers = append(closers, r.Close, w.Close) + + cmd := newBinaryCmd(uri, id, ns) + cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w) + // don't need to register this with the reaper or wait when + // running inside a shim + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start binary process: %w", err) + } + closers = append(closers, func() error { return cmd.Process.Kill() }) + + // close our side of the pipe after start + if err := w.Close(); err != nil { + return nil, fmt.Errorf("failed to close write pipe after start: %w", err) + } + + // wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, fmt.Errorf("failed to read from logging binary: %w", err) + } + + return &binaryIO{ + cmd: cmd, + out: out, + err: serr, + }, nil +} + +// newBinaryCmd returns a Cmd to be used to start a logging binary. +// The Cmd is generated from the provided uri, and the container ID and +// namespace are appended to the Cmd environment. +func newBinaryCmd(binaryURI *url.URL, id, ns string) *execabs.Cmd { + var args []string + for k, vs := range binaryURI.Query() { + args = append(args, k) + if len(vs) > 0 { + args = append(args, vs[0]) + } + } + + cmd := execabs.Command(binaryURI.Path, args...) + + cmd.Env = append(cmd.Env, + "CONTAINER_ID="+id, + "CONTAINER_NAMESPACE="+ns, + ) + + return cmd +} + +func (bi *binaryIO) Stdin() io.ReadCloser { + return nil +} + +func (bi *binaryIO) Stdout() io.Writer { + return bi.out.w +} + +func (bi *binaryIO) Stderr() io.Writer { + return bi.err.w +} + +func (bi *binaryIO) Close() error { + var ( + result *multierror.Error + ) + + for _, v := range []*pipe{bi.out, bi.err} { + if v != nil { + if err := v.Close(); err != nil { + result = multierror.Append(result, err) + } + } + } + + if err := bi.cancel(); err != nil { + result = multierror.Append(result, err) + } + + return result.ErrorOrNil() +} + +func (bi *binaryIO) cancel() error { + if bi.cmd == nil || bi.cmd.Process == nil { + return nil + } + + // Send SIGTERM first, so logger process has a chance to flush and exit properly + if err := bi.cmd.Process.Signal(syscall.SIGTERM); err != nil { + result := multierror.Append(fmt.Errorf("failed to send SIGTERM: %w", err)) + + shimLog.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim") + + if err := bi.cmd.Process.Kill(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to kill process after faulty SIGTERM: %w", err)) + } + + return result.ErrorOrNil() + } + + done := make(chan error, 1) + go func() { + done <- bi.cmd.Wait() + }() + + select { + case err := <-done: + return err + case <-time.After(binaryIOProcTermTimeout): + shimLog.Warn("failed to wait for shim logger process to exit, killing") + + err := bi.cmd.Process.Kill() + if err != nil { + return fmt.Errorf("failed to kill shim logger process: %w", err) + } + + return nil + } +} + +func newPipe() (*pipe, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + return &pipe{ + r: r, + w: w, + }, nil +} + +type pipe struct { + r *os.File + w *os.File +} + +// https://github.com/containerd/containerd/blob/v1.6.6/vendor/github.com/containerd/go-runc/io.go#L71 +func (p *pipe) Close() error { + var result *multierror.Error + + if err := p.w.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close write pipe: %w", err)) + } + + if err := p.r.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close read pipe: %w", err)) + } + + return multierror.Prefix(result.ErrorOrNil(), "pipe:") +} diff --git a/src/runtime/pkg/containerd-shim-v2/shim_io_file.go b/src/runtime/pkg/containerd-shim-v2/shim_io_file.go new file mode 100644 index 0000000000..8ef43981d1 --- /dev/null +++ b/src/runtime/pkg/containerd-shim-v2/shim_io_file.go @@ -0,0 +1,80 @@ +// Copyright (c) 2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "io" + "net/url" + "os" + "path/filepath" + + cioutil "github.com/containerd/containerd/pkg/ioutil" +) + +var ( + _ IO = &fileIO{} +) + +// fileIO only support write both stdout/stderr to the same file +type fileIO struct { + outw io.WriteCloser + errw io.WriteCloser + path string +} + +// openLogFile opens/creates a container log file with its directory. +func openLogFile(path string) (*os.File, error) { + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, err + } + return os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) +} + +func newFileIO(ctx context.Context, stdio *stdio, uri *url.URL) (*fileIO, error) { + var outw, errw, f io.WriteCloser + var err error + + logFile := uri.Path + if f, err = openLogFile(logFile); err != nil { + return nil, err + } + + if stdio.Stdout != "" { + outw = cioutil.NewSerialWriteCloser(f) + } + + if !stdio.Console && stdio.Stderr != "" { + errw = cioutil.NewSerialWriteCloser(f) + } + + return &fileIO{ + path: logFile, + outw: outw, + errw: errw, + }, nil +} + +func (fi *fileIO) Close() error { + if fi.outw != nil { + return wc(fi.outw) + } else if fi.errw != nil { + return wc(fi.errw) + } + return nil +} + +func (fi *fileIO) Stdin() io.ReadCloser { + return nil +} + +func (fi *fileIO) Stdout() io.Writer { + return fi.outw +} + +func (fi *fileIO) Stderr() io.Writer { + return fi.errw +} diff --git a/src/runtime/pkg/containerd-shim-v2/shim_io_pipe.go b/src/runtime/pkg/containerd-shim-v2/shim_io_pipe.go new file mode 100644 index 0000000000..702549b41f --- /dev/null +++ b/src/runtime/pkg/containerd-shim-v2/shim_io_pipe.go @@ -0,0 +1,95 @@ +// Copyright (c) 2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package containerdshim + +import ( + "context" + "fmt" + "io" + "syscall" + + "github.com/containerd/fifo" + "github.com/hashicorp/go-multierror" +) + +var ( + _ IO = &pipeIO{} +) + +type pipeIO struct { + in io.ReadCloser + outw io.WriteCloser + errw io.WriteCloser +} + +func newPipeIO(ctx context.Context, stdio *stdio) (*pipeIO, error) { + var in io.ReadCloser + var outw io.WriteCloser + var errw io.WriteCloser + var err error + + if stdio.Stdin != "" { + in, err = fifo.OpenFifo(ctx, stdio.Stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return nil, err + } + } + + if stdio.Stdout != "" { + outw, err = fifo.OpenFifo(ctx, stdio.Stdout, syscall.O_RDWR, 0) + if err != nil { + return nil, err + } + } + + if !stdio.Console && stdio.Stderr != "" { + errw, err = fifo.OpenFifo(ctx, stdio.Stderr, syscall.O_RDWR, 0) + if err != nil { + return nil, err + } + } + + pipeIO := &pipeIO{ + in: in, + outw: outw, + errw: errw, + } + + return pipeIO, nil +} + +func (pi *pipeIO) Stdin() io.ReadCloser { + return pi.in +} + +func (pi *pipeIO) Stdout() io.Writer { + return pi.outw +} + +func (pi *pipeIO) Stderr() io.Writer { + return pi.errw +} + +func (pi *pipeIO) Close() error { + var result *multierror.Error + + if pi.in != nil { + if err := pi.in.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close stdin: %w", err)) + } + pi.in = nil + } + + if err := wc(pi.outw); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close stdout: %w", err)) + } + + if err := wc(pi.errw); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close stderr: %w", err)) + } + + return result.ErrorOrNil() +} diff --git a/src/runtime/pkg/containerd-shim-v2/start.go b/src/runtime/pkg/containerd-shim-v2/start.go index 65bfe6d9a1..d8c9368995 100644 --- a/src/runtime/pkg/containerd-shim-v2/start.go +++ b/src/runtime/pkg/containerd-shim-v2/start.go @@ -8,6 +8,7 @@ package containerdshim import ( "context" "fmt" + "github.com/sirupsen/logrus" "github.com/containerd/containerd/api/types/task" @@ -75,7 +76,7 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error c.stdinPipe = stdin if c.stdin != "" || c.stdout != "" || c.stderr != "" { - tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) + tty, err := newTtyIO(ctx, s.namespace, c.id, c.stdin, c.stdout, c.stderr, c.terminal) if err != nil { return err } @@ -141,7 +142,7 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e * execs.stdinPipe = stdin - tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) + tty, err := newTtyIO(ctx, s.namespace, execs.id, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) if err != nil { return nil, err } diff --git a/src/runtime/pkg/containerd-shim-v2/stream.go b/src/runtime/pkg/containerd-shim-v2/stream.go index 58045359b3..c20e63de82 100644 --- a/src/runtime/pkg/containerd-shim-v2/stream.go +++ b/src/runtime/pkg/containerd-shim-v2/stream.go @@ -7,16 +7,22 @@ package containerdshim import ( "context" + "fmt" "io" + "net/url" "sync" - "syscall" - "github.com/containerd/fifo" "github.com/sirupsen/logrus" ) -// The buffer size used to specify the buffer for IO streams copy -const bufSize = 32 << 10 +const ( + // The buffer size used to specify the buffer for IO streams copy + bufSize = 32 << 10 + + shimLogPluginBinary = "binary" + shimLogPluginFifo = "fifo" + shimLogPluginFile = "file" +) var ( bufPool = sync.Pool{ @@ -27,76 +33,84 @@ var ( } ) +type stdio struct { + Stdin string + Stdout string + Stderr string + Console bool +} +type IO interface { + io.Closer + Stdin() io.ReadCloser + Stdout() io.Writer + Stderr() io.Writer +} + type ttyIO struct { - Stdin io.ReadCloser - Stdout io.Writer - Stderr io.Writer + io IO + raw *stdio } func (tty *ttyIO) close() { - - if tty.Stdin != nil { - tty.Stdin.Close() - tty.Stdin = nil - } - cf := func(w io.Writer) { - if w == nil { - return - } - if c, ok := w.(io.WriteCloser); ok { - c.Close() - } - } - cf(tty.Stdout) - cf(tty.Stderr) + tty.io.Close() } -func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) { - var in io.ReadCloser - var outw io.Writer - var errw io.Writer +// newTtyIO creates a new ttyIO struct. +// ns(namespace)/id(container ID) are used for containerd binary IO. +// containerd will pass the ns/id as ENV to the binary log driver, +// and the binary log driver will use ns/id to get the log options config file. +// for example nerdctl: https://github.com/containerd/nerdctl/blob/v0.21.0/pkg/logging/logging.go#L102 +func newTtyIO(ctx context.Context, ns, id, stdin, stdout, stderr string, console bool) (*ttyIO, error) { var err error + var io IO - if stdin != "" { - in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return nil, err - } + raw := &stdio{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Console: console, } - if stdout != "" { - outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDWR, 0) - if err != nil { - return nil, err - } + uri, err := url.Parse(stdout) + if err != nil { + return nil, fmt.Errorf("unable to parse stdout uri: %w", err) } - if !console && stderr != "" { - errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDWR, 0) - if err != nil { - return nil, err - } + if uri.Scheme == "" { + uri.Scheme = "fifo" } - ttyIO := &ttyIO{ - Stdin: in, - Stdout: outw, - Stderr: errw, + switch uri.Scheme { + case shimLogPluginFifo: + io, err = newPipeIO(ctx, raw) + case shimLogPluginBinary: + io, err = newBinaryIO(ctx, ns, id, uri) + case shimLogPluginFile: + io, err = newFileIO(ctx, raw, uri) + default: + return nil, fmt.Errorf("unknown STDIO scheme %s", uri.Scheme) } - return ttyIO, nil + if err != nil { + return nil, fmt.Errorf("failed to creat io stream: %w", err) + } + + return &ttyIO{ + io: io, + raw: raw, + }, nil } func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { var wg sync.WaitGroup - if tty.Stdin != nil { + if tty.io.Stdin() != nil { wg.Add(1) go func() { shimLog.Debug("stdin io stream copy started") p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - io.CopyBuffer(stdinPipe, tty.Stdin, *p) + io.CopyBuffer(stdinPipe, tty.io.Stdin(), *p) // notify that we can close process's io safely. close(stdinCloser) wg.Done() @@ -104,30 +118,30 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO }() } - if tty.Stdout != nil { + if tty.io.Stdout() != nil { wg.Add(1) go func() { shimLog.Debug("stdout io stream copy started") p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - io.CopyBuffer(tty.Stdout, stdoutPipe, *p) + io.CopyBuffer(tty.io.Stdout(), stdoutPipe, *p) wg.Done() - if tty.Stdin != nil { + if tty.io.Stdin() != nil { // close stdin to make the other routine stop - tty.Stdin.Close() + tty.io.Stdin().Close() } shimLog.Debug("stdout io stream copy exited") }() } - if tty.Stderr != nil && stderrPipe != nil { + if tty.io.Stderr() != nil && stderrPipe != nil { wg.Add(1) go func() { shimLog.Debug("stderr io stream copy started") p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - io.CopyBuffer(tty.Stderr, stderrPipe, *p) + io.CopyBuffer(tty.io.Stderr(), stderrPipe, *p) wg.Done() shimLog.Debug("stderr io stream copy exited") }() @@ -138,3 +152,10 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO close(exitch) shimLog.Debug("all io stream copy goroutines exited") } + +func wc(w io.WriteCloser) error { + if w == nil { + return nil + } + return w.Close() +} diff --git a/src/runtime/pkg/containerd-shim-v2/stream_test.go b/src/runtime/pkg/containerd-shim-v2/stream_test.go index d5317a172a..ea4f026ca1 100644 --- a/src/runtime/pkg/containerd-shim-v2/stream_test.go +++ b/src/runtime/pkg/containerd-shim-v2/stream_test.go @@ -7,7 +7,6 @@ package containerdshim import ( "context" - "github.com/sirupsen/logrus" "io" "os" "path/filepath" @@ -15,6 +14,8 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" + "github.com/containerd/fifo" "github.com/stretchr/testify/assert" ) @@ -45,7 +46,7 @@ func TestNewTtyIOFifoReopen(t *testing.T) { defer outr.Close() errr = createReadFifo(stderr) defer errr.Close() - tty, err = newTtyIO(ctx, "", stdout, stderr, false) + tty, err = newTtyIO(ctx, "", "", "", stdout, stderr, false) assert.NoError(err) defer tty.close() @@ -72,9 +73,9 @@ func TestNewTtyIOFifoReopen(t *testing.T) { } } - checkFifoWrite(tty.Stdout) + checkFifoWrite(tty.io.Stdout()) checkFifoRead(outr) - checkFifoWrite(tty.Stderr) + checkFifoWrite(tty.io.Stderr()) checkFifoRead(errr) err = outr.Close() @@ -84,8 +85,8 @@ func TestNewTtyIOFifoReopen(t *testing.T) { // Make sure that writing to tty fifo will not get `EPIPE` // when the read side is closed - checkFifoWrite(tty.Stdout) - checkFifoWrite(tty.Stderr) + checkFifoWrite(tty.io.Stdout()) + checkFifoWrite(tty.io.Stderr()) // Reopen the fifo outr = createReadFifo(stdout) @@ -171,7 +172,7 @@ func TestIoCopy(t *testing.T) { defer srcInW.Close() } - tty, err := newTtyIO(ctx, srcStdinPath, dstStdoutPath, dstStderrPath, false) + tty, err := newTtyIO(ctx, "", "", srcStdinPath, dstStdoutPath, dstStderrPath, false) assert.NoError(err) defer tty.close() diff --git a/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/read_closer.go b/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/read_closer.go new file mode 100644 index 0000000000..fbc30a6f73 --- /dev/null +++ b/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/read_closer.go @@ -0,0 +1,57 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ioutil + +import "io" + +// writeCloseInformer wraps a reader with a close function. +type wrapReadCloser struct { + reader *io.PipeReader + writer *io.PipeWriter +} + +// NewWrapReadCloser creates a wrapReadCloser from a reader. +// NOTE(random-liu): To avoid goroutine leakage, the reader passed in +// must be eventually closed by the caller. +func NewWrapReadCloser(r io.Reader) io.ReadCloser { + pr, pw := io.Pipe() + go func() { + _, _ = io.Copy(pw, r) + pr.Close() + pw.Close() + }() + return &wrapReadCloser{ + reader: pr, + writer: pw, + } +} + +// Read reads up to len(p) bytes into p. +func (w *wrapReadCloser) Read(p []byte) (int, error) { + n, err := w.reader.Read(p) + if err == io.ErrClosedPipe { + return n, io.EOF + } + return n, err +} + +// Close closes read closer. +func (w *wrapReadCloser) Close() error { + w.reader.Close() + w.writer.Close() + return nil +} diff --git a/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/write_closer.go b/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/write_closer.go new file mode 100644 index 0000000000..c816c514ad --- /dev/null +++ b/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/write_closer.go @@ -0,0 +1,102 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ioutil + +import ( + "io" + "sync" +) + +// writeCloseInformer wraps passed in write closer with a close channel. +// Caller could wait on the close channel for the write closer to be +// closed. +type writeCloseInformer struct { + close chan struct{} + wc io.WriteCloser +} + +// NewWriteCloseInformer creates the writeCloseInformer from a write closer. +func NewWriteCloseInformer(wc io.WriteCloser) (io.WriteCloser, <-chan struct{}) { + close := make(chan struct{}) + return &writeCloseInformer{ + close: close, + wc: wc, + }, close +} + +// Write passes through the data into the internal write closer. +func (w *writeCloseInformer) Write(p []byte) (int, error) { + return w.wc.Write(p) +} + +// Close closes the internal write closer and inform the close channel. +func (w *writeCloseInformer) Close() error { + err := w.wc.Close() + close(w.close) + return err +} + +// nopWriteCloser wraps passed in writer with a nop close function. +type nopWriteCloser struct { + w io.Writer +} + +// NewNopWriteCloser creates the nopWriteCloser from a writer. +func NewNopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w: w} +} + +// Write passes through the data into the internal writer. +func (n *nopWriteCloser) Write(p []byte) (int, error) { + return n.w.Write(p) +} + +// Close is a nop close function. +func (n *nopWriteCloser) Close() error { + return nil +} + +// serialWriteCloser wraps a write closer and makes sure all writes +// are done in serial. +// Parallel write won't intersect with each other. Use case: +// 1) Pipe: Write content longer than PIPE_BUF. +// See http://man7.org/linux/man-pages/man7/pipe.7.html +// 2) <3.14 Linux Kernel: write is not atomic +// See http://man7.org/linux/man-pages/man2/write.2.html +type serialWriteCloser struct { + mu sync.Mutex + wc io.WriteCloser +} + +// NewSerialWriteCloser creates a SerialWriteCloser from a write closer. +func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser { + return &serialWriteCloser{wc: wc} +} + +// Write writes a group of byte arrays in order atomically. +func (s *serialWriteCloser) Write(data []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.wc.Write(data) +} + +// Close closes the write closer. +func (s *serialWriteCloser) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.wc.Close() +} diff --git a/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/writer_group.go b/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/writer_group.go new file mode 100644 index 0000000000..0ed550497b --- /dev/null +++ b/src/runtime/vendor/github.com/containerd/containerd/pkg/ioutil/writer_group.go @@ -0,0 +1,105 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ioutil + +import ( + "errors" + "io" + "sync" +) + +// WriterGroup is a group of writers. Writer could be dynamically +// added and removed. +type WriterGroup struct { + mu sync.Mutex + writers map[string]io.WriteCloser + closed bool +} + +var _ io.Writer = &WriterGroup{} + +// NewWriterGroup creates an empty writer group. +func NewWriterGroup() *WriterGroup { + return &WriterGroup{ + writers: make(map[string]io.WriteCloser), + } +} + +// Add adds a writer into the group. The writer will be closed +// if the writer group is closed. +func (g *WriterGroup) Add(key string, w io.WriteCloser) { + g.mu.Lock() + defer g.mu.Unlock() + if g.closed { + w.Close() + return + } + g.writers[key] = w +} + +// Get gets a writer from the group, returns nil if the writer +// doesn't exist. +func (g *WriterGroup) Get(key string) io.WriteCloser { + g.mu.Lock() + defer g.mu.Unlock() + return g.writers[key] +} + +// Remove removes a writer from the group. +func (g *WriterGroup) Remove(key string) { + g.mu.Lock() + defer g.mu.Unlock() + w, ok := g.writers[key] + if !ok { + return + } + w.Close() + delete(g.writers, key) +} + +// Write writes data into each writer. If a writer returns error, +// it will be closed and removed from the writer group. It returns +// error if writer group is empty. +func (g *WriterGroup) Write(p []byte) (int, error) { + g.mu.Lock() + defer g.mu.Unlock() + for k, w := range g.writers { + n, err := w.Write(p) + if err == nil && len(p) == n { + continue + } + // The writer is closed or in bad state, remove it. + w.Close() + delete(g.writers, k) + } + if len(g.writers) == 0 { + return 0, errors.New("writer group is empty") + } + return len(p), nil +} + +// Close closes the writer group. Write will return error after +// closed. +func (g *WriterGroup) Close() { + g.mu.Lock() + defer g.mu.Unlock() + for _, w := range g.writers { + w.Close() + } + g.writers = nil + g.closed = true +} diff --git a/src/runtime/vendor/modules.txt b/src/runtime/vendor/modules.txt index 8c27d91df4..adf9f6ce53 100644 --- a/src/runtime/vendor/modules.txt +++ b/src/runtime/vendor/modules.txt @@ -81,6 +81,7 @@ github.com/containerd/containerd/mount github.com/containerd/containerd/namespaces github.com/containerd/containerd/pkg/cri/annotations github.com/containerd/containerd/pkg/dialer +github.com/containerd/containerd/pkg/ioutil github.com/containerd/containerd/pkg/runtimeoptions/v1 github.com/containerd/containerd/pkg/shutdown github.com/containerd/containerd/pkg/ttrpcutil