diff --git a/Gopkg.lock b/Gopkg.lock index 39f9671e2..c4435e17c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -75,7 +75,7 @@ revision = "0650fd9eeb50bab4fc99dceb9f2e14cf58f36e7f" [[projects]] - digest = "1:d9120dea4d91818f1c859242cb96825faf6d375a4e0231263ecaec5905143757" + digest = "1:8787d42e39854db19dd09d79e39df2a9975c00d3560fa3fdfa180a89e36552a2" name = "github.com/containerd/containerd" packages = [ "api/events", @@ -92,7 +92,7 @@ "sys", ] pruneopts = "NUT" - revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5" + revision = "f05672357f56f26751a521175c5a96fc21fa8603" [[projects]] digest = "1:3d1a50e9f27c661df8c5552e7f2f6b9d2a8b641c65aeac7373f8a5c60d9f6856" diff --git a/Gopkg.toml b/Gopkg.toml index 6a0c1f338..d67321f31 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -68,7 +68,7 @@ [[constraint]] name = "github.com/containerd/containerd" - revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5" + revision = "f05672357f56f26751a521175c5a96fc21fa8603" [[override]] branch = "master" diff --git a/vendor/github.com/containerd/containerd/errdefs/grpc.go b/vendor/github.com/containerd/containerd/errdefs/grpc.go index 4eab03ab8..b1542f13d 100644 --- a/vendor/github.com/containerd/containerd/errdefs/grpc.go +++ b/vendor/github.com/containerd/containerd/errdefs/grpc.go @@ -95,7 +95,7 @@ func FromGRPC(err error) error { msg := rebaseMessage(cls, err) if msg != "" { - err = errors.Wrapf(cls, msg) + err = errors.Wrap(cls, msg) } else { err = errors.WithStack(cls) } diff --git a/vendor/github.com/containerd/containerd/mount/mount_unix.go b/vendor/github.com/containerd/containerd/mount/mount_unix.go index 6741293f8..95da9428e 100644 --- a/vendor/github.com/containerd/containerd/mount/mount_unix.go +++ b/vendor/github.com/containerd/containerd/mount/mount_unix.go @@ -1,4 +1,4 @@ -// +build darwin freebsd +// +build darwin freebsd openbsd /* Copyright The containerd Authors. diff --git a/vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go b/vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go deleted file mode 100644 index bbe79767e..000000000 --- a/vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package mount - -/* -#include -#include -#include -*/ -import "C" - -import ( - "fmt" - "reflect" - "unsafe" -) - -// Self retrieves a list of mounts for the current running process. -func Self() ([]Info, error) { - var rawEntries *C.struct_statfs - - count := int(C.getmntinfo(&rawEntries, C.MNT_WAIT)) - if count == 0 { - return nil, fmt.Errorf("Failed to call getmntinfo") - } - - var entries []C.struct_statfs - header := (*reflect.SliceHeader)(unsafe.Pointer(&entries)) - header.Cap = count - header.Len = count - header.Data = uintptr(unsafe.Pointer(rawEntries)) - - var out []Info - for _, entry := range entries { - var mountinfo Info - mountinfo.Mountpoint = C.GoString(&entry.f_mntonname[0]) - mountinfo.Source = C.GoString(&entry.f_mntfromname[0]) - mountinfo.FSType = C.GoString(&entry.f_fstypename[0]) - out = append(out, mountinfo) - } - return out, nil -} - -// PID collects the mounts for a specific process ID. -func PID(pid int) ([]Info, error) { - return nil, fmt.Errorf("mountinfo.PID is not implemented on freebsd") -} diff --git a/vendor/github.com/containerd/containerd/mount/mountinfo_linux.go b/vendor/github.com/containerd/containerd/mount/mountinfo_linux.go index a986f8f4e..369d045d7 100644 --- a/vendor/github.com/containerd/containerd/mount/mountinfo_linux.go +++ b/vendor/github.com/containerd/containerd/mount/mountinfo_linux.go @@ -68,7 +68,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) { numFields := len(fields) if numFields < 10 { // should be at least 10 fields - return nil, fmt.Errorf("Parsing '%s' failed: not enough fields (%d)", text, numFields) + return nil, fmt.Errorf("parsing '%s' failed: not enough fields (%d)", text, numFields) } p := Info{} // ignore any numbers parsing errors, as there should not be any @@ -76,7 +76,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) { p.Parent, _ = strconv.Atoi(fields[1]) mm := strings.Split(fields[2], ":") if len(mm) != 2 { - return nil, fmt.Errorf("Parsing '%s' failed: unexpected minor:major pair %s", text, mm) + return nil, fmt.Errorf("parsing '%s' failed: unexpected minor:major pair %s", text, mm) } p.Major, _ = strconv.Atoi(mm[0]) p.Minor, _ = strconv.Atoi(mm[1]) @@ -101,11 +101,11 @@ func parseInfoFile(r io.Reader) ([]Info, error) { } } if i == numFields { - return nil, fmt.Errorf("Parsing '%s' failed: missing separator ('-')", text) + return nil, fmt.Errorf("parsing '%s' failed: missing separator ('-')", text) } // There should be 3 fields after the separator... if i+4 > numFields { - return nil, fmt.Errorf("Parsing '%s' failed: not enough fields after a separator", text) + return nil, fmt.Errorf("parsing '%s' failed: not enough fields after a separator", text) } // ... but in Linux <= 3.9 mounting a cifs with spaces in a share name // (like "//serv/My Documents") _may_ end up having a space in the last field diff --git a/vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go b/vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go index eba602f1a..ae998db6b 100644 --- a/vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go +++ b/vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go @@ -1,4 +1,4 @@ -// +build !linux,!freebsd,!solaris freebsd,!cgo solaris,!cgo +// +build !linux,!freebsd,!solaris,!openbsd freebsd,!cgo solaris,!cgo openbsd,!cgo /* Copyright The containerd Authors. diff --git a/vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go b/vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go index 2937f1a9e..45a88db12 100644 --- a/vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go +++ b/vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go @@ -31,7 +31,7 @@ import ( // ErrNoSuchProcess is returned when the process no longer exists var ErrNoSuchProcess = errors.New("no such process") -const bufferSize = 32 +const bufferSize = 2048 // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels @@ -47,7 +47,6 @@ func Reap() error { Status: e.Status, } } - } Default.Unlock() return err diff --git a/vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go b/vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go index da2a2e887..d60d49663 100644 --- a/vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go +++ b/vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go @@ -53,11 +53,32 @@ type Shim interface { StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) } +// OptsKey is the context key for the Opts value. +type OptsKey struct{} + +// Opts are context options associated with the shim invocation. +type Opts struct { + BundlePath string + Debug bool +} + +// BinaryOpts allows the configuration of a shims binary setup +type BinaryOpts func(*Config) + +// Config of shim binary options provided by shim implementations +type Config struct { + // NoSubreaper disables setting the shim as a child subreaper + NoSubreaper bool + // NoReaper disables the shim binary from reaping any child process implicitly + NoReaper bool +} + var ( debugFlag bool idFlag string namespaceFlag string socketFlag string + bundlePath string addressFlag string containerdBinaryFlag string action string @@ -68,6 +89,7 @@ func parseFlags() { flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") flag.StringVar(&idFlag, "id", "", "id of the task") flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") + flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)") @@ -107,32 +129,40 @@ func setLogger(ctx context.Context, id string) error { } // Run initializes and runs a shim server -func Run(id string, initFunc Init) { - if err := run(id, initFunc); err != nil { +func Run(id string, initFunc Init, opts ...BinaryOpts) { + var config Config + for _, o := range opts { + o(&config) + } + if err := run(id, initFunc, config); err != nil { fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) os.Exit(1) } } -func run(id string, initFunc Init) error { +func run(id string, initFunc Init, config Config) error { parseFlags() setRuntime() - signals, err := setupSignals() + signals, err := setupSignals(config) if err != nil { return err } - if err := subreaper(); err != nil { - return err + if !config.NoSubreaper { + if err := subreaper(); err != nil { + return err + } } publisher := &remoteEventsPublisher{ address: addressFlag, containerdBinaryPath: containerdBinaryFlag, + noReaper: config.NoReaper, } if namespaceFlag == "" { return fmt.Errorf("shim namespace cannot be empty") } ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) + ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) service, err := initFunc(ctx, idFlag, publisher) @@ -254,4 +284,5 @@ func dumpStacks(logger *logrus.Entry) { type remoteEventsPublisher struct { address string containerdBinaryPath string + noReaper bool } diff --git a/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go b/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go index 6f1ef6e28..08668dc1c 100644 --- a/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go +++ b/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go @@ -39,9 +39,13 @@ import ( // setupSignals creates a new signal handler for all signals and sets the shim as a // sub-reaper so that the container processes are reparented -func setupSignals() (chan os.Signal, error) { +func setupSignals(config Config) (chan os.Signal, error) { signals := make(chan os.Signal, 32) - signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE) + smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE} + if !config.NoReaper { + smp = append(smp, unix.SIGCHLD) + } + signal.Notify(signals, smp...) return signals, nil } @@ -87,7 +91,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { } func openLog(ctx context.Context, _ string) (io.Writer, error) { - return fifo.OpenFifo(context.Background(), "log", unix.O_WRONLY, 0700) + return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) } func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { @@ -102,6 +106,15 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event } cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns) cmd.Stdin = bytes.NewReader(data) + if l.noReaper { + if err := cmd.Start(); err != nil { + return err + } + if err := cmd.Wait(); err != nil { + return errors.Wrap(err, "failed to publish event") + } + return nil + } c, err := Default.Start(cmd) if err != nil { return err diff --git a/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go b/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go index 4e94e7b5d..3bac959db 100644 --- a/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go +++ b/vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go @@ -40,7 +40,7 @@ import ( ) // setupSignals creates a new signal handler for all signals -func setupSignals() (chan os.Signal, error) { +func setupSignals(config Config) (chan os.Signal, error) { signals := make(chan os.Signal, 32) return signals, nil } @@ -119,21 +119,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { } } +var _ = (io.WriterTo)(&blockingBuffer{}) +var _ = (io.Writer)(&blockingBuffer{}) + +// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once +// `capacity` is reached the calls to `Write` will block until a successful call +// to `WriterTo` frees up the buffer space. +// +// Note: This has the same threadding semantics as bytes.Buffer with no +// additional locking so multithreading is not supported. +type blockingBuffer struct { + c *sync.Cond + + capacity int + + buffer bytes.Buffer +} + +func newBlockingBuffer(capacity int) *blockingBuffer { + return &blockingBuffer{ + c: sync.NewCond(&sync.Mutex{}), + capacity: capacity, + } +} + +func (bb *blockingBuffer) Len() int { + bb.c.L.Lock() + defer bb.c.L.Unlock() + return bb.buffer.Len() +} + +func (bb *blockingBuffer) Write(p []byte) (int, error) { + if len(p) > bb.capacity { + return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity) + } + + bb.c.L.Lock() + for bb.buffer.Len()+len(p) > bb.capacity { + bb.c.Wait() + } + defer bb.c.L.Unlock() + return bb.buffer.Write(p) +} + +func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) { + bb.c.L.Lock() + defer bb.c.L.Unlock() + defer bb.c.Signal() + return bb.buffer.WriteTo(w) +} + +// deferredShimWriteLogger exists to solve the upstream loggin issue presented +// by using Windows Named Pipes for logging. When containerd restarts it tries +// to reconnect to any shims. This means that the connection to the logger will +// be severed but when containerd starts up it should reconnect and start +// logging again. We abstract all of this logic behind what looks like a simple +// `io.Writer` that can reconnect in the lifetime and buffers logs while +// disconnected. type deferredShimWriteLogger struct { + mu sync.Mutex + ctx context.Context - wg sync.WaitGroup + connected bool + aborted bool + buffer *blockingBuffer + + l net.Listener c net.Conn conerr error } +// beginAccept issues an accept to wait for a connection. Once a connection +// occurs drains any outstanding buffer. While draining the buffer any writes +// are blocked. If the buffer fails to fully drain due to a connection drop a +// call to `beginAccept` is re-issued waiting for another connection from +// containerd. +func (dswl *deferredShimWriteLogger) beginAccept() { + dswl.mu.Lock() + if dswl.connected { + return + } + dswl.mu.Unlock() + + c, err := dswl.l.Accept() + if err == winio.ErrPipeListenerClosed { + dswl.mu.Lock() + dswl.aborted = true + dswl.l.Close() + dswl.conerr = errors.New("connection closed") + dswl.mu.Unlock() + return + } + dswl.mu.Lock() + dswl.connected = true + dswl.c = c + + // Drain the buffer + if dswl.buffer.Len() > 0 { + _, err := dswl.buffer.WriteTo(dswl.c) + if err != nil { + // We lost our connection draining the buffer. + dswl.connected = false + dswl.c.Close() + go dswl.beginAccept() + } + } + dswl.mu.Unlock() +} + func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { - dswl.wg.Wait() - if dswl.c == nil { + dswl.mu.Lock() + defer dswl.mu.Unlock() + + if dswl.aborted { return 0, dswl.conerr } - return dswl.c.Write(p) + + if dswl.connected { + // We have a connection. beginAccept would have drained the buffer so we just write our data to + // the connection directly. + written, err := dswl.c.Write(p) + if err != nil { + // We lost the connection. + dswl.connected = false + dswl.c.Close() + go dswl.beginAccept() + + // We weren't able to write the full `p` bytes. Buffer the rest + if written != len(p) { + w, err := dswl.buffer.Write(p[written:]) + if err != nil { + // We failed to buffer. Return this error + return written + w, err + } + written += w + } + } + + return written, nil + } + + // We are disconnected. Buffer the contents. + return dswl.buffer.Write(p) } // openLog on Windows acts as the server of the log pipe. This allows the @@ -143,26 +272,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { if err != nil { return nil, err } + + dswl := &deferredShimWriteLogger{ + ctx: ctx, + buffer: newBlockingBuffer(64 * 1024), // 64KB, + } l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) if err != nil { return nil, err } - dswl := &deferredShimWriteLogger{ - ctx: ctx, - } - // TODO: JTERRY75 - this will not work with restarts. Only the first - // connection will work and all +1 connections will return 'use of closed - // network connection'. Make this reconnect aware. - dswl.wg.Add(1) - go func() { - c, conerr := l.Accept() - if conerr != nil { - l.Close() - dswl.conerr = conerr - } - dswl.c = c - dswl.wg.Done() - }() + dswl.l = l + go dswl.beginAccept() return dswl, nil } diff --git a/vendor/github.com/containerd/containerd/runtime/v2/shim/util.go b/vendor/github.com/containerd/containerd/runtime/v2/shim/util.go index ca028a683..77afaa1f3 100644 --- a/vendor/github.com/containerd/containerd/runtime/v2/shim/util.go +++ b/vendor/github.com/containerd/containerd/runtime/v2/shim/util.go @@ -24,6 +24,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "time" "github.com/containerd/containerd/namespaces" @@ -32,6 +33,8 @@ import ( const shimBinaryFormat = "containerd-shim-%s-%s" +var runtimePaths sync.Map + // Command returns the shim command with the provided args and configuration func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) { ns, err := namespaces.NamespaceRequired(ctx) @@ -49,19 +52,33 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr } args = append(args, cmdArgs...) name := BinaryName(runtime) + if name == "" { + return nil, fmt.Errorf("invalid runtime name %s, correct runtime name should format like io.containerd.runc.v1", runtime) + } + var cmdPath string - var lerr error - if cmdPath, lerr = exec.LookPath(name); lerr != nil { - if eerr, ok := lerr.(*exec.Error); ok { - if eerr.Err == exec.ErrNotFound { - return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name) + cmdPathI, cmdPathFound := runtimePaths.Load(name) + if cmdPathFound { + cmdPath = cmdPathI.(string) + } else { + var lerr error + if cmdPath, lerr = exec.LookPath(name); lerr != nil { + if eerr, ok := lerr.(*exec.Error); ok { + if eerr.Err == exec.ErrNotFound { + return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name) + } } } + cmdPath, err = filepath.Abs(cmdPath) + if err != nil { + return nil, err + } + if cmdPathI, cmdPathFound = runtimePaths.LoadOrStore(name, cmdPath); cmdPathFound { + // We didn't store cmdPath we loaded an already cached value. Use it. + cmdPath = cmdPathI.(string) + } } - cmdPath, err = filepath.Abs(cmdPath) - if err != nil { - return nil, err - } + cmd := exec.Command(cmdPath, args...) cmd.Dir = path cmd.Env = append(os.Environ(), "GOMAXPROCS=2") @@ -69,10 +86,15 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr return cmd, nil } -// BinaryName returns the shim binary name from the runtime name +// BinaryName returns the shim binary name from the runtime name, +// empty string returns means runtime name is invalid func BinaryName(runtime string) string { + // runtime name should format like $prefix.name.version parts := strings.Split(runtime, ".") - // TODO: add validation for runtime + if len(parts) < 2 { + return "" + } + return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1]) } diff --git a/vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go b/vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go index be9e661ae..986fc754b 100644 --- a/vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go +++ b/vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go @@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) { func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { var c net.Conn var lastError error + timedOutError := errors.Errorf("timed out waiting for npipe %s", address) start := time.Now() for { remaining := timeout - time.Now().Sub(start) if remaining <= 0 { - lastError = errors.Errorf("timed out waiting for npipe %s", address) + lastError = timedOutError break } c, lastError = winio.DialPipe(address, &remaining) @@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { if !os.IsNotExist(lastError) { break } + // There is nobody serving the pipe. We limit the timeout for this case + // to 5 seconds because any shim that would serve this endpoint should + // serve it within 5 seconds. We use the passed in timeout for the + // `DialPipe` timeout if the pipe exists however to give the pipe time + // to `Accept` the connection. + if time.Now().Sub(start) >= 5*time.Second { + lastError = timedOutError + break + } time.Sleep(10 * time.Millisecond) } return c, lastError diff --git a/vendor/github.com/containerd/containerd/sys/socket_unix.go b/vendor/github.com/containerd/containerd/sys/socket_unix.go index 0dbca0e33..90fa55c48 100644 --- a/vendor/github.com/containerd/containerd/sys/socket_unix.go +++ b/vendor/github.com/containerd/containerd/sys/socket_unix.go @@ -42,7 +42,7 @@ func CreateUnixSocket(path string) (net.Listener, error) { return net.Listen("unix", path) } -// GetLocalListener returns a listerner out of a unix socket. +// GetLocalListener returns a listener out of a unix socket. func GetLocalListener(path string, uid, gid int) (net.Listener, error) { // Ensure parent directory is created if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {