diff --git a/pkg/kubelet/util/util.go b/pkg/kubelet/util/util.go index 97933afe39b..79473a18184 100644 --- a/pkg/kubelet/util/util.go +++ b/pkg/kubelet/util/util.go @@ -21,6 +21,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/util/filesystem" ) // FromApiserverCache modifies so that the GET request will @@ -29,6 +30,8 @@ func FromApiserverCache(opts *metav1.GetOptions) { opts.ResourceVersion = "0" } +var IsUnixDomainSocket = filesystem.IsUnixDomainSocket + // GetNodenameForKernel gets hostname value to set in the hostname field (the nodename field of struct utsname) of the pod. func GetNodenameForKernel(hostname string, hostDomainName string, setHostnameAsFQDN *bool) (string, error) { kernelHostname := hostname diff --git a/pkg/kubelet/util/util_test.go b/pkg/kubelet/util/util_test.go index 11a82f61ed5..383b1fdb14f 100644 --- a/pkg/kubelet/util/util_test.go +++ b/pkg/kubelet/util/util_test.go @@ -17,12 +17,9 @@ limitations under the License. package util import ( - "net" - "os" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestGetNodenameForKernel(t *testing.T) { @@ -89,63 +86,3 @@ func TestGetNodenameForKernel(t *testing.T) { } } - -func TestIsUnixDomainSocket(t *testing.T) { - tests := []struct { - label string - listenOnSocket bool - expectSocket bool - expectError bool - invalidFile bool - }{ - { - label: "Domain Socket file", - listenOnSocket: true, - expectSocket: true, - expectError: false, - }, - { - label: "Non Existent file", - invalidFile: true, - expectError: true, - }, - { - label: "Regular file", - listenOnSocket: false, - expectSocket: false, - expectError: false, - }, - } - for _, test := range tests { - f, err := os.CreateTemp("", "test-domain-socket") - require.NoErrorf(t, err, "Failed to create file for test purposes: %v while setting up: %s", err, test.label) - addr := f.Name() - f.Close() - var ln *net.UnixListener - if test.listenOnSocket { - os.Remove(addr) - ta, err := net.ResolveUnixAddr("unix", addr) - require.NoErrorf(t, err, "Failed to ResolveUnixAddr: %v while setting up: %s", err, test.label) - ln, err = net.ListenUnix("unix", ta) - require.NoErrorf(t, err, "Failed to ListenUnix: %v while setting up: %s", err, test.label) - } - fileToTest := addr - if test.invalidFile { - fileToTest = fileToTest + ".invalid" - } - result, err := IsUnixDomainSocket(fileToTest) - if test.listenOnSocket { - // this takes care of removing the file associated with the domain socket - ln.Close() - } else { - // explicitly remove regular file - os.Remove(addr) - } - if test.expectError { - assert.Errorf(t, err, "Unexpected nil error from IsUnixDomainSocket for %s", test.label) - } else { - assert.NoErrorf(t, err, "Unexpected error invoking IsUnixDomainSocket for %s", test.label) - } - assert.Equal(t, result, test.expectSocket, "Unexpected result from IsUnixDomainSocket: %v for %s", result, test.label) - } -} diff --git a/pkg/kubelet/util/util_unix.go b/pkg/kubelet/util/util_unix.go index e68a194e6d3..c1dd608a7ea 100644 --- a/pkg/kubelet/util/util_unix.go +++ b/pkg/kubelet/util/util_unix.go @@ -136,18 +136,6 @@ func LocalEndpoint(path, file string) (string, error) { return filepath.Join(u.String(), file+".sock"), nil } -// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file -func IsUnixDomainSocket(filePath string) (bool, error) { - fi, err := os.Stat(filePath) - if err != nil { - return false, fmt.Errorf("stat file %s failed: %v", filePath, err) - } - if fi.Mode()&os.ModeSocket == 0 { - return false, nil - } - return true, nil -} - // NormalizePath is a no-op for Linux for now func NormalizePath(path string) string { return path diff --git a/pkg/kubelet/util/util_windows.go b/pkg/kubelet/util/util_windows.go index b8fc558d2e2..3837b45aa6a 100644 --- a/pkg/kubelet/util/util_windows.go +++ b/pkg/kubelet/util/util_windows.go @@ -24,26 +24,17 @@ import ( "fmt" "net" "net/url" - "os" "path/filepath" "strings" "syscall" "time" "github.com/Microsoft/go-winio" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" ) const ( tcpProtocol = "tcp" npipeProtocol = "npipe" - // Amount of time to wait between attempting to use a Unix domain socket. - // As detailed in https://github.com/kubernetes/kubernetes/issues/104584 - // the first attempt will most likely fail, hence the need to retry - socketDialRetryPeriod = 1 * time.Second - // Overall timeout value to dial a Unix domain socket, including retries - socketDialTimeout = 4 * time.Second ) // CreateListener creates a listener on the specified endpoint. @@ -154,54 +145,6 @@ func GetBootTime() (time.Time, error) { return currentTime.Add(-time.Duration(output) * time.Millisecond).Truncate(time.Second), nil } -// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file -// Note that due to the retry logic inside, it could take up to 4 seconds -// to determine whether or not the file path supplied is a Unix domain socket -func IsUnixDomainSocket(filePath string) (bool, error) { - // Due to the absence of golang support for os.ModeSocket in Windows (https://github.com/golang/go/issues/33357) - // we need to dial the file and check if we receive an error to determine if a file is Unix Domain Socket file. - - // Note that querrying for the Reparse Points (https://docs.microsoft.com/en-us/windows/win32/fileio/reparse-points) - // for the file (using FSCTL_GET_REPARSE_POINT) and checking for reparse tag: reparseTagSocket - // does NOT work in 1809 if the socket file is created within a bind mounted directory by a container - // and the FSCTL is issued in the host by the kubelet. - - // If the file does not exist, it cannot be a Unix domain socket. - if _, err := os.Stat(filePath); os.IsNotExist(err) { - return false, fmt.Errorf("File %s not found. Err: %v", filePath, err) - } - - klog.V(6).InfoS("Function IsUnixDomainSocket starts", "filePath", filePath) - // As detailed in https://github.com/kubernetes/kubernetes/issues/104584 we cannot rely - // on the Unix Domain socket working on the very first try, hence the potential need to - // dial multiple times - var lastSocketErr error - err := wait.PollImmediate(socketDialRetryPeriod, socketDialTimeout, - func() (bool, error) { - klog.V(6).InfoS("Dialing the socket", "filePath", filePath) - var c net.Conn - c, lastSocketErr = net.Dial("unix", filePath) - if lastSocketErr == nil { - c.Close() - klog.V(6).InfoS("Socket dialed successfully", "filePath", filePath) - return true, nil - } - klog.V(6).InfoS("Failed the current attempt to dial the socket, so pausing before retry", - "filePath", filePath, "err", lastSocketErr, "socketDialRetryPeriod", - socketDialRetryPeriod) - return false, nil - }) - - // PollImmediate will return "timed out waiting for the condition" if the function it - // invokes never returns true - if err != nil { - klog.V(2).InfoS("Failed all attempts to dial the socket so marking it as a non-Unix Domain socket. Last socket error along with the error from PollImmediate follow", - "filePath", filePath, "lastSocketErr", lastSocketErr, "err", err) - return false, nil - } - return true, nil -} - // NormalizePath converts FS paths returned by certain go frameworks (like fsnotify) // to native Windows paths that can be passed to Windows specific code func NormalizePath(path string) string { diff --git a/pkg/kubelet/util/util_windows_test.go b/pkg/kubelet/util/util_windows_test.go index 1450365a4c5..fa5ab43a5e5 100644 --- a/pkg/kubelet/util/util_windows_test.go +++ b/pkg/kubelet/util/util_windows_test.go @@ -182,64 +182,6 @@ func TestParseEndpoint(t *testing.T) { } -func TestIsUnixDomainSocketPipe(t *testing.T) { - generatePipeName := func(suffixLen int) string { - rand.Seed(time.Now().UnixNano()) - letter := []rune("abcdef0123456789") - b := make([]rune, suffixLen) - for i := range b { - b[i] = letter[rand.Intn(len(letter))] - } - return "\\\\.\\pipe\\test-pipe" + string(b) - } - testFile := generatePipeName(4) - pipeln, err := winio.ListenPipe(testFile, &winio.PipeConfig{SecurityDescriptor: "D:P(A;;GA;;;BA)(A;;GA;;;SY)"}) - defer pipeln.Close() - - require.NoErrorf(t, err, "Failed to listen on named pipe for test purposes: %v", err) - result, err := IsUnixDomainSocket(testFile) - assert.NoError(t, err, "Unexpected error from IsUnixDomainSocket.") - assert.False(t, result, "Unexpected result: true from IsUnixDomainSocket.") -} - -// This is required as on Windows it's possible for the socket file backing a Unix domain socket to -// exist but not be ready for socket communications yet as per -// https://github.com/kubernetes/kubernetes/issues/104584 -func TestPendingUnixDomainSocket(t *testing.T) { - // Create a temporary file that will simulate the Unix domain socket file in a - // not-yet-ready state. We need this because the Kubelet keeps an eye on file - // changes and acts on them, leading to potential race issues as described in - // the referenced issue above - f, err := os.CreateTemp("", "test-domain-socket") - require.NoErrorf(t, err, "Failed to create file for test purposes: %v", err) - testFile := f.Name() - f.Close() - - // Start the check at this point - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - result, err := IsUnixDomainSocket(testFile) - assert.Nil(t, err, "Unexpected error from IsUnixDomainSocket: %v", err) - assert.True(t, result, "Unexpected result: false from IsUnixDomainSocket.") - wg.Done() - }() - - // Wait a sufficient amount of time to make sure the retry logic kicks in - time.Sleep(socketDialRetryPeriod) - - // Replace the temporary file with an actual Unix domain socket file - os.Remove(testFile) - ta, err := net.ResolveUnixAddr("unix", testFile) - require.NoError(t, err, "Failed to ResolveUnixAddr.") - unixln, err := net.ListenUnix("unix", ta) - require.NoError(t, err, "Failed to ListenUnix.") - - // Wait for the goroutine to finish, then close the socket - wg.Wait() - unixln.Close() -} - func TestNormalizePath(t *testing.T) { tests := []struct { originalpath string diff --git a/pkg/util/filesystem/util_test.go b/pkg/util/filesystem/util_test.go new file mode 100644 index 00000000000..87ec73f067a --- /dev/null +++ b/pkg/util/filesystem/util_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filesystem + +import ( + "net" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIsUnixDomainSocket(t *testing.T) { + tests := []struct { + label string + listenOnSocket bool + expectSocket bool + expectError bool + invalidFile bool + }{ + { + label: "Domain Socket file", + listenOnSocket: true, + expectSocket: true, + expectError: false, + }, + { + label: "Non Existent file", + invalidFile: true, + expectError: true, + }, + { + label: "Regular file", + listenOnSocket: false, + expectSocket: false, + expectError: false, + }, + } + for _, test := range tests { + f, err := os.CreateTemp("", "test-domain-socket") + require.NoErrorf(t, err, "Failed to create file for test purposes: %v while setting up: %s", err, test.label) + addr := f.Name() + f.Close() + var ln *net.UnixListener + if test.listenOnSocket { + os.Remove(addr) + ta, err := net.ResolveUnixAddr("unix", addr) + require.NoErrorf(t, err, "Failed to ResolveUnixAddr: %v while setting up: %s", err, test.label) + ln, err = net.ListenUnix("unix", ta) + require.NoErrorf(t, err, "Failed to ListenUnix: %v while setting up: %s", err, test.label) + } + fileToTest := addr + if test.invalidFile { + fileToTest = fileToTest + ".invalid" + } + result, err := IsUnixDomainSocket(fileToTest) + if test.listenOnSocket { + // this takes care of removing the file associated with the domain socket + ln.Close() + } else { + // explicitly remove regular file + os.Remove(addr) + } + if test.expectError { + assert.Errorf(t, err, "Unexpected nil error from IsUnixDomainSocket for %s", test.label) + } else { + assert.NoErrorf(t, err, "Unexpected error invoking IsUnixDomainSocket for %s", test.label) + } + assert.Equal(t, result, test.expectSocket, "Unexpected result from IsUnixDomainSocket: %v for %s", result, test.label) + } +} diff --git a/pkg/util/filesystem/util_unix.go b/pkg/util/filesystem/util_unix.go new file mode 100644 index 00000000000..df887f94508 --- /dev/null +++ b/pkg/util/filesystem/util_unix.go @@ -0,0 +1,37 @@ +//go:build freebsd || linux || darwin +// +build freebsd linux darwin + +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filesystem + +import ( + "fmt" + "os" +) + +// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file +func IsUnixDomainSocket(filePath string) (bool, error) { + fi, err := os.Stat(filePath) + if err != nil { + return false, fmt.Errorf("stat file %s failed: %v", filePath, err) + } + if fi.Mode()&os.ModeSocket == 0 { + return false, nil + } + return true, nil +} diff --git a/pkg/util/filesystem/util_windows.go b/pkg/util/filesystem/util_windows.go new file mode 100644 index 00000000000..cd6a11ed308 --- /dev/null +++ b/pkg/util/filesystem/util_windows.go @@ -0,0 +1,87 @@ +//go:build windows +// +build windows + +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filesystem + +import ( + "fmt" + "net" + "os" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +const ( + // Amount of time to wait between attempting to use a Unix domain socket. + // As detailed in https://github.com/kubernetes/kubernetes/issues/104584 + // the first attempt will most likely fail, hence the need to retry + socketDialRetryPeriod = 1 * time.Second + // Overall timeout value to dial a Unix domain socket, including retries + socketDialTimeout = 4 * time.Second +) + +// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file +// Note that due to the retry logic inside, it could take up to 4 seconds +// to determine whether or not the file path supplied is a Unix domain socket +func IsUnixDomainSocket(filePath string) (bool, error) { + // Due to the absence of golang support for os.ModeSocket in Windows (https://github.com/golang/go/issues/33357) + // we need to dial the file and check if we receive an error to determine if a file is Unix Domain Socket file. + + // Note that querrying for the Reparse Points (https://docs.microsoft.com/en-us/windows/win32/fileio/reparse-points) + // for the file (using FSCTL_GET_REPARSE_POINT) and checking for reparse tag: reparseTagSocket + // does NOT work in 1809 if the socket file is created within a bind mounted directory by a container + // and the FSCTL is issued in the host by the kubelet. + + // If the file does not exist, it cannot be a Unix domain socket. + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return false, fmt.Errorf("File %s not found. Err: %v", filePath, err) + } + + klog.V(6).InfoS("Function IsUnixDomainSocket starts", "filePath", filePath) + // As detailed in https://github.com/kubernetes/kubernetes/issues/104584 we cannot rely + // on the Unix Domain socket working on the very first try, hence the potential need to + // dial multiple times + var lastSocketErr error + err := wait.PollImmediate(socketDialRetryPeriod, socketDialTimeout, + func() (bool, error) { + klog.V(6).InfoS("Dialing the socket", "filePath", filePath) + var c net.Conn + c, lastSocketErr = net.Dial("unix", filePath) + if lastSocketErr == nil { + c.Close() + klog.V(6).InfoS("Socket dialed successfully", "filePath", filePath) + return true, nil + } + klog.V(6).InfoS("Failed the current attempt to dial the socket, so pausing before retry", + "filePath", filePath, "err", lastSocketErr, "socketDialRetryPeriod", + socketDialRetryPeriod) + return false, nil + }) + + // PollImmediate will return "timed out waiting for the condition" if the function it + // invokes never returns true + if err != nil { + klog.V(2).InfoS("Failed all attempts to dial the socket so marking it as a non-Unix Domain socket. Last socket error along with the error from PollImmediate follow", + "filePath", filePath, "lastSocketErr", lastSocketErr, "err", err) + return false, nil + } + return true, nil +} diff --git a/pkg/util/filesystem/util_windows_test.go b/pkg/util/filesystem/util_windows_test.go new file mode 100644 index 00000000000..7a4afefce42 --- /dev/null +++ b/pkg/util/filesystem/util_windows_test.go @@ -0,0 +1,89 @@ +//go:build windows +// +build windows + +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filesystem + +import ( + "net" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIsUnixDomainSocketPipe(t *testing.T) { + generatePipeName := func(suffixLen int) string { + rand.Seed(time.Now().UnixNano()) + letter := []rune("abcdef0123456789") + b := make([]rune, suffixLen) + for i := range b { + b[i] = letter[rand.Intn(len(letter))] + } + return "\\\\.\\pipe\\test-pipe" + string(b) + } + testFile := generatePipeName(4) + pipeln, err := winio.ListenPipe(testFile, &winio.PipeConfig{SecurityDescriptor: "D:P(A;;GA;;;BA)(A;;GA;;;SY)"}) + defer pipeln.Close() + + require.NoErrorf(t, err, "Failed to listen on named pipe for test purposes: %v", err) + result, err := IsUnixDomainSocket(testFile) + assert.NoError(t, err, "Unexpected error from IsUnixDomainSocket.") + assert.False(t, result, "Unexpected result: true from IsUnixDomainSocket.") +} + +// This is required as on Windows it's possible for the socket file backing a Unix domain socket to +// exist but not be ready for socket communications yet as per +// https://github.com/kubernetes/kubernetes/issues/104584 +func TestPendingUnixDomainSocket(t *testing.T) { + // Create a temporary file that will simulate the Unix domain socket file in a + // not-yet-ready state. We need this because the Kubelet keeps an eye on file + // changes and acts on them, leading to potential race issues as described in + // the referenced issue above + f, err := os.CreateTemp("", "test-domain-socket") + require.NoErrorf(t, err, "Failed to create file for test purposes: %v", err) + testFile := f.Name() + f.Close() + + // Start the check at this point + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + result, err := IsUnixDomainSocket(testFile) + assert.Nil(t, err, "Unexpected error from IsUnixDomainSocket: %v", err) + assert.True(t, result, "Unexpected result: false from IsUnixDomainSocket.") + wg.Done() + }() + + // Wait a sufficient amount of time to make sure the retry logic kicks in + time.Sleep(socketDialRetryPeriod) + + // Replace the temporary file with an actual Unix domain socket file + os.Remove(testFile) + ta, err := net.ResolveUnixAddr("unix", testFile) + require.NoError(t, err, "Failed to ResolveUnixAddr.") + unixln, err := net.ListenUnix("unix", ta) + require.NoError(t, err, "Failed to ListenUnix.") + + // Wait for the goroutine to finish, then close the socket + wg.Wait() + unixln.Close() +} diff --git a/pkg/volume/util/hostutil/hostutil_test.go b/pkg/volume/util/hostutil/hostutil_test.go index 886421d4f33..4e68c6f1a9a 100644 --- a/pkg/volume/util/hostutil/hostutil_test.go +++ b/pkg/volume/util/hostutil/hostutil_test.go @@ -119,11 +119,6 @@ func createSocketFile(socketDir string) (string, error) { } func TestGetFileType(t *testing.T) { - // Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023 - if goruntime.GOOS == "windows" { - t.Skip("Skipping test that fails on Windows") - } - hu := NewHostUtil() testCase := []struct { diff --git a/pkg/volume/util/hostutil/hostutil_windows.go b/pkg/volume/util/hostutil/hostutil_windows.go index c039ada4066..51ad0344a13 100644 --- a/pkg/volume/util/hostutil/hostutil_windows.go +++ b/pkg/volume/util/hostutil/hostutil_windows.go @@ -21,12 +21,16 @@ package hostutil import ( "fmt" + "io/fs" "os" "path" "path/filepath" "strings" + "syscall" + "golang.org/x/sys/windows" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/util/filesystem" "k8s.io/mount-utils" utilpath "k8s.io/utils/path" ) @@ -87,9 +91,28 @@ func (hu *HostUtil) MakeRShared(path string) error { return nil } +func isSystemCannotAccessErr(err error) bool { + if fserr, ok := err.(*fs.PathError); ok { + errno, ok := fserr.Err.(syscall.Errno) + return ok && errno == windows.ERROR_CANT_ACCESS_FILE + } + + return false +} + // GetFileType checks for sockets/block/character devices func (hu *(HostUtil)) GetFileType(pathname string) (FileType, error) { - return getFileType(pathname) + filetype, err := getFileType(pathname) + + // os.Stat will return a 1920 error (windows.ERROR_CANT_ACCESS_FILE) if we use it on a Unix Socket + // on Windows. In this case, we need to use a different method to check if it's a Unix Socket. + if isSystemCannotAccessErr(err) { + if isSocket, errSocket := filesystem.IsUnixDomainSocket(pathname); errSocket == nil && isSocket { + return FileTypeSocket, nil + } + } + + return filetype, err } // PathExists checks whether the path exists