mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-03 23:40:03 +00:00 
			
		
		
		
	Merge pull request #110075 from luckerby/104584-retry-dial-on-socket-windows-base
Retry Unix domain sockets on Windows nodes for the plugin registration mechanism
This commit is contained in:
		@@ -29,11 +29,19 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/Microsoft/go-winio"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	tcpProtocol   = "tcp"
 | 
			
		||||
	npipeProtocol = "npipe"
 | 
			
		||||
	// Amount of time to wait between attempting to use a Unix domain socket.
 | 
			
		||||
	// As detailed in https://github.com/kubernetes/kubernetes/issues/104584
 | 
			
		||||
	// the first attempt will most likely fail, hence the need to retry
 | 
			
		||||
	socketDialRetryPeriod = 1 * time.Second
 | 
			
		||||
	// Overall timeout value to dial a Unix domain socket, including retries
 | 
			
		||||
	socketDialTimeout = 4 * time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// CreateListener creates a listener on the specified endpoint.
 | 
			
		||||
@@ -127,6 +135,8 @@ func GetBootTime() (time.Time, error) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file
 | 
			
		||||
// Note that due to the retry logic inside, it could take up to 4 seconds
 | 
			
		||||
// to determine whether or not the file path supplied is a Unix domain socket
 | 
			
		||||
func IsUnixDomainSocket(filePath string) (bool, error) {
 | 
			
		||||
	// Due to the absence of golang support for os.ModeSocket in Windows (https://github.com/golang/go/issues/33357)
 | 
			
		||||
	// we need to dial the file and check if we receive an error to determine if a file is Unix Domain Socket file.
 | 
			
		||||
@@ -136,12 +146,35 @@ func IsUnixDomainSocket(filePath string) (bool, error) {
 | 
			
		||||
	// does NOT work in 1809 if the socket file is created within a bind mounted directory by a container
 | 
			
		||||
	// and the FSCTL is issued in the host by the kubelet.
 | 
			
		||||
 | 
			
		||||
	c, err := net.Dial("unix", filePath)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		c.Close()
 | 
			
		||||
		return true, nil
 | 
			
		||||
	klog.V(6).InfoS("Function IsUnixDomainSocket starts", "filePath", filePath)
 | 
			
		||||
	// As detailed in https://github.com/kubernetes/kubernetes/issues/104584 we cannot rely
 | 
			
		||||
	// on the Unix Domain socket working on the very first try, hence the potential need to
 | 
			
		||||
	// dial multiple times
 | 
			
		||||
	var lastSocketErr error
 | 
			
		||||
	err := wait.PollImmediate(socketDialRetryPeriod, socketDialTimeout,
 | 
			
		||||
		func() (bool, error) {
 | 
			
		||||
			klog.V(6).InfoS("Dialing the socket", "filePath", filePath)
 | 
			
		||||
			var c net.Conn
 | 
			
		||||
			c, lastSocketErr = net.Dial("unix", filePath)
 | 
			
		||||
			if lastSocketErr == nil {
 | 
			
		||||
				c.Close()
 | 
			
		||||
				klog.V(6).InfoS("Socket dialed successfully", "filePath", filePath)
 | 
			
		||||
				return true, nil
 | 
			
		||||
			}
 | 
			
		||||
			klog.V(6).InfoS("Failed the current attempt to dial the socket, so pausing before retry",
 | 
			
		||||
				"filePath", filePath, "err", lastSocketErr, "socketDialRetryPeriod",
 | 
			
		||||
				socketDialRetryPeriod)
 | 
			
		||||
			return false, nil
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	// PollImmediate will return "timed out waiting for the condition" if the function it
 | 
			
		||||
	// invokes never returns true
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		klog.V(2).InfoS("Failed all attempts to dial the socket so marking it as a non-Unix Domain socket. Last socket error along with the error from PollImmediate follow",
 | 
			
		||||
			"filePath", filePath, "lastSocketErr", lastSocketErr, "err", err)
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NormalizePath converts FS paths returned by certain go frameworks (like fsnotify)
 | 
			
		||||
 
 | 
			
		||||
@@ -27,6 +27,7 @@ import (
 | 
			
		||||
	"os"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@@ -232,11 +233,50 @@ func testUnixDomainSocket(t *testing.T, label string) {
 | 
			
		||||
	assert.True(t, result, "Unexpected result: false from IsUnixDomainSocket: %v for %s", result, label)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// This is required as on Windows it's possible for the socket file backing a Unix domain socket to
 | 
			
		||||
// exist but not be ready for socket communications yet as per
 | 
			
		||||
// https://github.com/kubernetes/kubernetes/issues/104584
 | 
			
		||||
func testPendingUnixDomainSocket(t *testing.T, label string) {
 | 
			
		||||
	// Create a temporary file that will simulate the Unix domain socket file in a
 | 
			
		||||
	// not-yet-ready state. We need this because the Kubelet keeps an eye on file
 | 
			
		||||
	// changes and acts on them, leading to potential race issues as described in
 | 
			
		||||
	// the referenced issue above
 | 
			
		||||
	f, err := ioutil.TempFile("", "test-domain-socket")
 | 
			
		||||
	require.NoErrorf(t, err, "Failed to create file for test purposes: %v while setting up: %s", err, label)
 | 
			
		||||
	testFile := f.Name()
 | 
			
		||||
	f.Close()
 | 
			
		||||
 | 
			
		||||
	// Start the check at this point
 | 
			
		||||
	wg := sync.WaitGroup{}
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		result, err := IsUnixDomainSocket(testFile)
 | 
			
		||||
		assert.Nil(t, err, "Unexpected error: %v from IsUnixDomainSocket for %s", err, label)
 | 
			
		||||
		assert.True(t, result, "Unexpected result: false from IsUnixDomainSocket: %v for %s", result, label)
 | 
			
		||||
		wg.Done()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Wait a sufficient amount of time to make sure the retry logic kicks in
 | 
			
		||||
	time.Sleep(socketDialRetryPeriod)
 | 
			
		||||
 | 
			
		||||
	// Replace the temporary file with an actual Unix domain socket file
 | 
			
		||||
	os.Remove(testFile)
 | 
			
		||||
	ta, err := net.ResolveUnixAddr("unix", testFile)
 | 
			
		||||
	require.NoErrorf(t, err, "Failed to ResolveUnixAddr: %v while setting up: %s", err, label)
 | 
			
		||||
	unixln, err := net.ListenUnix("unix", ta)
 | 
			
		||||
	require.NoErrorf(t, err, "Failed to ListenUnix: %v while setting up: %s", err, label)
 | 
			
		||||
 | 
			
		||||
	// Wait for the goroutine to finish, then close the socket
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	unixln.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestIsUnixDomainSocket(t *testing.T) {
 | 
			
		||||
	testPipe(t, "Named Pipe")
 | 
			
		||||
	testRegularFile(t, "Regular File that Exists", true)
 | 
			
		||||
	testRegularFile(t, "Regular File that Does Not Exist", false)
 | 
			
		||||
	testUnixDomainSocket(t, "Unix Domain Socket File")
 | 
			
		||||
	testPendingUnixDomainSocket(t, "Pending Unix Domain Socket File")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNormalizePath(t *testing.T) {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user