mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-10-24 21:51:37 +00:00 
			
		
		
		
	vendors: upgrade the containerd vendors
kata shimv2 needs the commit of:
f05672357f,
thus upgrade it to the latest.
Signed-off-by: Fupan Li <lifupan@gmail.com>
			
			
This commit is contained in:
		
							
								
								
									
										4
									
								
								Gopkg.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										4
									
								
								Gopkg.lock
									
									
									
										generated
									
									
									
								
							| @@ -75,7 +75,7 @@ | ||||
|   revision = "0650fd9eeb50bab4fc99dceb9f2e14cf58f36e7f" | ||||
|  | ||||
| [[projects]] | ||||
|   digest = "1:d9120dea4d91818f1c859242cb96825faf6d375a4e0231263ecaec5905143757" | ||||
|   digest = "1:8787d42e39854db19dd09d79e39df2a9975c00d3560fa3fdfa180a89e36552a2" | ||||
|   name = "github.com/containerd/containerd" | ||||
|   packages = [ | ||||
|     "api/events", | ||||
| @@ -92,7 +92,7 @@ | ||||
|     "sys", | ||||
|   ] | ||||
|   pruneopts = "NUT" | ||||
|   revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5" | ||||
|   revision = "f05672357f56f26751a521175c5a96fc21fa8603" | ||||
|  | ||||
| [[projects]] | ||||
|   digest = "1:3d1a50e9f27c661df8c5552e7f2f6b9d2a8b641c65aeac7373f8a5c60d9f6856" | ||||
|   | ||||
| @@ -68,7 +68,7 @@ | ||||
|  | ||||
| [[constraint]] | ||||
|  name = "github.com/containerd/containerd" | ||||
|  revision = "29eab28b8e4e18231b6b2f077ab653c719d25dd5" | ||||
|  revision = "f05672357f56f26751a521175c5a96fc21fa8603" | ||||
|  | ||||
| [[override]] | ||||
|   branch = "master" | ||||
|   | ||||
							
								
								
									
										2
									
								
								vendor/github.com/containerd/containerd/errdefs/grpc.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								vendor/github.com/containerd/containerd/errdefs/grpc.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -95,7 +95,7 @@ func FromGRPC(err error) error { | ||||
|  | ||||
| 	msg := rebaseMessage(cls, err) | ||||
| 	if msg != "" { | ||||
| 		err = errors.Wrapf(cls, msg) | ||||
| 		err = errors.Wrap(cls, msg) | ||||
| 	} else { | ||||
| 		err = errors.WithStack(cls) | ||||
| 	} | ||||
|   | ||||
							
								
								
									
										2
									
								
								vendor/github.com/containerd/containerd/mount/mount_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								vendor/github.com/containerd/containerd/mount/mount_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -1,4 +1,4 @@ | ||||
| // +build darwin freebsd | ||||
| // +build darwin freebsd openbsd | ||||
|  | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|   | ||||
							
								
								
									
										61
									
								
								vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										61
									
								
								vendor/github.com/containerd/containerd/mount/mountinfo_freebsd.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -1,61 +0,0 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package mount | ||||
|  | ||||
| /* | ||||
| #include <sys/param.h> | ||||
| #include <sys/ucred.h> | ||||
| #include <sys/mount.h> | ||||
| */ | ||||
| import "C" | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"unsafe" | ||||
| ) | ||||
|  | ||||
| // Self retrieves a list of mounts for the current running process. | ||||
| func Self() ([]Info, error) { | ||||
| 	var rawEntries *C.struct_statfs | ||||
|  | ||||
| 	count := int(C.getmntinfo(&rawEntries, C.MNT_WAIT)) | ||||
| 	if count == 0 { | ||||
| 		return nil, fmt.Errorf("Failed to call getmntinfo") | ||||
| 	} | ||||
|  | ||||
| 	var entries []C.struct_statfs | ||||
| 	header := (*reflect.SliceHeader)(unsafe.Pointer(&entries)) | ||||
| 	header.Cap = count | ||||
| 	header.Len = count | ||||
| 	header.Data = uintptr(unsafe.Pointer(rawEntries)) | ||||
|  | ||||
| 	var out []Info | ||||
| 	for _, entry := range entries { | ||||
| 		var mountinfo Info | ||||
| 		mountinfo.Mountpoint = C.GoString(&entry.f_mntonname[0]) | ||||
| 		mountinfo.Source = C.GoString(&entry.f_mntfromname[0]) | ||||
| 		mountinfo.FSType = C.GoString(&entry.f_fstypename[0]) | ||||
| 		out = append(out, mountinfo) | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
|  | ||||
| // PID collects the mounts for a specific process ID. | ||||
| func PID(pid int) ([]Info, error) { | ||||
| 	return nil, fmt.Errorf("mountinfo.PID is not implemented on freebsd") | ||||
| } | ||||
							
								
								
									
										8
									
								
								vendor/github.com/containerd/containerd/mount/mountinfo_linux.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										8
									
								
								vendor/github.com/containerd/containerd/mount/mountinfo_linux.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -68,7 +68,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) { | ||||
| 		numFields := len(fields) | ||||
| 		if numFields < 10 { | ||||
| 			// should be at least 10 fields | ||||
| 			return nil, fmt.Errorf("Parsing '%s' failed: not enough fields (%d)", text, numFields) | ||||
| 			return nil, fmt.Errorf("parsing '%s' failed: not enough fields (%d)", text, numFields) | ||||
| 		} | ||||
| 		p := Info{} | ||||
| 		// ignore any numbers parsing errors, as there should not be any | ||||
| @@ -76,7 +76,7 @@ func parseInfoFile(r io.Reader) ([]Info, error) { | ||||
| 		p.Parent, _ = strconv.Atoi(fields[1]) | ||||
| 		mm := strings.Split(fields[2], ":") | ||||
| 		if len(mm) != 2 { | ||||
| 			return nil, fmt.Errorf("Parsing '%s' failed: unexpected minor:major pair %s", text, mm) | ||||
| 			return nil, fmt.Errorf("parsing '%s' failed: unexpected minor:major pair %s", text, mm) | ||||
| 		} | ||||
| 		p.Major, _ = strconv.Atoi(mm[0]) | ||||
| 		p.Minor, _ = strconv.Atoi(mm[1]) | ||||
| @@ -101,11 +101,11 @@ func parseInfoFile(r io.Reader) ([]Info, error) { | ||||
| 			} | ||||
| 		} | ||||
| 		if i == numFields { | ||||
| 			return nil, fmt.Errorf("Parsing '%s' failed: missing separator ('-')", text) | ||||
| 			return nil, fmt.Errorf("parsing '%s' failed: missing separator ('-')", text) | ||||
| 		} | ||||
| 		// There should be 3 fields after the separator... | ||||
| 		if i+4 > numFields { | ||||
| 			return nil, fmt.Errorf("Parsing '%s' failed: not enough fields after a separator", text) | ||||
| 			return nil, fmt.Errorf("parsing '%s' failed: not enough fields after a separator", text) | ||||
| 		} | ||||
| 		// ... but in Linux <= 3.9 mounting a cifs with spaces in a share name | ||||
| 		// (like "//serv/My Documents") _may_ end up having a space in the last field | ||||
|   | ||||
							
								
								
									
										2
									
								
								vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								vendor/github.com/containerd/containerd/mount/mountinfo_unsupported.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -1,4 +1,4 @@ | ||||
| // +build !linux,!freebsd,!solaris freebsd,!cgo solaris,!cgo | ||||
| // +build !linux,!freebsd,!solaris,!openbsd freebsd,!cgo solaris,!cgo openbsd,!cgo | ||||
|  | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|   | ||||
							
								
								
									
										3
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/reaper_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -31,7 +31,7 @@ import ( | ||||
| // ErrNoSuchProcess is returned when the process no longer exists | ||||
| var ErrNoSuchProcess = errors.New("no such process") | ||||
|  | ||||
| const bufferSize = 32 | ||||
| const bufferSize = 2048 | ||||
|  | ||||
| // Reap should be called when the process receives an SIGCHLD.  Reap will reap | ||||
| // all exited processes and close their wait channels | ||||
| @@ -47,7 +47,6 @@ func Reap() error { | ||||
| 				Status:    e.Status, | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| 	Default.Unlock() | ||||
| 	return err | ||||
|   | ||||
							
								
								
									
										43
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										43
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -53,11 +53,32 @@ type Shim interface { | ||||
| 	StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) | ||||
| } | ||||
|  | ||||
| // OptsKey is the context key for the Opts value. | ||||
| type OptsKey struct{} | ||||
|  | ||||
| // Opts are context options associated with the shim invocation. | ||||
| type Opts struct { | ||||
| 	BundlePath string | ||||
| 	Debug      bool | ||||
| } | ||||
|  | ||||
| // BinaryOpts allows the configuration of a shims binary setup | ||||
| type BinaryOpts func(*Config) | ||||
|  | ||||
| // Config of shim binary options provided by shim implementations | ||||
| type Config struct { | ||||
| 	// NoSubreaper disables setting the shim as a child subreaper | ||||
| 	NoSubreaper bool | ||||
| 	// NoReaper disables the shim binary from reaping any child process implicitly | ||||
| 	NoReaper bool | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	debugFlag            bool | ||||
| 	idFlag               string | ||||
| 	namespaceFlag        string | ||||
| 	socketFlag           string | ||||
| 	bundlePath           string | ||||
| 	addressFlag          string | ||||
| 	containerdBinaryFlag string | ||||
| 	action               string | ||||
| @@ -68,6 +89,7 @@ func parseFlags() { | ||||
| 	flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") | ||||
| 	flag.StringVar(&idFlag, "id", "", "id of the task") | ||||
| 	flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") | ||||
| 	flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") | ||||
|  | ||||
| 	flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") | ||||
| 	flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)") | ||||
| @@ -107,32 +129,40 @@ func setLogger(ctx context.Context, id string) error { | ||||
| } | ||||
|  | ||||
| // Run initializes and runs a shim server | ||||
| func Run(id string, initFunc Init) { | ||||
| 	if err := run(id, initFunc); err != nil { | ||||
| func Run(id string, initFunc Init, opts ...BinaryOpts) { | ||||
| 	var config Config | ||||
| 	for _, o := range opts { | ||||
| 		o(&config) | ||||
| 	} | ||||
| 	if err := run(id, initFunc, config); err != nil { | ||||
| 		fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func run(id string, initFunc Init) error { | ||||
| func run(id string, initFunc Init, config Config) error { | ||||
| 	parseFlags() | ||||
| 	setRuntime() | ||||
|  | ||||
| 	signals, err := setupSignals() | ||||
| 	signals, err := setupSignals(config) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := subreaper(); err != nil { | ||||
| 		return err | ||||
| 	if !config.NoSubreaper { | ||||
| 		if err := subreaper(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	publisher := &remoteEventsPublisher{ | ||||
| 		address:              addressFlag, | ||||
| 		containerdBinaryPath: containerdBinaryFlag, | ||||
| 		noReaper:             config.NoReaper, | ||||
| 	} | ||||
| 	if namespaceFlag == "" { | ||||
| 		return fmt.Errorf("shim namespace cannot be empty") | ||||
| 	} | ||||
| 	ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) | ||||
| 	ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) | ||||
| 	ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) | ||||
|  | ||||
| 	service, err := initFunc(ctx, idFlag, publisher) | ||||
| @@ -254,4 +284,5 @@ func dumpStacks(logger *logrus.Entry) { | ||||
| type remoteEventsPublisher struct { | ||||
| 	address              string | ||||
| 	containerdBinaryPath string | ||||
| 	noReaper             bool | ||||
| } | ||||
|   | ||||
							
								
								
									
										19
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										19
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -39,9 +39,13 @@ import ( | ||||
|  | ||||
| // setupSignals creates a new signal handler for all signals and sets the shim as a | ||||
| // sub-reaper so that the container processes are reparented | ||||
| func setupSignals() (chan os.Signal, error) { | ||||
| func setupSignals(config Config) (chan os.Signal, error) { | ||||
| 	signals := make(chan os.Signal, 32) | ||||
| 	signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE) | ||||
| 	smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE} | ||||
| 	if !config.NoReaper { | ||||
| 		smp = append(smp, unix.SIGCHLD) | ||||
| 	} | ||||
| 	signal.Notify(signals, smp...) | ||||
| 	return signals, nil | ||||
| } | ||||
|  | ||||
| @@ -87,7 +91,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { | ||||
| } | ||||
|  | ||||
| func openLog(ctx context.Context, _ string) (io.Writer, error) { | ||||
| 	return fifo.OpenFifo(context.Background(), "log", unix.O_WRONLY, 0700) | ||||
| 	return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) | ||||
| } | ||||
|  | ||||
| func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { | ||||
| @@ -102,6 +106,15 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event | ||||
| 	} | ||||
| 	cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns) | ||||
| 	cmd.Stdin = bytes.NewReader(data) | ||||
| 	if l.noReaper { | ||||
| 		if err := cmd.Start(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		if err := cmd.Wait(); err != nil { | ||||
| 			return errors.Wrap(err, "failed to publish event") | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 	c, err := Default.Start(cmd) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|   | ||||
							
								
								
									
										162
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										162
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -40,7 +40,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| // setupSignals creates a new signal handler for all signals | ||||
| func setupSignals() (chan os.Signal, error) { | ||||
| func setupSignals(config Config) (chan os.Signal, error) { | ||||
| 	signals := make(chan os.Signal, 32) | ||||
| 	return signals, nil | ||||
| } | ||||
| @@ -119,21 +119,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| var _ = (io.WriterTo)(&blockingBuffer{}) | ||||
| var _ = (io.Writer)(&blockingBuffer{}) | ||||
|  | ||||
| // blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once | ||||
| // `capacity` is reached the calls to `Write` will block until a successful call | ||||
| // to `WriterTo` frees up the buffer space. | ||||
| // | ||||
| // Note: This has the same threadding semantics as bytes.Buffer with no | ||||
| // additional locking so multithreading is not supported. | ||||
| type blockingBuffer struct { | ||||
| 	c *sync.Cond | ||||
|  | ||||
| 	capacity int | ||||
|  | ||||
| 	buffer bytes.Buffer | ||||
| } | ||||
|  | ||||
| func newBlockingBuffer(capacity int) *blockingBuffer { | ||||
| 	return &blockingBuffer{ | ||||
| 		c:        sync.NewCond(&sync.Mutex{}), | ||||
| 		capacity: capacity, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (bb *blockingBuffer) Len() int { | ||||
| 	bb.c.L.Lock() | ||||
| 	defer bb.c.L.Unlock() | ||||
| 	return bb.buffer.Len() | ||||
| } | ||||
|  | ||||
| func (bb *blockingBuffer) Write(p []byte) (int, error) { | ||||
| 	if len(p) > bb.capacity { | ||||
| 		return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity) | ||||
| 	} | ||||
|  | ||||
| 	bb.c.L.Lock() | ||||
| 	for bb.buffer.Len()+len(p) > bb.capacity { | ||||
| 		bb.c.Wait() | ||||
| 	} | ||||
| 	defer bb.c.L.Unlock() | ||||
| 	return bb.buffer.Write(p) | ||||
| } | ||||
|  | ||||
| func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) { | ||||
| 	bb.c.L.Lock() | ||||
| 	defer bb.c.L.Unlock() | ||||
| 	defer bb.c.Signal() | ||||
| 	return bb.buffer.WriteTo(w) | ||||
| } | ||||
|  | ||||
| // deferredShimWriteLogger exists to solve the upstream loggin issue presented | ||||
| // by using Windows Named Pipes for logging. When containerd restarts it tries | ||||
| // to reconnect to any shims. This means that the connection to the logger will | ||||
| // be severed but when containerd starts up it should reconnect and start | ||||
| // logging again. We abstract all of this logic behind what looks like a simple | ||||
| // `io.Writer` that can reconnect in the lifetime and buffers logs while | ||||
| // disconnected. | ||||
| type deferredShimWriteLogger struct { | ||||
| 	mu sync.Mutex | ||||
|  | ||||
| 	ctx context.Context | ||||
|  | ||||
| 	wg sync.WaitGroup | ||||
| 	connected bool | ||||
| 	aborted   bool | ||||
|  | ||||
| 	buffer *blockingBuffer | ||||
|  | ||||
| 	l      net.Listener | ||||
| 	c      net.Conn | ||||
| 	conerr error | ||||
| } | ||||
|  | ||||
| // beginAccept issues an accept to wait for a connection. Once a connection | ||||
| // occurs drains any outstanding buffer. While draining the buffer any writes | ||||
| // are blocked. If the buffer fails to fully drain due to a connection drop a | ||||
| // call to `beginAccept` is re-issued waiting for another connection from | ||||
| // containerd. | ||||
| func (dswl *deferredShimWriteLogger) beginAccept() { | ||||
| 	dswl.mu.Lock() | ||||
| 	if dswl.connected { | ||||
| 		return | ||||
| 	} | ||||
| 	dswl.mu.Unlock() | ||||
|  | ||||
| 	c, err := dswl.l.Accept() | ||||
| 	if err == winio.ErrPipeListenerClosed { | ||||
| 		dswl.mu.Lock() | ||||
| 		dswl.aborted = true | ||||
| 		dswl.l.Close() | ||||
| 		dswl.conerr = errors.New("connection closed") | ||||
| 		dswl.mu.Unlock() | ||||
| 		return | ||||
| 	} | ||||
| 	dswl.mu.Lock() | ||||
| 	dswl.connected = true | ||||
| 	dswl.c = c | ||||
|  | ||||
| 	// Drain the buffer | ||||
| 	if dswl.buffer.Len() > 0 { | ||||
| 		_, err := dswl.buffer.WriteTo(dswl.c) | ||||
| 		if err != nil { | ||||
| 			// We lost our connection draining the buffer. | ||||
| 			dswl.connected = false | ||||
| 			dswl.c.Close() | ||||
| 			go dswl.beginAccept() | ||||
| 		} | ||||
| 	} | ||||
| 	dswl.mu.Unlock() | ||||
| } | ||||
|  | ||||
| func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { | ||||
| 	dswl.wg.Wait() | ||||
| 	if dswl.c == nil { | ||||
| 	dswl.mu.Lock() | ||||
| 	defer dswl.mu.Unlock() | ||||
|  | ||||
| 	if dswl.aborted { | ||||
| 		return 0, dswl.conerr | ||||
| 	} | ||||
| 	return dswl.c.Write(p) | ||||
|  | ||||
| 	if dswl.connected { | ||||
| 		// We have a connection. beginAccept would have drained the buffer so we just write our data to | ||||
| 		// the connection directly. | ||||
| 		written, err := dswl.c.Write(p) | ||||
| 		if err != nil { | ||||
| 			// We lost the connection. | ||||
| 			dswl.connected = false | ||||
| 			dswl.c.Close() | ||||
| 			go dswl.beginAccept() | ||||
|  | ||||
| 			// We weren't able to write the full `p` bytes. Buffer the rest | ||||
| 			if written != len(p) { | ||||
| 				w, err := dswl.buffer.Write(p[written:]) | ||||
| 				if err != nil { | ||||
| 					// We failed to buffer. Return this error | ||||
| 					return written + w, err | ||||
| 				} | ||||
| 				written += w | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		return written, nil | ||||
| 	} | ||||
|  | ||||
| 	// We are disconnected. Buffer the contents. | ||||
| 	return dswl.buffer.Write(p) | ||||
| } | ||||
|  | ||||
| // openLog on Windows acts as the server of the log pipe. This allows the | ||||
| @@ -143,26 +272,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	dswl := &deferredShimWriteLogger{ | ||||
| 		ctx:    ctx, | ||||
| 		buffer: newBlockingBuffer(64 * 1024), // 64KB, | ||||
| 	} | ||||
| 	l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	dswl := &deferredShimWriteLogger{ | ||||
| 		ctx: ctx, | ||||
| 	} | ||||
| 	// TODO: JTERRY75 - this will not work with restarts. Only the first | ||||
| 	// connection will work and all +1 connections will return 'use of closed | ||||
| 	// network connection'. Make this reconnect aware. | ||||
| 	dswl.wg.Add(1) | ||||
| 	go func() { | ||||
| 		c, conerr := l.Accept() | ||||
| 		if conerr != nil { | ||||
| 			l.Close() | ||||
| 			dswl.conerr = conerr | ||||
| 		} | ||||
| 		dswl.c = c | ||||
| 		dswl.wg.Done() | ||||
| 	}() | ||||
| 	dswl.l = l | ||||
| 	go dswl.beginAccept() | ||||
| 	return dswl, nil | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										44
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										44
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -24,6 +24,7 @@ import ( | ||||
| 	"os/exec" | ||||
| 	"path/filepath" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/containerd/containerd/namespaces" | ||||
| @@ -32,6 +33,8 @@ import ( | ||||
|  | ||||
| const shimBinaryFormat = "containerd-shim-%s-%s" | ||||
|  | ||||
| var runtimePaths sync.Map | ||||
|  | ||||
| // Command returns the shim command with the provided args and configuration | ||||
| func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) { | ||||
| 	ns, err := namespaces.NamespaceRequired(ctx) | ||||
| @@ -49,19 +52,33 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr | ||||
| 	} | ||||
| 	args = append(args, cmdArgs...) | ||||
| 	name := BinaryName(runtime) | ||||
| 	if name == "" { | ||||
| 		return nil, fmt.Errorf("invalid runtime name %s, correct runtime name should format like io.containerd.runc.v1", runtime) | ||||
| 	} | ||||
|  | ||||
| 	var cmdPath string | ||||
| 	var lerr error | ||||
| 	if cmdPath, lerr = exec.LookPath(name); lerr != nil { | ||||
| 		if eerr, ok := lerr.(*exec.Error); ok { | ||||
| 			if eerr.Err == exec.ErrNotFound { | ||||
| 				return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name) | ||||
| 	cmdPathI, cmdPathFound := runtimePaths.Load(name) | ||||
| 	if cmdPathFound { | ||||
| 		cmdPath = cmdPathI.(string) | ||||
| 	} else { | ||||
| 		var lerr error | ||||
| 		if cmdPath, lerr = exec.LookPath(name); lerr != nil { | ||||
| 			if eerr, ok := lerr.(*exec.Error); ok { | ||||
| 				if eerr.Err == exec.ErrNotFound { | ||||
| 					return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		cmdPath, err = filepath.Abs(cmdPath) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		if cmdPathI, cmdPathFound = runtimePaths.LoadOrStore(name, cmdPath); cmdPathFound { | ||||
| 			// We didn't store cmdPath we loaded an already cached value. Use it. | ||||
| 			cmdPath = cmdPathI.(string) | ||||
| 		} | ||||
| 	} | ||||
| 	cmdPath, err = filepath.Abs(cmdPath) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	cmd := exec.Command(cmdPath, args...) | ||||
| 	cmd.Dir = path | ||||
| 	cmd.Env = append(os.Environ(), "GOMAXPROCS=2") | ||||
| @@ -69,10 +86,15 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr | ||||
| 	return cmd, nil | ||||
| } | ||||
|  | ||||
| // BinaryName returns the shim binary name from the runtime name | ||||
| // BinaryName returns the shim binary name from the runtime name, | ||||
| // empty string returns means runtime name is invalid | ||||
| func BinaryName(runtime string) string { | ||||
| 	// runtime name should format like $prefix.name.version | ||||
| 	parts := strings.Split(runtime, ".") | ||||
| 	// TODO: add validation for runtime | ||||
| 	if len(parts) < 2 { | ||||
| 		return "" | ||||
| 	} | ||||
|  | ||||
| 	return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1]) | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										12
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										12
									
								
								vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) { | ||||
| func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { | ||||
| 	var c net.Conn | ||||
| 	var lastError error | ||||
| 	timedOutError := errors.Errorf("timed out waiting for npipe %s", address) | ||||
| 	start := time.Now() | ||||
| 	for { | ||||
| 		remaining := timeout - time.Now().Sub(start) | ||||
| 		if remaining <= 0 { | ||||
| 			lastError = errors.Errorf("timed out waiting for npipe %s", address) | ||||
| 			lastError = timedOutError | ||||
| 			break | ||||
| 		} | ||||
| 		c, lastError = winio.DialPipe(address, &remaining) | ||||
| @@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { | ||||
| 		if !os.IsNotExist(lastError) { | ||||
| 			break | ||||
| 		} | ||||
| 		// There is nobody serving the pipe. We limit the timeout for this case | ||||
| 		// to 5 seconds because any shim that would serve this endpoint should | ||||
| 		// serve it within 5 seconds. We use the passed in timeout for the | ||||
| 		// `DialPipe` timeout if the pipe exists however to give the pipe time | ||||
| 		// to `Accept` the connection. | ||||
| 		if time.Now().Sub(start) >= 5*time.Second { | ||||
| 			lastError = timedOutError | ||||
| 			break | ||||
| 		} | ||||
| 		time.Sleep(10 * time.Millisecond) | ||||
| 	} | ||||
| 	return c, lastError | ||||
|   | ||||
							
								
								
									
										2
									
								
								vendor/github.com/containerd/containerd/sys/socket_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								vendor/github.com/containerd/containerd/sys/socket_unix.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -42,7 +42,7 @@ func CreateUnixSocket(path string) (net.Listener, error) { | ||||
| 	return net.Listen("unix", path) | ||||
| } | ||||
|  | ||||
| // GetLocalListener returns a listerner out of a unix socket. | ||||
| // GetLocalListener returns a listener out of a unix socket. | ||||
| func GetLocalListener(path string, uid, gid int) (net.Listener, error) { | ||||
| 	// Ensure parent directory is created | ||||
| 	if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user