vendors: upgrade the containerd vendors

kata shimv2 needs the commit of:
f05672357f,
thus upgrade it to the latest.

Signed-off-by: Fupan Li <lifupan@gmail.com>
This commit is contained in:
Fupan Li 2018-12-11 02:50:36 +00:00
parent 4cc94b6063
commit e4a3fd5565
14 changed files with 250 additions and 116 deletions

4
Gopkg.lock generated
View File

@ -75,7 +75,7 @@
revision = "0650fd9eeb50bab4fc99dceb9f2e14cf58f36e7f"
[[projects]]
digest = "1:d9120dea4d91818f1c859242cb96825faf6d375a4e0231263ecaec5905143757"
digest = "1:8787d42e39854db19dd09d79e39df2a9975c00d3560fa3fdfa180a89e36552a2"
name = "github.com/containerd/containerd"
packages = [
"api/events",
@ -92,7 +92,7 @@
"sys",
]
pruneopts = "NUT"
revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5"
revision = "f05672357f56f26751a521175c5a96fc21fa8603"
[[projects]]
digest = "1:3d1a50e9f27c661df8c5552e7f2f6b9d2a8b641c65aeac7373f8a5c60d9f6856"

View File

@ -68,7 +68,7 @@
[[constraint]]
name = "github.com/containerd/containerd"
revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5"
revision = "f05672357f56f26751a521175c5a96fc21fa8603"
[[override]]
branch = "master"

View File

@ -95,7 +95,7 @@ func FromGRPC(err error) error {
msg := rebaseMessage(cls, err)
if msg != "" {
err = errors.Wrapf(cls, msg)
err = errors.Wrap(cls, msg)
} else {
err = errors.WithStack(cls)
}

View File

@ -1,4 +1,4 @@
// +build darwin freebsd
// +build darwin freebsd openbsd
/*
Copyright The containerd Authors.

View File

@ -1,61 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mount
/*
#include <sys/param.h>
#include <sys/ucred.h>
#include <sys/mount.h>
*/
import "C"
import (
"fmt"
"reflect"
"unsafe"
)
// Self retrieves a list of mounts for the current running process.
func Self() ([]Info, error) {
var rawEntries *C.struct_statfs
count := int(C.getmntinfo(&rawEntries, C.MNT_WAIT))
if count == 0 {
return nil, fmt.Errorf("Failed to call getmntinfo")
}
var entries []C.struct_statfs
header := (*reflect.SliceHeader)(unsafe.Pointer(&entries))
header.Cap = count
header.Len = count
header.Data = uintptr(unsafe.Pointer(rawEntries))
var out []Info
for _, entry := range entries {
var mountinfo Info
mountinfo.Mountpoint = C.GoString(&entry.f_mntonname[0])
mountinfo.Source = C.GoString(&entry.f_mntfromname[0])
mountinfo.FSType = C.GoString(&entry.f_fstypename[0])
out = append(out, mountinfo)
}
return out, nil
}
// PID collects the mounts for a specific process ID.
func PID(pid int) ([]Info, error) {
return nil, fmt.Errorf("mountinfo.PID is not implemented on freebsd")
}

View File

@ -68,7 +68,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) {
numFields := len(fields)
if numFields < 10 {
// should be at least 10 fields
return nil, fmt.Errorf("Parsing '%s' failed: not enough fields (%d)", text, numFields)
return nil, fmt.Errorf("parsing '%s' failed: not enough fields (%d)", text, numFields)
}
p := Info{}
// ignore any numbers parsing errors, as there should not be any
@ -76,7 +76,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) {
p.Parent, _ = strconv.Atoi(fields[1])
mm := strings.Split(fields[2], ":")
if len(mm) != 2 {
return nil, fmt.Errorf("Parsing '%s' failed: unexpected minor:major pair %s", text, mm)
return nil, fmt.Errorf("parsing '%s' failed: unexpected minor:major pair %s", text, mm)
}
p.Major, _ = strconv.Atoi(mm[0])
p.Minor, _ = strconv.Atoi(mm[1])
@ -101,11 +101,11 @@ func parseInfoFile(r io.Reader) ([]Info, error) {
}
}
if i == numFields {
return nil, fmt.Errorf("Parsing '%s' failed: missing separator ('-')", text)
return nil, fmt.Errorf("parsing '%s' failed: missing separator ('-')", text)
}
// There should be 3 fields after the separator...
if i+4 > numFields {
return nil, fmt.Errorf("Parsing '%s' failed: not enough fields after a separator", text)
return nil, fmt.Errorf("parsing '%s' failed: not enough fields after a separator", text)
}
// ... but in Linux <= 3.9 mounting a cifs with spaces in a share name
// (like "//serv/My Documents") _may_ end up having a space in the last field

View File

