mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-03 02:26:37 +00:00
vendors: upgrade the containerd vendors
kata shimv2 needs the commit of:
f05672357f
,
thus upgrade it to the latest.
Signed-off-by: Fupan Li <lifupan@gmail.com>
This commit is contained in:
parent
4cc94b6063
commit
e4a3fd5565
4
Gopkg.lock
generated
4
Gopkg.lock
generated
@ -75,7 +75,7 @@
|
|||||||
revision = "0650fd9eeb50bab4fc99dceb9f2e14cf58f36e7f"
|
revision = "0650fd9eeb50bab4fc99dceb9f2e14cf58f36e7f"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:d9120dea4d91818f1c859242cb96825faf6d375a4e0231263ecaec5905143757"
|
digest = "1:8787d42e39854db19dd09d79e39df2a9975c00d3560fa3fdfa180a89e36552a2"
|
||||||
name = "github.com/containerd/containerd"
|
name = "github.com/containerd/containerd"
|
||||||
packages = [
|
packages = [
|
||||||
"api/events",
|
"api/events",
|
||||||
@ -92,7 +92,7 @@
|
|||||||
"sys",
|
"sys",
|
||||||
]
|
]
|
||||||
pruneopts = "NUT"
|
pruneopts = "NUT"
|
||||||
revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5"
|
revision = "f05672357f56f26751a521175c5a96fc21fa8603"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:3d1a50e9f27c661df8c5552e7f2f6b9d2a8b641c65aeac7373f8a5c60d9f6856"
|
digest = "1:3d1a50e9f27c661df8c5552e7f2f6b9d2a8b641c65aeac7373f8a5c60d9f6856"
|
||||||
|
@ -68,7 +68,7 @@
|
|||||||
|
|
||||||
[[constraint]]
|
[[constraint]]
|
||||||
name = "github.com/containerd/containerd"
|
name = "github.com/containerd/containerd"
|
||||||
revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5"
|
revision = "f05672357f56f26751a521175c5a96fc21fa8603"
|
||||||
|
|
||||||
[[override]]
|
[[override]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
|
2
vendor/github.com/containerd/containerd/errdefs/grpc.go
generated
vendored
2
vendor/github.com/containerd/containerd/errdefs/grpc.go
generated
vendored
@ -95,7 +95,7 @@ func FromGRPC(err error) error {
|
|||||||
|
|
||||||
msg := rebaseMessage(cls, err)
|
msg := rebaseMessage(cls, err)
|
||||||
if msg != "" {
|
if msg != "" {
|
||||||
err = errors.Wrapf(cls, msg)
|
err = errors.Wrap(cls, msg)
|
||||||
} else {
|
} else {
|
||||||
err = errors.WithStack(cls)
|
err = errors.WithStack(cls)
|
||||||
}
|
}
|
||||||
|
2
vendor/github.com/containerd/containerd/mount/mount_unix.go
generated
vendored
2
vendor/github.com/containerd/containerd/mount/mount_unix.go
generated
vendored
@ -1,4 +1,4 @@
|
|||||||
// +build darwin freebsd
|
// +build darwin freebsd openbsd
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Copyright The containerd Authors.
|
Copyright The containerd Authors.
|
||||||
|
61
vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go
generated
vendored
61
vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go
generated
vendored
@ -1,61 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright The containerd Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package mount
|
|
||||||
|
|
||||||
/*
|
|
||||||
#include <sys/param.h>
|
|
||||||
#include <sys/ucred.h>
|
|
||||||
#include <sys/mount.h>
|
|
||||||
*/
|
|
||||||
import "C"
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"unsafe"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Self retrieves a list of mounts for the current running process.
|
|
||||||
func Self() ([]Info, error) {
|
|
||||||
var rawEntries *C.struct_statfs
|
|
||||||
|
|
||||||
count := int(C.getmntinfo(&rawEntries, C.MNT_WAIT))
|
|
||||||
if count == 0 {
|
|
||||||
return nil, fmt.Errorf("Failed to call getmntinfo")
|
|
||||||
}
|
|
||||||
|
|
||||||
var entries []C.struct_statfs
|
|
||||||
header := (*reflect.SliceHeader)(unsafe.Pointer(&entries))
|
|
||||||
header.Cap = count
|
|
||||||
header.Len = count
|
|
||||||
header.Data = uintptr(unsafe.Pointer(rawEntries))
|
|
||||||
|
|
||||||
var out []Info
|
|
||||||
for _, entry := range entries {
|
|
||||||
var mountinfo Info
|
|
||||||
mountinfo.Mountpoint = C.GoString(&entry.f_mntonname[0])
|
|
||||||
mountinfo.Source = C.GoString(&entry.f_mntfromname[0])
|
|
||||||
mountinfo.FSType = C.GoString(&entry.f_fstypename[0])
|
|
||||||
out = append(out, mountinfo)
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// PID collects the mounts for a specific process ID.
|
|
||||||
func PID(pid int) ([]Info, error) {
|
|
||||||
return nil, fmt.Errorf("mountinfo.PID is not implemented on freebsd")
|
|
||||||
}
|
|
8
vendor/github.com/containerd/containerd/mount/mountinfo_linux.go
generated
vendored
8
vendor/github.com/containerd/containerd/mount/mountinfo_linux.go
generated
vendored
@ -68,7 +68,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) {
|
|||||||
numFields := len(fields)
|
numFields := len(fields)
|
||||||
if numFields < 10 {
|
if numFields < 10 {
|
||||||
// should be at least 10 fields
|
// should be at least 10 fields
|
||||||
return nil, fmt.Errorf("Parsing '%s' failed: not enough fields (%d)", text, numFields)
|
return nil, fmt.Errorf("parsing '%s' failed: not enough fields (%d)", text, numFields)
|
||||||
}
|
}
|
||||||
p := Info{}
|
p := Info{}
|
||||||
// ignore any numbers parsing errors, as there should not be any
|
// ignore any numbers parsing errors, as there should not be any
|
||||||
@ -76,7 +76,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) {
|
|||||||
p.Parent, _ = strconv.Atoi(fields[1])
|
p.Parent, _ = strconv.Atoi(fields[1])
|
||||||
mm := strings.Split(fields[2], ":")
|
mm := strings.Split(fields[2], ":")
|
||||||
if len(mm) != 2 {
|
if len(mm) != 2 {
|
||||||
return nil, fmt.Errorf("Parsing '%s' failed: unexpected minor:major pair %s", text, mm)
|
return nil, fmt.Errorf("parsing '%s' failed: unexpected minor:major pair %s", text, mm)
|
||||||
}
|
}
|
||||||
p.Major, _ = strconv.Atoi(mm[0])
|
p.Major, _ = strconv.Atoi(mm[0])
|
||||||
p.Minor, _ = strconv.Atoi(mm[1])
|
p.Minor, _ = strconv.Atoi(mm[1])
|
||||||
@ -101,11 +101,11 @@ func parseInfoFile(r io.Reader) ([]Info, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if i == numFields {
|
if i == numFields {
|
||||||
return nil, fmt.Errorf("Parsing '%s' failed: missing separator ('-')", text)
|
return nil, fmt.Errorf("parsing '%s' failed: missing separator ('-')", text)
|
||||||
}
|
}
|
||||||
// There should be 3 fields after the separator...
|
// There should be 3 fields after the separator...
|
||||||
if i+4 > numFields {
|
if i+4 > numFields {
|
||||||
return nil, fmt.Errorf("Parsing '%s' failed: not enough fields after a separator", text)
|
return nil, fmt.Errorf("parsing '%s' failed: not enough fields after a separator", text)
|
||||||
}
|
}
|
||||||
// ... but in Linux <= 3.9 mounting a cifs with spaces in a share name
|
// ... but in Linux <= 3.9 mounting a cifs with spaces in a share name
|
||||||
// (like "//serv/My Documents") _may_ end up having a space in the last field
|
// (like "//serv/My Documents") _may_ end up having a space in the last field
|
||||||
|
2
vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go
generated
vendored
2
vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go
generated
vendored
@ -1,4 +1,4 @@
|
|||||||
// +build !linux,!freebsd,!solaris freebsd,!cgo solaris,!cgo
|
// +build !linux,!freebsd,!solaris,!openbsd freebsd,!cgo solaris,!cgo openbsd,!cgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Copyright The containerd Authors.
|
Copyright The containerd Authors.
|
||||||
|
3
vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go
generated
vendored
3
vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go
generated
vendored
@ -31,7 +31,7 @@ import (
|
|||||||
// ErrNoSuchProcess is returned when the process no longer exists
|
// ErrNoSuchProcess is returned when the process no longer exists
|
||||||
var ErrNoSuchProcess = errors.New("no such process")
|
var ErrNoSuchProcess = errors.New("no such process")
|
||||||
|
|
||||||
const bufferSize = 32
|
const bufferSize = 2048
|
||||||
|
|
||||||
// Reap should be called when the process receives an SIGCHLD. Reap will reap
|
// Reap should be called when the process receives an SIGCHLD. Reap will reap
|
||||||
// all exited processes and close their wait channels
|
// all exited processes and close their wait channels
|
||||||
@ -47,7 +47,6 @@ func Reap() error {
|
|||||||
Status: e.Status,
|
Status: e.Status,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Default.Unlock()
|
Default.Unlock()
|
||||||
return err
|
return err
|
||||||
|
43
vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
generated
vendored
43
vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
generated
vendored
@ -53,11 +53,32 @@ type Shim interface {
|
|||||||
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
|
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OptsKey is the context key for the Opts value.
|
||||||
|
type OptsKey struct{}
|
||||||
|
|
||||||
|
// Opts are context options associated with the shim invocation.
|
||||||
|
type Opts struct {
|
||||||
|
BundlePath string
|
||||||
|
Debug bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// BinaryOpts allows the configuration of a shims binary setup
|
||||||
|
type BinaryOpts func(*Config)
|
||||||
|
|
||||||
|
// Config of shim binary options provided by shim implementations
|
||||||
|
type Config struct {
|
||||||
|
// NoSubreaper disables setting the shim as a child subreaper
|
||||||
|
NoSubreaper bool
|
||||||
|
// NoReaper disables the shim binary from reaping any child process implicitly
|
||||||
|
NoReaper bool
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
debugFlag bool
|
debugFlag bool
|
||||||
idFlag string
|
idFlag string
|
||||||
namespaceFlag string
|
namespaceFlag string
|
||||||
socketFlag string
|
socketFlag string
|
||||||
|
bundlePath string
|
||||||
addressFlag string
|
addressFlag string
|
||||||
containerdBinaryFlag string
|
containerdBinaryFlag string
|
||||||
action string
|
action string
|
||||||
@ -68,6 +89,7 @@ func parseFlags() {
|
|||||||
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
|
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
|
||||||
flag.StringVar(&idFlag, "id", "", "id of the task")
|
flag.StringVar(&idFlag, "id", "", "id of the task")
|
||||||
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
|
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
|
||||||
|
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
|
||||||
|
|
||||||
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
||||||
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
|
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
|
||||||
@ -107,32 +129,40 @@ func setLogger(ctx context.Context, id string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run initializes and runs a shim server
|
// Run initializes and runs a shim server
|
||||||
func Run(id string, initFunc Init) {
|
func Run(id string, initFunc Init, opts ...BinaryOpts) {
|
||||||
if err := run(id, initFunc); err != nil {
|
var config Config
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&config)
|
||||||
|
}
|
||||||
|
if err := run(id, initFunc, config); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
|
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(id string, initFunc Init) error {
|
func run(id string, initFunc Init, config Config) error {
|
||||||
parseFlags()
|
parseFlags()
|
||||||
setRuntime()
|
setRuntime()
|
||||||
|
|
||||||
signals, err := setupSignals()
|
signals, err := setupSignals(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := subreaper(); err != nil {
|
if !config.NoSubreaper {
|
||||||
return err
|
if err := subreaper(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
publisher := &remoteEventsPublisher{
|
publisher := &remoteEventsPublisher{
|
||||||
address: addressFlag,
|
address: addressFlag,
|
||||||
containerdBinaryPath: containerdBinaryFlag,
|
containerdBinaryPath: containerdBinaryFlag,
|
||||||
|
noReaper: config.NoReaper,
|
||||||
}
|
}
|
||||||
if namespaceFlag == "" {
|
if namespaceFlag == "" {
|
||||||
return fmt.Errorf("shim namespace cannot be empty")
|
return fmt.Errorf("shim namespace cannot be empty")
|
||||||
}
|
}
|
||||||
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
|
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
|
||||||
|
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
|
||||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
|
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
|
||||||
|
|
||||||
service, err := initFunc(ctx, idFlag, publisher)
|
service, err := initFunc(ctx, idFlag, publisher)
|
||||||
@ -254,4 +284,5 @@ func dumpStacks(logger *logrus.Entry) {
|
|||||||
type remoteEventsPublisher struct {
|
type remoteEventsPublisher struct {
|
||||||
address string
|
address string
|
||||||
containerdBinaryPath string
|
containerdBinaryPath string
|
||||||
|
noReaper bool
|
||||||
}
|
}
|
||||||
|
19
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
generated
vendored
19
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
generated
vendored
@ -39,9 +39,13 @@ import (
|
|||||||
|
|
||||||
// setupSignals creates a new signal handler for all signals and sets the shim as a
|
// setupSignals creates a new signal handler for all signals and sets the shim as a
|
||||||
// sub-reaper so that the container processes are reparented
|
// sub-reaper so that the container processes are reparented
|
||||||
func setupSignals() (chan os.Signal, error) {
|
func setupSignals(config Config) (chan os.Signal, error) {
|
||||||
signals := make(chan os.Signal, 32)
|
signals := make(chan os.Signal, 32)
|
||||||
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
|
smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE}
|
||||||
|
if !config.NoReaper {
|
||||||
|
smp = append(smp, unix.SIGCHLD)
|
||||||
|
}
|
||||||
|
signal.Notify(signals, smp...)
|
||||||
return signals, nil
|
return signals, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,7 +91,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func openLog(ctx context.Context, _ string) (io.Writer, error) {
|
func openLog(ctx context.Context, _ string) (io.Writer, error) {
|
||||||
return fifo.OpenFifo(context.Background(), "log", unix.O_WRONLY, 0700)
|
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
||||||
@ -102,6 +106,15 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
}
|
}
|
||||||
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
|
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
|
||||||
cmd.Stdin = bytes.NewReader(data)
|
cmd.Stdin = bytes.NewReader(data)
|
||||||
|
if l.noReaper {
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
return errors.Wrap(err, "failed to publish event")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
c, err := Default.Start(cmd)
|
c, err := Default.Start(cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
162
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
generated
vendored
162
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
generated
vendored
@ -40,7 +40,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// setupSignals creates a new signal handler for all signals
|
// setupSignals creates a new signal handler for all signals
|
||||||
func setupSignals() (chan os.Signal, error) {
|
func setupSignals(config Config) (chan os.Signal, error) {
|
||||||
signals := make(chan os.Signal, 32)
|
signals := make(chan os.Signal, 32)
|
||||||
return signals, nil
|
return signals, nil
|
||||||
}
|
}
|
||||||
@ -119,21 +119,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ = (io.WriterTo)(&blockingBuffer{})
|
||||||
|
var _ = (io.Writer)(&blockingBuffer{})
|
||||||
|
|
||||||
|
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
|
||||||
|
// `capacity` is reached the calls to `Write` will block until a successful call
|
||||||
|
// to `WriterTo` frees up the buffer space.
|
||||||
|
//
|
||||||
|
// Note: This has the same threadding semantics as bytes.Buffer with no
|
||||||
|
// additional locking so multithreading is not supported.
|
||||||
|
type blockingBuffer struct {
|
||||||
|
c *sync.Cond
|
||||||
|
|
||||||
|
capacity int
|
||||||
|
|
||||||
|
buffer bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBlockingBuffer(capacity int) *blockingBuffer {
|
||||||
|
return &blockingBuffer{
|
||||||
|
c: sync.NewCond(&sync.Mutex{}),
|
||||||
|
capacity: capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bb *blockingBuffer) Len() int {
|
||||||
|
bb.c.L.Lock()
|
||||||
|
defer bb.c.L.Unlock()
|
||||||
|
return bb.buffer.Len()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bb *blockingBuffer) Write(p []byte) (int, error) {
|
||||||
|
if len(p) > bb.capacity {
|
||||||
|
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
|
||||||
|
}
|
||||||
|
|
||||||
|
bb.c.L.Lock()
|
||||||
|
for bb.buffer.Len()+len(p) > bb.capacity {
|
||||||
|
bb.c.Wait()
|
||||||
|
}
|
||||||
|
defer bb.c.L.Unlock()
|
||||||
|
return bb.buffer.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
|
||||||
|
bb.c.L.Lock()
|
||||||
|
defer bb.c.L.Unlock()
|
||||||
|
defer bb.c.Signal()
|
||||||
|
return bb.buffer.WriteTo(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
|
||||||
|
// by using Windows Named Pipes for logging. When containerd restarts it tries
|
||||||
|
// to reconnect to any shims. This means that the connection to the logger will
|
||||||
|
// be severed but when containerd starts up it should reconnect and start
|
||||||
|
// logging again. We abstract all of this logic behind what looks like a simple
|
||||||
|
// `io.Writer` that can reconnect in the lifetime and buffers logs while
|
||||||
|
// disconnected.
|
||||||
type deferredShimWriteLogger struct {
|
type deferredShimWriteLogger struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
wg sync.WaitGroup
|
connected bool
|
||||||
|
aborted bool
|
||||||
|
|
||||||
|
buffer *blockingBuffer
|
||||||
|
|
||||||
|
l net.Listener
|
||||||
c net.Conn
|
c net.Conn
|
||||||
conerr error
|
conerr error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// beginAccept issues an accept to wait for a connection. Once a connection
|
||||||
|
// occurs drains any outstanding buffer. While draining the buffer any writes
|
||||||
|
// are blocked. If the buffer fails to fully drain due to a connection drop a
|
||||||
|
// call to `beginAccept` is re-issued waiting for another connection from
|
||||||
|
// containerd.
|
||||||
|
func (dswl *deferredShimWriteLogger) beginAccept() {
|
||||||
|
dswl.mu.Lock()
|
||||||
|
if dswl.connected {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dswl.mu.Unlock()
|
||||||
|
|
||||||
|
c, err := dswl.l.Accept()
|
||||||
|
if err == winio.ErrPipeListenerClosed {
|
||||||
|
dswl.mu.Lock()
|
||||||
|
dswl.aborted = true
|
||||||
|
dswl.l.Close()
|
||||||
|
dswl.conerr = errors.New("connection closed")
|
||||||
|
dswl.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dswl.mu.Lock()
|
||||||
|
dswl.connected = true
|
||||||
|
dswl.c = c
|
||||||
|
|
||||||
|
// Drain the buffer
|
||||||
|
if dswl.buffer.Len() > 0 {
|
||||||
|
_, err := dswl.buffer.WriteTo(dswl.c)
|
||||||
|
if err != nil {
|
||||||
|
// We lost our connection draining the buffer.
|
||||||
|
dswl.connected = false
|
||||||
|
dswl.c.Close()
|
||||||
|
go dswl.beginAccept()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dswl.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
|
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
|
||||||
dswl.wg.Wait()
|
dswl.mu.Lock()
|
||||||
if dswl.c == nil {
|
defer dswl.mu.Unlock()
|
||||||
|
|
||||||
|
if dswl.aborted {
|
||||||
return 0, dswl.conerr
|
return 0, dswl.conerr
|
||||||
}
|
}
|
||||||
return dswl.c.Write(p)
|
|
||||||
|
if dswl.connected {
|
||||||
|
// We have a connection. beginAccept would have drained the buffer so we just write our data to
|
||||||
|
// the connection directly.
|
||||||
|
written, err := dswl.c.Write(p)
|
||||||
|
if err != nil {
|
||||||
|
// We lost the connection.
|
||||||
|
dswl.connected = false
|
||||||
|
dswl.c.Close()
|
||||||
|
go dswl.beginAccept()
|
||||||
|
|
||||||
|
// We weren't able to write the full `p` bytes. Buffer the rest
|
||||||
|
if written != len(p) {
|
||||||
|
w, err := dswl.buffer.Write(p[written:])
|
||||||
|
if err != nil {
|
||||||
|
// We failed to buffer. Return this error
|
||||||
|
return written + w, err
|
||||||
|
}
|
||||||
|
written += w
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return written, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// We are disconnected. Buffer the contents.
|
||||||
|
return dswl.buffer.Write(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
// openLog on Windows acts as the server of the log pipe. This allows the
|
// openLog on Windows acts as the server of the log pipe. This allows the
|
||||||
@ -143,26 +272,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dswl := &deferredShimWriteLogger{
|
||||||
|
ctx: ctx,
|
||||||
|
buffer: newBlockingBuffer(64 * 1024), // 64KB,
|
||||||
|
}
|
||||||
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
|
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
dswl := &deferredShimWriteLogger{
|
dswl.l = l
|
||||||
ctx: ctx,
|
go dswl.beginAccept()
|
||||||
}
|
|
||||||
// TODO: JTERRY75 - this will not work with restarts. Only the first
|
|
||||||
// connection will work and all +1 connections will return 'use of closed
|
|
||||||
// network connection'. Make this reconnect aware.
|
|
||||||
dswl.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
c, conerr := l.Accept()
|
|
||||||
if conerr != nil {
|
|
||||||
l.Close()
|
|
||||||
dswl.conerr = conerr
|
|
||||||
}
|
|
||||||
dswl.c = c
|
|
||||||
dswl.wg.Done()
|
|
||||||
}()
|
|
||||||
return dswl, nil
|
return dswl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
44
vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
generated
vendored
44
vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
generated
vendored
@ -24,6 +24,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
@ -32,6 +33,8 @@ import (
|
|||||||
|
|
||||||
const shimBinaryFormat = "containerd-shim-%s-%s"
|
const shimBinaryFormat = "containerd-shim-%s-%s"
|
||||||
|
|
||||||
|
var runtimePaths sync.Map
|
||||||
|
|
||||||
// Command returns the shim command with the provided args and configuration
|
// Command returns the shim command with the provided args and configuration
|
||||||
func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) {
|
func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) {
|
||||||
ns, err := namespaces.NamespaceRequired(ctx)
|
ns, err := namespaces.NamespaceRequired(ctx)
|
||||||
@ -49,19 +52,33 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr
|
|||||||
}
|
}
|
||||||
args = append(args, cmdArgs...)
|
args = append(args, cmdArgs...)
|
||||||
name := BinaryName(runtime)
|
name := BinaryName(runtime)
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("invalid runtime name %s, correct runtime name should format like io.containerd.runc.v1", runtime)
|
||||||
|
}
|
||||||
|
|
||||||
var cmdPath string
|
var cmdPath string
|
||||||
var lerr error
|
cmdPathI, cmdPathFound := runtimePaths.Load(name)
|
||||||
if cmdPath, lerr = exec.LookPath(name); lerr != nil {
|
if cmdPathFound {
|
||||||
if eerr, ok := lerr.(*exec.Error); ok {
|
cmdPath = cmdPathI.(string)
|
||||||
if eerr.Err == exec.ErrNotFound {
|
} else {
|
||||||
return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name)
|
var lerr error
|
||||||
|
if cmdPath, lerr = exec.LookPath(name); lerr != nil {
|
||||||
|
if eerr, ok := lerr.(*exec.Error); ok {
|
||||||
|
if eerr.Err == exec.ErrNotFound {
|
||||||
|
return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cmdPath, err = filepath.Abs(cmdPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if cmdPathI, cmdPathFound = runtimePaths.LoadOrStore(name, cmdPath); cmdPathFound {
|
||||||
|
// We didn't store cmdPath we loaded an already cached value. Use it.
|
||||||
|
cmdPath = cmdPathI.(string)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cmdPath, err = filepath.Abs(cmdPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cmd := exec.Command(cmdPath, args...)
|
cmd := exec.Command(cmdPath, args...)
|
||||||
cmd.Dir = path
|
cmd.Dir = path
|
||||||
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
|
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
|
||||||
@ -69,10 +86,15 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr
|
|||||||
return cmd, nil
|
return cmd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BinaryName returns the shim binary name from the runtime name
|
// BinaryName returns the shim binary name from the runtime name,
|
||||||
|
// empty string returns means runtime name is invalid
|
||||||
func BinaryName(runtime string) string {
|
func BinaryName(runtime string) string {
|
||||||
|
// runtime name should format like $prefix.name.version
|
||||||
parts := strings.Split(runtime, ".")
|
parts := strings.Split(runtime, ".")
|
||||||
// TODO: add validation for runtime
|
if len(parts) < 2 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
|
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
12
vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
generated
vendored
12
vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
generated
vendored
@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) {
|
|||||||
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||||
var c net.Conn
|
var c net.Conn
|
||||||
var lastError error
|
var lastError error
|
||||||
|
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
for {
|
for {
|
||||||
remaining := timeout - time.Now().Sub(start)
|
remaining := timeout - time.Now().Sub(start)
|
||||||
if remaining <= 0 {
|
if remaining <= 0 {
|
||||||
lastError = errors.Errorf("timed out waiting for npipe %s", address)
|
lastError = timedOutError
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
c, lastError = winio.DialPipe(address, &remaining)
|
c, lastError = winio.DialPipe(address, &remaining)
|
||||||
@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
|||||||
if !os.IsNotExist(lastError) {
|
if !os.IsNotExist(lastError) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
// There is nobody serving the pipe. We limit the timeout for this case
|
||||||
|
// to 5 seconds because any shim that would serve this endpoint should
|
||||||
|
// serve it within 5 seconds. We use the passed in timeout for the
|
||||||
|
// `DialPipe` timeout if the pipe exists however to give the pipe time
|
||||||
|
// to `Accept` the connection.
|
||||||
|
if time.Now().Sub(start) >= 5*time.Second {
|
||||||
|
lastError = timedOutError
|
||||||
|
break
|
||||||
|
}
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
return c, lastError
|
return c, lastError
|
||||||
|
2
vendor/github.com/containerd/containerd/sys/socket_unix.go
generated
vendored
2
vendor/github.com/containerd/containerd/sys/socket_unix.go
generated
vendored
@ -42,7 +42,7 @@ func CreateUnixSocket(path string) (net.Listener, error) {
|
|||||||
return net.Listen("unix", path)
|
return net.Listen("unix", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLocalListener returns a listerner out of a unix socket.
|
// GetLocalListener returns a listener out of a unix socket.
|
||||||
func GetLocalListener(path string, uid, gid int) (net.Listener, error) {
|
func GetLocalListener(path string, uid, gid int) (net.Listener, error) {
|
||||||
// Ensure parent directory is created
|
// Ensure parent directory is created
|
||||||
if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {
|
if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user