shim: support shim v2 logging plugin

Now kata shim only supports stdout/stderr of fifo from
containerd/CRI-O, but shim v2 supports logging plugins,
and nerdctl default will use the binary schema for logs.

This commit will add the others type of log plugins:

- file
- binary

In case of binary, kata shim will receive a stdout/stderr like:

binary:///nerdctl?_NERDCTL_INTERNAL_LOGGING=/var/lib/nerdctl/1935db59

That means the nerdctl process will handle the logs(stdout/stderr)

Fixes: #4420

Signed-off-by: Bin Liu <bin@hyper.sh>
This commit is contained in:
Bin Liu 2022-06-08 20:58:19 +08:00 committed by liubin
parent a238d8c6bd
commit 4e30e11b31
11 changed files with 752 additions and 63 deletions

View File

@ -7,6 +7,7 @@ package containerdshim
import (
"context"
"fmt"
"io"
"os"
sysexec "os/exec"
@ -83,6 +84,11 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
vci.SetLogger(ctx, shimLog)
katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level)
ns, found := namespaces.Namespace(ctx)
if !found {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
s := &service{
id: id,
pid: uint32(os.Getpid()),
@ -91,6 +97,7 @@ func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown fu
events: make(chan interface{}, chSize),
ec: make(chan exit, bufferSize),
cancel: shutdown,
namespace: ns,
}
go s.processExits()
@ -129,6 +136,9 @@ type service struct {
id string
// Namespace from upper container engine
namespace string
mu sync.Mutex
eventSendMu sync.Mutex

View File

@ -0,0 +1,216 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"io"
"net/url"
"os"
"syscall"
"time"
"golang.org/x/sys/execabs"
"github.com/hashicorp/go-multierror"
)
const (
binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
)
var (
_ IO = &binaryIO{}
)
// binaryIO related code is from https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L311
type binaryIO struct {
cmd *execabs.Cmd
out, err *pipe
}
// https://github.com/containerd/containerd/blob/v1.6.6/pkg/process/io.go#L248
func newBinaryIO(ctx context.Context, ns, id string, uri *url.URL) (bio *binaryIO, err error) {
var closers []func() error
defer func() {
if err == nil {
return
}
result := multierror.Append(err)
for _, fn := range closers {
result = multierror.Append(result, fn())
}
err = multierror.Flatten(result)
}()
out, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
closers = append(closers, out.Close)
serr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
closers = append(closers, serr.Close)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)
cmd := newBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
closers = append(closers, func() error { return cmd.Process.Kill() })
// close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}
// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
return &binaryIO{
cmd: cmd,
out: out,
err: serr,
}, nil
}
// newBinaryCmd returns a Cmd to be used to start a logging binary.
// The Cmd is generated from the provided uri, and the container ID and
// namespace are appended to the Cmd environment.
func newBinaryCmd(binaryURI *url.URL, id, ns string) *execabs.Cmd {
var args []string
for k, vs := range binaryURI.Query() {
args = append(args, k)
if len(vs) > 0 {
args = append(args, vs[0])
}
}
cmd := execabs.Command(binaryURI.Path, args...)
cmd.Env = append(cmd.Env,
"CONTAINER_ID="+id,
"CONTAINER_NAMESPACE="+ns,
)
return cmd
}
func (bi *binaryIO) Stdin() io.ReadCloser {
return nil
}
func (bi *binaryIO) Stdout() io.Writer {
return bi.out.w
}
func (bi *binaryIO) Stderr() io.Writer {
return bi.err.w
}
func (bi *binaryIO) Close() error {
var (
result *multierror.Error
)
for _, v := range []*pipe{bi.out, bi.err} {
if v != nil {
if err := v.Close(); err != nil {
result = multierror.Append(result, err)
}
}
}
if err := bi.cancel(); err != nil {
result = multierror.Append(result, err)
}
return result.ErrorOrNil()
}
func (bi *binaryIO) cancel() error {
if bi.cmd == nil || bi.cmd.Process == nil {
return nil
}
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := bi.cmd.Process.Signal(syscall.SIGTERM); err != nil {
result := multierror.Append(fmt.Errorf("failed to send SIGTERM: %w", err))
shimLog.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim")
if err := bi.cmd.Process.Kill(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to kill process after faulty SIGTERM: %w", err))
}
return result.ErrorOrNil()
}
done := make(chan error, 1)
go func() {
done <- bi.cmd.Wait()
}()
select {
case err := <-done:
return err
case <-time.After(binaryIOProcTermTimeout):
shimLog.Warn("failed to wait for shim logger process to exit, killing")
err := bi.cmd.Process.Kill()
if err != nil {
return fmt.Errorf("failed to kill shim logger process: %w", err)
}
return nil
}
}
func newPipe() (*pipe, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &pipe{
r: r,
w: w,
}, nil
}
type pipe struct {
r *os.File
w *os.File
}
// https://github.com/containerd/containerd/blob/v1.6.6/vendor/github.com/containerd/go-runc/io.go#L71
func (p *pipe) Close() error {
var result *multierror.Error
if err := p.w.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close write pipe: %w", err))
}
if err := p.r.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close read pipe: %w", err))
}
return multierror.Prefix(result.ErrorOrNil(), "pipe:")
}

View File

@ -0,0 +1,80 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"io"
"net/url"
"os"
"path/filepath"
cioutil "github.com/containerd/containerd/pkg/ioutil"
)
var (
_ IO = &fileIO{}
)
// fileIO only support write both stdout/stderr to the same file
type fileIO struct {
outw io.WriteCloser
errw io.WriteCloser
path string
}
// openLogFile opens/creates a container log file with its directory.
func openLogFile(path string) (*os.File, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
return os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
}
func newFileIO(ctx context.Context, stdio *stdio, uri *url.URL) (*fileIO, error) {
var outw, errw, f io.WriteCloser
var err error
logFile := uri.Path
if f, err = openLogFile(logFile); err != nil {
return nil, err
}
if stdio.Stdout != "" {
outw = cioutil.NewSerialWriteCloser(f)
}
if !stdio.Console && stdio.Stderr != "" {
errw = cioutil.NewSerialWriteCloser(f)
}
return &fileIO{
path: logFile,
outw: outw,
errw: errw,
}, nil
}
func (fi *fileIO) Close() error {
if fi.outw != nil {
return wc(fi.outw)
} else if fi.errw != nil {
return wc(fi.errw)
}
return nil
}
func (fi *fileIO) Stdin() io.ReadCloser {
return nil
}
func (fi *fileIO) Stdout() io.Writer {
return fi.outw
}
func (fi *fileIO) Stderr() io.Writer {
return fi.errw
}

View File

@ -0,0 +1,95 @@
// Copyright (c) 2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"io"
"syscall"
"github.com/containerd/fifo"
"github.com/hashicorp/go-multierror"
)
var (
_ IO = &pipeIO{}
)
type pipeIO struct {
in io.ReadCloser
outw io.WriteCloser
errw io.WriteCloser
}
func newPipeIO(ctx context.Context, stdio *stdio) (*pipeIO, error) {
var in io.ReadCloser
var outw io.WriteCloser
var errw io.WriteCloser
var err error
if stdio.Stdin != "" {
in, err = fifo.OpenFifo(ctx, stdio.Stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
}
}
if stdio.Stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdio.Stdout, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
}
if !stdio.Console && stdio.Stderr != "" {
errw, err = fifo.OpenFifo(ctx, stdio.Stderr, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
}
pipeIO := &pipeIO{
in: in,
outw: outw,
errw: errw,
}
return pipeIO, nil
}
func (pi *pipeIO) Stdin() io.ReadCloser {
return pi.in
}
func (pi *pipeIO) Stdout() io.Writer {
return pi.outw
}
func (pi *pipeIO) Stderr() io.Writer {
return pi.errw
}
func (pi *pipeIO) Close() error {
var result *multierror.Error
if pi.in != nil {
if err := pi.in.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close stdin: %w", err))
}
pi.in = nil
}
if err := wc(pi.outw); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close stdout: %w", err))
}
if err := wc(pi.errw); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close stderr: %w", err))
}
return result.ErrorOrNil()
}

View File

@ -8,6 +8,7 @@ package containerdshim
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd/api/types/task"
@ -75,7 +76,7 @@ func startContainer(ctx context.Context, s *service, c *container) (retErr error
c.stdinPipe = stdin
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
tty, err := newTtyIO(ctx, s.namespace, c.id, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil {
return err
}
@ -141,7 +142,7 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (e *
execs.stdinPipe = stdin
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
tty, err := newTtyIO(ctx, s.namespace, execs.id, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil {
return nil, err
}

View File

@ -7,16 +7,22 @@ package containerdshim
import (
"context"
"fmt"
"io"
"net/url"
"sync"
"syscall"
"github.com/containerd/fifo"
"github.com/sirupsen/logrus"
)
// The buffer size used to specify the buffer for IO streams copy
const bufSize = 32 << 10
const (
// The buffer size used to specify the buffer for IO streams copy
bufSize = 32 << 10
shimLogPluginBinary = "binary"
shimLogPluginFifo = "fifo"
shimLogPluginFile = "file"
)
var (
bufPool = sync.Pool{
@ -27,76 +33,84 @@ var (
}
)
type stdio struct {
Stdin string
Stdout string
Stderr string
Console bool
}
type IO interface {
io.Closer
Stdin() io.ReadCloser
Stdout() io.Writer
Stderr() io.Writer
}
type ttyIO struct {
Stdin io.ReadCloser
Stdout io.Writer
Stderr io.Writer
io IO
raw *stdio
}
func (tty *ttyIO) close() {
if tty.Stdin != nil {
tty.Stdin.Close()
tty.Stdin = nil
}
cf := func(w io.Writer) {
if w == nil {
return
}
if c, ok := w.(io.WriteCloser); ok {
c.Close()
}
}
cf(tty.Stdout)
cf(tty.Stderr)
tty.io.Close()
}
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
var in io.ReadCloser
var outw io.Writer
var errw io.Writer
// newTtyIO creates a new ttyIO struct.
// ns(namespace)/id(container ID) are used for containerd binary IO.
// containerd will pass the ns/id as ENV to the binary log driver,
// and the binary log driver will use ns/id to get the log options config file.
// for example nerdctl: https://github.com/containerd/nerdctl/blob/v0.21.0/pkg/logging/logging.go#L102
func newTtyIO(ctx context.Context, ns, id, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
var err error
var io IO
if stdin != "" {
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
}
raw := &stdio{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Console: console,
}
if stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
uri, err := url.Parse(stdout)
if err != nil {
return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
}
if !console && stderr != "" {
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
if uri.Scheme == "" {
uri.Scheme = "fifo"
}
ttyIO := &ttyIO{
Stdin: in,
Stdout: outw,
Stderr: errw,
switch uri.Scheme {
case shimLogPluginFifo:
io, err = newPipeIO(ctx, raw)
case shimLogPluginBinary:
io, err = newBinaryIO(ctx, ns, id, uri)
case shimLogPluginFile:
io, err = newFileIO(ctx, raw, uri)
default:
return nil, fmt.Errorf("unknown STDIO scheme %s", uri.Scheme)
}
return ttyIO, nil
if err != nil {
return nil, fmt.Errorf("failed to creat io stream: %w", err)
}
return &ttyIO{
io: io,
raw: raw,
}, nil
}
func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
if tty.Stdin != nil {
if tty.io.Stdin() != nil {
wg.Add(1)
go func() {
shimLog.Debug("stdin io stream copy started")
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
io.CopyBuffer(stdinPipe, tty.io.Stdin(), *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done()
@ -104,30 +118,30 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO
}()
}
if tty.Stdout != nil {
if tty.io.Stdout() != nil {
wg.Add(1)
go func() {
shimLog.Debug("stdout io stream copy started")
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
io.CopyBuffer(tty.io.Stdout(), stdoutPipe, *p)
wg.Done()
if tty.Stdin != nil {
if tty.io.Stdin() != nil {
// close stdin to make the other routine stop
tty.Stdin.Close()
tty.io.Stdin().Close()
}
shimLog.Debug("stdout io stream copy exited")
}()
}
if tty.Stderr != nil && stderrPipe != nil {
if tty.io.Stderr() != nil && stderrPipe != nil {
wg.Add(1)
go func() {
shimLog.Debug("stderr io stream copy started")
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
io.CopyBuffer(tty.io.Stderr(), stderrPipe, *p)
wg.Done()
shimLog.Debug("stderr io stream copy exited")
}()
@ -138,3 +152,10 @@ func ioCopy(shimLog *logrus.Entry, exitch, stdinCloser chan struct{}, tty *ttyIO
close(exitch)
shimLog.Debug("all io stream copy goroutines exited")
}
func wc(w io.WriteCloser) error {
if w == nil {
return nil
}
return w.Close()
}

View File

@ -7,7 +7,6 @@ package containerdshim
import (
"context"
"github.com/sirupsen/logrus"
"io"
"os"
"path/filepath"
@ -15,6 +14,8 @@ import (
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/containerd/fifo"
"github.com/stretchr/testify/assert"
)
@ -45,7 +46,7 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
defer outr.Close()
errr = createReadFifo(stderr)
defer errr.Close()
tty, err = newTtyIO(ctx, "", stdout, stderr, false)
tty, err = newTtyIO(ctx, "", "", "", stdout, stderr, false)
assert.NoError(err)
defer tty.close()
@ -72,9 +73,9 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
}
}
checkFifoWrite(tty.Stdout)
checkFifoWrite(tty.io.Stdout())
checkFifoRead(outr)
checkFifoWrite(tty.Stderr)
checkFifoWrite(tty.io.Stderr())
checkFifoRead(errr)
err = outr.Close()
@ -84,8 +85,8 @@ func TestNewTtyIOFifoReopen(t *testing.T) {
// Make sure that writing to tty fifo will not get `EPIPE`
// when the read side is closed
checkFifoWrite(tty.Stdout)
checkFifoWrite(tty.Stderr)
checkFifoWrite(tty.io.Stdout())
checkFifoWrite(tty.io.Stderr())
// Reopen the fifo
outr = createReadFifo(stdout)
@ -171,7 +172,7 @@ func TestIoCopy(t *testing.T) {
defer srcInW.Close()
}
tty, err := newTtyIO(ctx, srcStdinPath, dstStdoutPath, dstStderrPath, false)
tty, err := newTtyIO(ctx, "", "", srcStdinPath, dstStdoutPath, dstStderrPath, false)
assert.NoError(err)
defer tty.close()

View File

@ -0,0 +1,57 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import "io"
// writeCloseInformer wraps a reader with a close function.
type wrapReadCloser struct {
reader *io.PipeReader
writer *io.PipeWriter
}
// NewWrapReadCloser creates a wrapReadCloser from a reader.
// NOTE(random-liu): To avoid goroutine leakage, the reader passed in
// must be eventually closed by the caller.
func NewWrapReadCloser(r io.Reader) io.ReadCloser {
pr, pw := io.Pipe()
go func() {
_, _ = io.Copy(pw, r)
pr.Close()
pw.Close()
}()
return &wrapReadCloser{
reader: pr,
writer: pw,
}
}
// Read reads up to len(p) bytes into p.
func (w *wrapReadCloser) Read(p []byte) (int, error) {
n, err := w.reader.Read(p)
if err == io.ErrClosedPipe {
return n, io.EOF
}
return n, err
}
// Close closes read closer.
func (w *wrapReadCloser) Close() error {
w.reader.Close()
w.writer.Close()
return nil
}

View File

@ -0,0 +1,102 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"io"
"sync"
)
// writeCloseInformer wraps passed in write closer with a close channel.
// Caller could wait on the close channel for the write closer to be
// closed.
type writeCloseInformer struct {
close chan struct{}
wc io.WriteCloser
}
// NewWriteCloseInformer creates the writeCloseInformer from a write closer.
func NewWriteCloseInformer(wc io.WriteCloser) (io.WriteCloser, <-chan struct{}) {
close := make(chan struct{})
return &writeCloseInformer{
close: close,
wc: wc,
}, close
}
// Write passes through the data into the internal write closer.
func (w *writeCloseInformer) Write(p []byte) (int, error) {
return w.wc.Write(p)
}
// Close closes the internal write closer and inform the close channel.
func (w *writeCloseInformer) Close() error {
err := w.wc.Close()
close(w.close)
return err
}
// nopWriteCloser wraps passed in writer with a nop close function.
type nopWriteCloser struct {
w io.Writer
}
// NewNopWriteCloser creates the nopWriteCloser from a writer.
func NewNopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w: w}
}
// Write passes through the data into the internal writer.
func (n *nopWriteCloser) Write(p []byte) (int, error) {
return n.w.Write(p)
}
// Close is a nop close function.
func (n *nopWriteCloser) Close() error {
return nil
}
// serialWriteCloser wraps a write closer and makes sure all writes
// are done in serial.
// Parallel write won't intersect with each other. Use case:
// 1) Pipe: Write content longer than PIPE_BUF.
// See http://man7.org/linux/man-pages/man7/pipe.7.html
// 2) <3.14 Linux Kernel: write is not atomic
// See http://man7.org/linux/man-pages/man2/write.2.html
type serialWriteCloser struct {
mu sync.Mutex
wc io.WriteCloser
}
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
return &serialWriteCloser{wc: wc}
}
// Write writes a group of byte arrays in order atomically.
func (s *serialWriteCloser) Write(data []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Write(data)
}
// Close closes the write closer.
func (s *serialWriteCloser) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Close()
}

View File

@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"errors"
"io"
"sync"
)
// WriterGroup is a group of writers. Writer could be dynamically
// added and removed.
type WriterGroup struct {
mu sync.Mutex
writers map[string]io.WriteCloser
closed bool
}
var _ io.Writer = &WriterGroup{}
// NewWriterGroup creates an empty writer group.
func NewWriterGroup() *WriterGroup {
return &WriterGroup{
writers: make(map[string]io.WriteCloser),
}
}
// Add adds a writer into the group. The writer will be closed
// if the writer group is closed.
func (g *WriterGroup) Add(key string, w io.WriteCloser) {
g.mu.Lock()
defer g.mu.Unlock()
if g.closed {
w.Close()
return
}
g.writers[key] = w
}
// Get gets a writer from the group, returns nil if the writer
// doesn't exist.
func (g *WriterGroup) Get(key string) io.WriteCloser {
g.mu.Lock()
defer g.mu.Unlock()
return g.writers[key]
}
// Remove removes a writer from the group.
func (g *WriterGroup) Remove(key string) {
g.mu.Lock()
defer g.mu.Unlock()
w, ok := g.writers[key]
if !ok {
return
}
w.Close()
delete(g.writers, key)
}
// Write writes data into each writer. If a writer returns error,
// it will be closed and removed from the writer group. It returns
// error if writer group is empty.
func (g *WriterGroup) Write(p []byte) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()
for k, w := range g.writers {
n, err := w.Write(p)
if err == nil && len(p) == n {
continue
}
// The writer is closed or in bad state, remove it.
w.Close()
delete(g.writers, k)
}
if len(g.writers) == 0 {
return 0, errors.New("writer group is empty")
}
return len(p), nil
}
// Close closes the writer group. Write will return error after
// closed.
func (g *WriterGroup) Close() {
g.mu.Lock()
defer g.mu.Unlock()
for _, w := range g.writers {
w.Close()
}
g.writers = nil
g.closed = true
}

View File

@ -79,6 +79,7 @@ github.com/containerd/containerd/mount
github.com/containerd/containerd/namespaces
github.com/containerd/containerd/pkg/cri/annotations
github.com/containerd/containerd/pkg/dialer
github.com/containerd/containerd/pkg/ioutil
github.com/containerd/containerd/pkg/runtimeoptions/v1
github.com/containerd/containerd/pkg/shutdown
github.com/containerd/containerd/pkg/ttrpcutil