@ -1,4 +1,4 @@
// +build !linux,!freebsd,!solaris freebsd,!cgo solaris,!cgo
// +build !linux,!freebsd,!solaris,!openbsd freebsd,!cgo solaris,!cgo openbsd,!cgo
/*
Copyright The containerd Authors.

View File

@ -31,7 +31,7 @@ import (
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 32
const bufferSize = 2048
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
@ -47,7 +47,6 @@ func Reap() error {
Status: e.Status,
}
}
}
Default.Unlock()
return err

View File

@ -53,11 +53,32 @@ type Shim interface {
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
}
// OptsKey is the context key for the Opts value.
type OptsKey struct{}
// Opts are context options associated with the shim invocation.
type Opts struct {
BundlePath string
Debug bool
}
// BinaryOpts allows the configuration of a shims binary setup
type BinaryOpts func(*Config)
// Config of shim binary options provided by shim implementations
type Config struct {
// NoSubreaper disables setting the shim as a child subreaper
NoSubreaper bool
// NoReaper disables the shim binary from reaping any child process implicitly
NoReaper bool
}
var (
debugFlag bool
idFlag string
namespaceFlag string
socketFlag string
bundlePath string
addressFlag string
containerdBinaryFlag string
action string
@ -68,6 +89,7 @@ func parseFlags() {
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
@ -107,32 +129,40 @@ func setLogger(ctx context.Context, id string) error {
}
// Run initializes and runs a shim server
func Run(id string, initFunc Init) {
if err := run(id, initFunc); err != nil {
func Run(id string, initFunc Init, opts ...BinaryOpts) {
var config Config
for _, o := range opts {
o(&config)
}
if err := run(id, initFunc, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
os.Exit(1)
}
}
func run(id string, initFunc Init) error {
func run(id string, initFunc Init, config Config) error {
parseFlags()
setRuntime()
signals, err := setupSignals()
signals, err := setupSignals(config)
if err != nil {
return err
}
if err := subreaper(); err != nil {
return err
if !config.NoSubreaper {
if err := subreaper(); err != nil {
return err
}
}
publisher := &remoteEventsPublisher{
address: addressFlag,
containerdBinaryPath: containerdBinaryFlag,
noReaper: config.NoReaper,
}
if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty")
}
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
service, err := initFunc(ctx, idFlag, publisher)
@ -254,4 +284,5 @@ func dumpStacks(logger *logrus.Entry) {
type remoteEventsPublisher struct {
address string
containerdBinaryPath string
noReaper bool
}

View File

@ -39,9 +39,13 @@ import (
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
func setupSignals(config Config) (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE}
if !config.NoReaper {
smp = append(smp, unix.SIGCHLD)
}
signal.Notify(signals, smp...)
return signals, nil
}
@ -87,7 +91,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
}
func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(context.Background(), "log", unix.O_WRONLY, 0700)
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
@ -102,6 +106,15 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
if l.noReaper {
if err := cmd.Start(); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return errors.Wrap(err, "failed to publish event")
}
return nil
}
c, err := Default.Start(cmd)
if err != nil {
return err

View File

@ -40,7 +40,7 @@ import (
)
// setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) {
func setupSignals(config Config) (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
return signals, nil
}
@ -119,21 +119,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
}
}
var _ = (io.WriterTo)(&blockingBuffer{})
var _ = (io.Writer)(&blockingBuffer{})
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
// `capacity` is reached the calls to `Write` will block until a successful call
// to `WriterTo` frees up the buffer space.
//
// Note: This has the same threadding semantics as bytes.Buffer with no
// additional locking so multithreading is not supported.
type blockingBuffer struct {
c *sync.Cond
capacity int
buffer bytes.Buffer
}
func newBlockingBuffer(capacity int) *blockingBuffer {
return &blockingBuffer{
c: sync.NewCond(&sync.Mutex{}),
capacity: capacity,
}
}
func (bb *blockingBuffer) Len() int {
bb.c.L.Lock()
defer bb.c.L.Unlock()
return bb.buffer.Len()
}
func (bb *blockingBuffer) Write(p []byte) (int, error) {
if len(p) > bb.capacity {
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
}
bb.c.L.Lock()
for bb.buffer.Len()+len(p) > bb.capacity {
bb.c.Wait()
}
defer bb.c.L.Unlock()
return bb.buffer.Write(p)
}
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
bb.c.L.Lock()
defer bb.c.L.Unlock()
defer bb.c.Signal()
return bb.buffer.WriteTo(w)
}
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
// by using Windows Named Pipes for logging. When containerd restarts it tries
// to reconnect to any shims. This means that the connection to the logger will
// be severed but when containerd starts up it should reconnect and start
// logging again. We abstract all of this logic behind what looks like a simple
// `io.Writer` that can reconnect in the lifetime and buffers logs while
// disconnected.
type deferredShimWriteLogger struct {
mu sync.Mutex
ctx context.Context
wg sync.WaitGroup
connected bool
aborted bool
buffer *blockingBuffer
l net.Listener
c net.Conn
conerr error
}
// beginAccept issues an accept to wait for a connection. Once a connection
// occurs drains any outstanding buffer. While draining the buffer any writes
// are blocked. If the buffer fails to fully drain due to a connection drop a
// call to `beginAccept` is re-issued waiting for another connection from
// containerd.
func (dswl *deferredShimWriteLogger) beginAccept() {
dswl.mu.Lock()
if dswl.connected {
return
}
dswl.mu.Unlock()
c, err := dswl.l.Accept()
if err == winio.ErrPipeListenerClosed {
dswl.mu.Lock()
dswl.aborted = true
dswl.l.Close()
dswl.conerr = errors.New("connection closed")
dswl.mu.Unlock()
return
}
dswl.mu.Lock()
dswl.connected = true
dswl.c = c
// Drain the buffer
if dswl.buffer.Len() > 0 {
_, err := dswl.buffer.WriteTo(dswl.c)
if err != nil {
// We lost our connection draining the buffer.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
}
}
dswl.mu.Unlock()
}
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
dswl.wg.Wait()
if dswl.c == nil {
dswl.mu.Lock()
defer dswl.mu.Unlock()
if dswl.aborted {
return 0, dswl.conerr
}
return dswl.c.Write(p)
if dswl.connected {
// We have a connection. beginAccept would have drained the buffer so we just write our data to
// the connection directly.
written, err := dswl.c.Write(p)
if err != nil {
// We lost the connection.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
// We weren't able to write the full `p` bytes. Buffer the rest
if written != len(p) {
w, err := dswl.buffer.Write(p[written:])
if err != nil {
// We failed to buffer. Return this error
return written + w, err
}
written += w
}
}
return written, nil
}
// We are disconnected. Buffer the contents.
return dswl.buffer.Write(p)
}
// openLog on Windows acts as the server of the log pipe. This allows the
@ -143,26 +272,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
buffer: newBlockingBuffer(64 * 1024), // 64KB,
}
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
}
// TODO: JTERRY75 - this will not work with restarts. Only the first
// connection will work and all +1 connections will return 'use of closed
// network connection'. Make this reconnect aware.
dswl.wg.Add(1)
go func() {
c, conerr := l.Accept()
if conerr != nil {
l.Close()
dswl.conerr = conerr
}
dswl.c = c
dswl.wg.Done()
}()
dswl.l = l
go dswl.beginAccept()
return dswl, nil
}

View File

@ -24,6 +24,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/containerd/containerd/namespaces"
@ -32,6 +33,8 @@ import (
const shimBinaryFormat = "containerd-shim-%s-%s"
var runtimePaths sync.Map
// Command returns the shim command with the provided args and configuration
func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
@ -49,19 +52,33 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr
}
args = append(args, cmdArgs...)
name := BinaryName(runtime)
if name == "" {
return nil, fmt.Errorf("invalid runtime name %s, correct runtime name should format like io.containerd.runc.v1", runtime)
}
var cmdPath string
var lerr error
if cmdPath, lerr = exec.LookPath(name); lerr != nil {
if eerr, ok := lerr.(*exec.Error); ok {
if eerr.Err == exec.ErrNotFound {
return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name)
cmdPathI, cmdPathFound := runtimePaths.Load(name)
if cmdPathFound {
cmdPath = cmdPathI.(string)
} else {
var lerr error
if cmdPath, lerr = exec.LookPath(name); lerr != nil {
if eerr, ok := lerr.(*exec.Error); ok {
if eerr.Err == exec.ErrNotFound {
return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name)
}
}
}
cmdPath, err = filepath.Abs(cmdPath)
if err != nil {
return nil, err
}
if cmdPathI, cmdPathFound = runtimePaths.LoadOrStore(name, cmdPath); cmdPathFound {
// We didn't store cmdPath we loaded an already cached value. Use it.
cmdPath = cmdPathI.(string)
}
}
cmdPath, err = filepath.Abs(cmdPath)
if err != nil {
return nil, err
}
cmd := exec.Command(cmdPath, args...)
cmd.Dir = path
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
@ -69,10 +86,15 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr
return cmd, nil
}
// BinaryName returns the shim binary name from the runtime name
// BinaryName returns the shim binary name from the runtime name,
// empty string returns means runtime name is invalid
func BinaryName(runtime string) string {
// runtime name should format like $prefix.name.version
parts := strings.Split(runtime, ".")
// TODO: add validation for runtime
if len(parts) < 2 {
return ""
}
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
}

View File

@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) {
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Now().Sub(start)
if remaining <= 0 {
lastError = errors.Errorf("timed out waiting for npipe %s", address)
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Now().Sub(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
return c, lastError

View File

@ -42,7 +42,7 @@ func CreateUnixSocket(path string) (net.Listener, error) {
return net.Listen("unix", path)
}
// GetLocalListener returns a listerner out of a unix socket.
// GetLocalListener returns a listener out of a unix socket.
func GetLocalListener(path string, uid, gid int) (net.Listener, error) {
// Ensure parent directory is created
if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {