shim: support shim v2 logging plugin

Now kata shim only supports stdout/stderr of fifo from
containerd/CRI-O, but shim v2 supports logging plugins,
and nerdctl default will use the binary schema for logs.

This commit will add the others type of log plugins:

- file
- binary

In case of binary, kata shim will receive a stdout/stderr like:

binary:///nerdctl?_NERDCTL_INTERNAL_LOGGING=/var/lib/nerdctl/1935db59

That means the nerdctl process will handle the logs(stdout/stderr)

Fixes: #4420

Signed-off-by: Bin Liu <bin@hyper.sh>
This commit is contained in:
Bin Liu 2022-06-08 20:58:19 +08:00 committed by liubin
parent a238d8c6bd
commit 4e30e11b31
11 changed files with 752 additions and 63 deletions

View File

@ -7,6 +7,7 @@ package containerdshim
import ( import (
"context" "context"
"fmt"
"io" "io"
"os" "os"
sysexec "os/exec" sysexec "os/exec"
@ -83,6 +84,11 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
vci.SetLogger(ctx, shimLog) vci.SetLogger(ctx, shimLog)
katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level) katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level)
ns, found := namespaces.Namespace(ctx)
if !found {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
s := &service{ s := &service{
id: id, id: id,
pid: uint32(os.Getpid()), pid: uint32(os.Getpid()),
@ -91,6 +97,7 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
events: make(chan interface{}, chSize), events: make(chan interface{}, chSize),
ec: make(chan exit, bufferSize), ec: make(chan exit, bufferSize),
cancel: shutdown, cancel: shutdown,
namespace: ns,
} }
go s.processExits() go s.processExits()
@ -129,6 +136,9 @@ type service struct {
id string id string
// Namespace from upper container engine
namespace string
mu sync.Mutex mu sync.Mutex
eventSendMu sync.Mutex eventSendMu sync.Mutex

View File

@ -0,0 +1,216 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"io"
"net/url"
"os"
"syscall"
"time"
"golang.org/x/sys/execabs"
"github.com/hashicorp/go-multierror"
)
const (
binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
)
var (
_ IO = &binaryIO{}
)
// binaryIO related code is from https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L311
type binaryIO struct {
cmd *execabs.Cmd
out, err *pipe
}
// https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L248
func newBinaryIO(ctx context.Context, ns, id string, uri *url.URL) (bio *binaryIO, err error) {
var closers []func() error
defer func() {
if err == nil {
return
}
result := multierror.Append(err)
for _, fn := range closers {
result = multierror.Append(result, fn())
}
err = multierror.Flatten(result)
}()
out, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
closers = append(closers, out.Close)
serr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
closers = append(closers, serr.Close)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)
cmd := newBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
closers = append(closers, func() error { return cmd.Process.Kill() })
// close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}
// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
return &binaryIO{
cmd: cmd,
out: out,
err: serr,
}, nil
}
// newBinaryCmd returns a Cmd to be used to start a logging binary.
// The Cmd is generated from the provided uri, and the container ID and
// namespace are appended to the Cmd environment.
func newBinaryCmd(binaryURI *url.URL, id, ns string) *execabs.Cmd {
var args []string
for k, vs := range binaryURI.Query() {
args = append(args, k)
if len(vs) > 0 {
args = append(args, vs[0])
}
}
cmd := execabs.Command(binaryURI.Path, args...)
cmd.Env = append(cmd.Env,
"CONTAINER_ID="+id,
"CONTAINER_NAMESPACE="+ns,
)
return cmd
}
func (bi *binaryIO) Stdin() io.ReadCloser {
return nil
}
func (bi *binaryIO) Stdout() io.Writer {
return bi.out.w
}
func (bi *binaryIO) Stderr() io.Writer {
return bi.err.w
}
func (bi *binaryIO) Close() error {
var (
result *multierror.Error
)
for _, v := range []*pipe{bi.out, bi.err} {
if v != nil {
if err := v.Close(); err != nil {
result = multierror.Append(result, err)
}
}
}
if err := bi.cancel(); err != nil {
result = multierror.Append(result, err)
}
return result.ErrorOrNil()
}
func (bi *binaryIO) cancel() error {
if bi.cmd == nil || bi.cmd.Process == nil {
return nil
}
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := bi.cmd.Process.Signal(syscall.SIGTERM); err != nil {
result := multierror.Append(fmt.Errorf("failed to send SIGTERM: %w", err))
shimLog.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim")
if err := bi.cmd.Process.Kill(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to kill process after faulty SIGTERM: %w", err))
}
return result.ErrorOrNil()
}
done := make(chan error, 1)
go func() {
done <- bi.cmd.Wait()
}()
select {
case err := <-done:
return err
case <-time.After(binaryIOProcTermTimeout):
shimLog.Warn("failed to wait for shim logger process to exit, killing")
err := bi.cmd.Process.Kill()
if err != nil {
return fmt.Errorf("failed to kill shim logger process: %w", err)
}
return nil
}
}
func newPipe() (*pipe, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &pipe{
r: r,
w: w,
}, nil
}
type pipe struct {
r *os.File
w *os.File
}
// https://github.com/containerd/containerd/blob/v1.6.6/vendor/github.com/containerd/go-runc/io.go#L71
func (p *pipe) Close() error {
var result *multierror.Error
if err := p.w.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close write pipe: %w", err))
}
if err := p.r.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close read pipe: %w", err))
}
return multierror.Prefix(result.ErrorOrNil(), "pipe:")
}

View File

@ -0,0 +1,80 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"io"
"net/url"
"os"
"path/filepath"
cioutil "github.com/containerd/containerd/pkg/ioutil"
)
var (
_ IO = &fileIO{}
)
// fileIO only support write both stdout/stderr to the same file
type fileIO struct {
outw io.WriteCloser
errw io.WriteCloser
path string
}
// openLogFile opens/creates a container log file with its directory.
func openLogFile(path string) (*os.File, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
return os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
}
func newFileIO(ctx context.Context, stdio *stdio, uri *url.URL) (*fileIO, error) {
var outw, errw, f io.WriteCloser
var err error
logFile := uri.Path
if f, err = openLogFile(logFile); err != nil {
return nil, err
}
if stdio.Stdout != "" {
outw = cioutil.NewSerialWriteCloser(f)
}
if !stdio.Console && stdio.Stderr != "" {
errw = cioutil.NewSerialWriteCloser(f)
}
return &fileIO{
path: logFile,
outw: outw,
errw: errw,
}, nil
}
func (fi *fileIO) Close() error {
if fi.outw != nil {
return wc(fi.outw)
} else if fi.errw != nil {
return wc(fi.errw)
}
return nil
}
func (fi *fileIO) Stdin() io.ReadCloser {
return nil
}
func (fi *fileIO) Stdout() io.Writer {
return fi.outw
}
func (fi *fileIO) Stderr() io.Writer {
return fi.errw
}

View File

@ -0,0 +1,95 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"io"
"syscall"
"github.com/containerd/fifo"
"github.com/hashicorp/go-multierror"
)
var (
_ IO = &pipeIO{}
)
type pipeIO struct {
in io.ReadCloser
outw io.WriteCloser
errw io.WriteCloser
}
func newPipeIO(ctx context.Context, stdio *stdio) (*pipeIO, error) {
var in io.ReadCloser
var outw io.WriteCloser
var errw io.WriteCloser
var err error
if stdio.Stdin != "" {
in, err = fifo.OpenFifo(ctx, stdio.Stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
}
}
if stdio.Stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdio.Stdout, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
}
if !stdio.Console && stdio.Stderr != "" {
errw, err = fifo.OpenFifo(ctx, stdio.Stderr, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
}
pipeIO := &pipeIO{
in: in,
outw: outw,
errw: errw,
}
return pipeIO, nil
}
func (pi *pipeIO) Stdin() io.ReadCloser {
return pi.in
}
func (pi *pipeIO) Stdout() io.Writer {
return pi.outw
}
func (pi *pipeIO) Stderr() io.Writer {
return pi.errw
}
func (pi *pipeIO) Close() error {
var result *multierror.Error
if pi.in != nil {
if err := pi.in.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close stdin: %w", err))
}
pi.in = nil
}
if err := wc(pi.outw); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close stdout: %w", err))
}
if err := wc(pi.errw); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close stderr: %w", err))
}
return result.ErrorOrNil()
}

View File

@ -8,6 +8,7 @@ package containerdshim
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
@ -75,7 +76,7 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
c.stdinPipe = stdin c.stdinPipe = stdin
if c.stdin != "" || c.stdout != "" || c.stderr != "" { if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) tty, err := newTtyIO(ctx, s.namespace, c.id, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil { if err != nil {
return err return err
} }
@ -141,7 +142,7 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e *
execs.stdinPipe = stdin execs.stdinPipe = stdin
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) tty, err := newTtyIO(ctx, s.namespace, execs.id, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -7,16 +7,22 @@ package containerdshim
import ( import (
"context" "context"
"fmt"
"io" "io"
"net/url"
"sync" "sync"
"syscall"
"github.com/containerd/fifo"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// The buffer size used to specify the buffer for IO streams copy const (
const bufSize = 32 << 10 // The buffer size used to specify the buffer for IO streams copy
bufSize = 32 << 10
shimLogPluginBinary = "binary"
shimLogPluginFifo = "fifo"
shimLogPluginFile = "file"
)
var ( var (
bufPool = sync.Pool{ bufPool = sync.Pool{
@ -27,76 +33,84 @@ var (
} }
) )
type stdio struct {
Stdin string
Stdout string
Stderr string
Console bool
}
type IO interface {
io.Closer
Stdin() io.ReadCloser
Stdout() io.Writer
Stderr() io.Writer
}
type ttyIO struct { type ttyIO struct {
Stdin io.ReadCloser io IO
Stdout io.Writer raw *stdio
Stderr io.Writer
} }
func (tty *ttyIO) close() { func (tty *ttyIO) close() {
tty.io.Close()
if tty.Stdin != nil {
tty.Stdin.Close()
tty.Stdin = nil
}
cf := func(w io.Writer) {
if w == nil {
return
}
if c, ok := w.(io.WriteCloser); ok {
c.Close()
}
}
cf(tty.Stdout)
cf(tty.Stderr)
} }
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) { // newTtyIO creates a new ttyIO struct.
var in io.ReadCloser // ns(namespace)/id(container ID) are used for containerd binary IO.
var outw io.Writer // containerd will pass the ns/id as ENV to the binary log driver,
var errw io.Writer // and the binary log driver will use ns/id to get the log options config file.
// for example nerdctl: https://github.com/containerd/nerdctl/blob/v0.21.0/pkg/logging/logging.go#L102
func newTtyIO(ctx context.Context, ns, id, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
var err error var err error
var io IO
if stdin != "" { raw := &stdio{
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Console: console,
}
uri, err := url.Parse(stdout)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
} }
if uri.Scheme == "" {
uri.Scheme = "fifo"
}
switch uri.Scheme {
case shimLogPluginFifo:
io, err = newPipeIO(ctx, raw)
case shimLogPluginBinary:
io, err = newBinaryIO(ctx, ns, id, uri)
case shimLogPluginFile:
io, err = newFileIO(ctx, raw, uri)
default:
return nil, fmt.Errorf("unknown STDIO scheme %s", uri.Scheme)
} }
if stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDWR, 0)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to creat io stream: %w", err)
}
} }
if !console && stderr != "" { return &ttyIO{
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDWR, 0) io: io,
if err != nil { raw: raw,
return nil, err }, nil
}
}
ttyIO := &ttyIO{
Stdin: in,
Stdout: outw,
Stderr: errw,
}
return ttyIO, nil
} }
func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup var wg sync.WaitGroup
if tty.Stdin != nil { if tty.io.Stdin() != nil {
wg.Add(1) wg.Add(1)
go func() { go func() {
shimLog.Debug("stdin io stream copy started") shimLog.Debug("stdin io stream copy started")
p := bufPool.Get().(*[]byte) p := bufPool.Get().(*[]byte)
defer bufPool.Put(p) defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p) io.CopyBuffer(stdinPipe, tty.io.Stdin(), *p)
// notify that we can close process's io safely. // notify that we can close process's io safely.
close(stdinCloser) close(stdinCloser)
wg.Done() wg.Done()
@ -104,30 +118,30 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO
}() }()
} }
if tty.Stdout != nil { if tty.io.Stdout() != nil {
wg.Add(1) wg.Add(1)
go func() { go func() {
shimLog.Debug("stdout io stream copy started") shimLog.Debug("stdout io stream copy started")
p := bufPool.Get().(*[]byte) p := bufPool.Get().(*[]byte)
defer bufPool.Put(p) defer bufPool.Put(p)
io.CopyBuffer(tty.Stdout, stdoutPipe, *p) io.CopyBuffer(tty.io.Stdout(), stdoutPipe, *p)
wg.Done() wg.Done()
if tty.Stdin != nil { if tty.io.Stdin() != nil {
// close stdin to make the other routine stop // close stdin to make the other routine stop
tty.Stdin.Close() tty.io.Stdin().Close()
} }
shimLog.Debug("stdout io stream copy exited") shimLog.Debug("stdout io stream copy exited")
}() }()
} }
if tty.Stderr != nil && stderrPipe != nil { if tty.io.Stderr() != nil && stderrPipe != nil {
wg.Add(1) wg.Add(1)
go func() { go func() {
shimLog.Debug("stderr io stream copy started") shimLog.Debug("stderr io stream copy started")
p := bufPool.Get().(*[]byte) p := bufPool.Get().(*[]byte)
defer bufPool.Put(p) defer bufPool.Put(p)
io.CopyBuffer(tty.Stderr, stderrPipe, *p) io.CopyBuffer(tty.io.Stderr(), stderrPipe, *p)
wg.Done() wg.Done()
shimLog.Debug("stderr io stream copy exited") shimLog.Debug("stderr io stream copy exited")
}() }()
@ -138,3 +152,10 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO
close(exitch) close(exitch)
shimLog.Debug("all io stream copy goroutines exited") shimLog.Debug("all io stream copy goroutines exited")
} }
func wc(w io.WriteCloser) error {
if w == nil {
return nil
}
return w.Close()
}

View File

@ -7,7 +7,6 @@ package containerdshim
import ( import (
"context" "context"
"github.com/sirupsen/logrus"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
@ -15,6 +14,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/sirupsen/logrus"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -45,7 +46,7 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
defer outr.Close() defer outr.Close()
errr = createReadFifo(stderr) errr = createReadFifo(stderr)
defer errr.Close() defer errr.Close()
tty, err = newTtyIO(ctx, "", stdout, stderr, false) tty, err = newTtyIO(ctx, "", "", "", stdout, stderr, false)
assert.NoError(err) assert.NoError(err)
defer tty.close() defer tty.close()
@ -72,9 +73,9 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
} }
} }
checkFifoWrite(tty.Stdout) checkFifoWrite(tty.io.Stdout())
checkFifoRead(outr) checkFifoRead(outr)
checkFifoWrite(tty.Stderr) checkFifoWrite(tty.io.Stderr())
checkFifoRead(errr) checkFifoRead(errr)
err = outr.Close() err = outr.Close()
@ -84,8 +85,8 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
// Make sure that writing to tty fifo will not get `EPIPE` // Make sure that writing to tty fifo will not get `EPIPE`
// when the read side is closed // when the read side is closed
checkFifoWrite(tty.Stdout) checkFifoWrite(tty.io.Stdout())
checkFifoWrite(tty.Stderr) checkFifoWrite(tty.io.Stderr())
// Reopen the fifo // Reopen the fifo
outr = createReadFifo(stdout) outr = createReadFifo(stdout)
@ -171,7 +172,7 @@ func TestIoCopy(t *testing.T) {
defer srcInW.Close() defer srcInW.Close()
} }
tty, err := newTtyIO(ctx, srcStdinPath, dstStdoutPath, dstStderrPath, false) tty, err := newTtyIO(ctx, "", "", srcStdinPath, dstStdoutPath, dstStderrPath, false)
assert.NoError(err) assert.NoError(err)
defer tty.close() defer tty.close()

View File

@ -0,0 +1,57 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import "io"
// writeCloseInformer wraps a reader with a close function.
type wrapReadCloser struct {
reader *io.PipeReader
writer *io.PipeWriter
}
// NewWrapReadCloser creates a wrapReadCloser from a reader.
// NOTE(random-liu): To avoid goroutine leakage, the reader passed in
// must be eventually closed by the caller.
func NewWrapReadCloser(r io.Reader) io.ReadCloser {
pr, pw := io.Pipe()
go func() {
_, _ = io.Copy(pw, r)
pr.Close()
pw.Close()
}()
return &wrapReadCloser{
reader: pr,
writer: pw,
}
}
// Read reads up to len(p) bytes into p.
func (w *wrapReadCloser) Read(p []byte) (int, error) {
n, err := w.reader.Read(p)
if err == io.ErrClosedPipe {
return n, io.EOF
}
return n, err
}
// Close closes read closer.
func (w *wrapReadCloser) Close() error {
w.reader.Close()
w.writer.Close()
return nil
}

View File

@ -0,0 +1,102 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"io"
"sync"
)
// writeCloseInformer wraps passed in write closer with a close channel.
// Caller could wait on the close channel for the write closer to be
// closed.
type writeCloseInformer struct {
close chan struct{}
wc io.WriteCloser
}
// NewWriteCloseInformer creates the writeCloseInformer from a write closer.
func NewWriteCloseInformer(wc io.WriteCloser) (io.WriteCloser, <-chan struct{}) {
close := make(chan struct{})
return &writeCloseInformer{
close: close,
wc: wc,
}, close
}
// Write passes through the data into the internal write closer.
func (w *writeCloseInformer) Write(p []byte) (int, error) {
return w.wc.Write(p)
}
// Close closes the internal write closer and inform the close channel.
func (w *writeCloseInformer) Close() error {
err := w.wc.Close()
close(w.close)
return err
}
// nopWriteCloser wraps passed in writer with a nop close function.
type nopWriteCloser struct {
w io.Writer
}
// NewNopWriteCloser creates the nopWriteCloser from a writer.
func NewNopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w: w}
}
// Write passes through the data into the internal writer.
func (n *nopWriteCloser) Write(p []byte) (int, error) {
return n.w.Write(p)
}
// Close is a nop close function.
func (n *nopWriteCloser) Close() error {
return nil
}
// serialWriteCloser wraps a write closer and makes sure all writes
// are done in serial.
// Parallel write won't intersect with each other. Use case:
// 1) Pipe: Write content longer than PIPE_BUF.
// See http://man7.org/linux/man-pages/man7/pipe.7.html
// 2) <3.14 Linux Kernel: write is not atomic
// See http://man7.org/linux/man-pages/man2/write.2.html
type serialWriteCloser struct {
mu sync.Mutex
wc io.WriteCloser
}
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
return &serialWriteCloser{wc: wc}
}
// Write writes a group of byte arrays in order atomically.
func (s *serialWriteCloser) Write(data []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Write(data)
}
// Close closes the write closer.
func (s *serialWriteCloser) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Close()
}

View File

@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"errors"
"io"
"sync"
)
// WriterGroup is a group of writers. Writer could be dynamically
// added and removed.
type WriterGroup struct {
mu sync.Mutex
writers map[string]io.WriteCloser
closed bool
}
var _ io.Writer = &WriterGroup{}
// NewWriterGroup creates an empty writer group.
func NewWriterGroup() *WriterGroup {
return &WriterGroup{
writers: make(map[string]io.WriteCloser),
}
}
// Add adds a writer into the group. The writer will be closed
// if the writer group is closed.
func (g *WriterGroup) Add(key string, w io.WriteCloser) {
g.mu.Lock()
defer g.mu.Unlock()
if g.closed {
w.Close()
return
}
g.writers[key] = w
}
// Get gets a writer from the group, returns nil if the writer
// doesn't exist.
func (g *WriterGroup) Get(key string) io.WriteCloser {
g.mu.Lock()
defer g.mu.Unlock()
return g.writers[key]
}
// Remove removes a writer from the group.
func (g *WriterGroup) Remove(key string) {
g.mu.Lock()
defer g.mu.Unlock()
w, ok := g.writers[key]
if !ok {
return
}
w.Close()
delete(g.writers, key)
}
// Write writes data into each writer. If a writer returns error,
// it will be closed and removed from the writer group. It returns
// error if writer group is empty.
func (g *WriterGroup) Write(p []byte) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()
for k, w := range g.writers {
n, err := w.Write(p)
if err == nil && len(p) == n {
continue
}
// The writer is closed or in bad state, remove it.
w.Close()
delete(g.writers, k)
}
if len(g.writers) == 0 {
return 0, errors.New("writer group is empty")
}
return len(p), nil
}
// Close closes the writer group. Write will return error after
// closed.
func (g *WriterGroup) Close() {
g.mu.Lock()
defer g.mu.Unlock()
for _, w := range g.writers {
w.Close()
}
g.writers = nil
g.closed = true
}

View File

@ -79,6 +79,7 @@ github.com/containerd/containerd/mount
github.com/containerd/containerd/namespaces github.com/containerd/containerd/namespaces
github.com/containerd/containerd/pkg/cri/annotations github.com/containerd/containerd/pkg/cri/annotations
github.com/containerd/containerd/pkg/dialer github.com/containerd/containerd/pkg/dialer
github.com/containerd/containerd/pkg/ioutil
github.com/containerd/containerd/pkg/runtimeoptions/v1 github.com/containerd/containerd/pkg/runtimeoptions/v1
github.com/containerd/containerd/pkg/shutdown github.com/containerd/containerd/pkg/shutdown
github.com/containerd/containerd/pkg/ttrpcutil github.com/containerd/containerd/pkg/ttrpcutil