diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh index 54162fab14c..76008e4ca0d 100755 --- a/cluster/gce/util.sh +++ b/cluster/gce/util.sh @@ -861,10 +861,6 @@ function construct-windows-kubelet-flags { # Turn off kernel memory cgroup notification. flags+=" --kernel-memcg-notification=false" - # TODO(#78628): Re-enable KubeletPodResources when the issue is fixed. - # Force disable KubeletPodResources feature on Windows until #78628 is fixed. - flags+=" --feature-gates=KubeletPodResources=false" - WINDOWS_CONTAINER_RUNTIME_ENDPOINT=${KUBE_WINDOWS_CONTAINER_RUNTIME_ENDPOINT:-npipe:////./pipe/containerd-containerd} flags+=" --container-runtime-endpoint=${WINDOWS_CONTAINER_RUNTIME_ENDPOINT}" diff --git a/cluster/gce/windows/README-GCE-Windows-kube-up.md b/cluster/gce/windows/README-GCE-Windows-kube-up.md index 67adb6e0e38..2e709c7dff0 100644 --- a/cluster/gce/windows/README-GCE-Windows-kube-up.md +++ b/cluster/gce/windows/README-GCE-Windows-kube-up.md @@ -123,7 +123,6 @@ e2e test configuration for the latest environment variables. ```bash KUBE_GCE_ENABLE_IP_ALIASES=true KUBERNETES_NODE_PLATFORM=windows \ - KUBELET_TEST_ARGS=--feature-gates=KubeletPodResources=false \ LOGGING_STACKDRIVER_RESOURCE_TYPES=new NUM_NODES=2 \ NUM_WINDOWS_NODES=3 WINDOWS_NODE_OS_DISTRIBUTION=win2019 \ ./hack/e2e-internal/e2e-up.sh diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3afbd6b21e4..7f91802b567 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2805,7 +2805,7 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { // ListenAndServePodResources runs the kubelet podresources grpc service func (kl *Kubelet) ListenAndServePodResources() { - socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket) + endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket) if err != nil { klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err) return @@ -2819,7 +2819,7 @@ func (kl *Kubelet) ListenAndServePodResources() { DynamicResources: kl.containerManager, } - server.ListenAndServePodResources(socket, providers) + server.ListenAndServePodResources(endpoint, providers) } // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 0c033c9d69b..beaaeabb4a8 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -218,18 +218,19 @@ type PodResourcesProviders struct { } // ListenAndServePodResources initializes a gRPC server to serve the PodResources service -func ListenAndServePodResources(socket string, providers podresources.PodResourcesProviders) { +func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) { server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens)) podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers)) podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers)) - l, err := util.CreateListener(socket) + l, err := util.CreateListener(endpoint) if err != nil { klog.ErrorS(err, "Failed to create listener for podResources endpoint") os.Exit(1) } + klog.InfoS("Starting to serve the podresources API", "endpoint", endpoint) if err := server.Serve(l); err != nil { klog.ErrorS(err, "Failed to serve") os.Exit(1) diff --git a/pkg/kubelet/util/util_windows.go b/pkg/kubelet/util/util_windows.go index af51d45c605..33a279e9a1a 100644 --- a/pkg/kubelet/util/util_windows.go +++ b/pkg/kubelet/util/util_windows.go @@ -24,6 +24,7 @@ import ( "fmt" "net" "net/url" + "path/filepath" "strings" "syscall" "time" @@ -117,9 +118,27 @@ func parseEndpoint(endpoint string) (string, string, error) { } } -// LocalEndpoint empty implementation +// LocalEndpoint returns the full path to a named pipe at the given endpoint - unlike on unix, we can't use sockets. func LocalEndpoint(path, file string) (string, error) { - return "", fmt.Errorf("LocalEndpoints are unsupported in this build") + // extract the podresources config name from the path. We only need this on windows because the preferred layout of pipes, + // this is why we have the extra logic in here instead of changing the function signature. Join the file to make sure the + // last path component is a file, so the operation chain works.. + podResourcesDir := filepath.Base(filepath.Dir(filepath.Join(path, file))) + if podResourcesDir == "" { + // should not happen because the user can configure a root directory, and we expected a subdirectory inside + // the user supplied root directory named like "pod-resources" or so. + return "", fmt.Errorf("cannot infer the podresources directory from path %q", path) + } + // windows pipes are expected to use forward slashes: https://learn.microsoft.com/windows/win32/ipc/pipe-names + // so using `url` like we do on unix gives us unclear benefits - see https://github.com/kubernetes/kubernetes/issues/78628 + // So we just construct the path from scratch. + // Format: \\ServerName\pipe\PipeName + // Where where ServerName is either the name of a remote computer or a period, to specify the local computer. + // We only consider PipeName as regular windows path, while the pipe path components are fixed, hence we use constants. + serverPart := `\\.` + pipePart := "pipe" + pipeName := "kubelet-" + podResourcesDir + return npipeProtocol + "://" + filepath.Join(serverPart, pipePart, pipeName), nil } var tickCount = syscall.NewLazyDLL("kernel32.dll").NewProc("GetTickCount64") diff --git a/pkg/kubelet/util/util_windows_test.go b/pkg/kubelet/util/util_windows_test.go index 54d08f0bf96..d718f6add3d 100644 --- a/pkg/kubelet/util/util_windows_test.go +++ b/pkg/kubelet/util/util_windows_test.go @@ -277,3 +277,48 @@ func TestNormalizePath(t *testing.T) { assert.Equal(t, test.normalizedPath, NormalizePath(test.originalpath)) } } + +func TestLocalEndpoint(t *testing.T) { + tests := []struct { + path string + file string + expectError bool + expectedFullPath string + }{ + { + path: "/var/lib/kubelet/pod-resources", + file: "kube.sock", // this is not the default, but it's not relevant here + expectError: false, + expectedFullPath: `npipe://\\.\pipe\kubelet-pod-resources`, + }, + } + for _, test := range tests { + fullPath, err := LocalEndpoint(test.path, test.file) + if test.expectError { + assert.NotNil(t, err, "expected error") + continue + } + assert.Nil(t, err, "expected no error") + assert.Equal(t, test.expectedFullPath, fullPath) + } +} + +func TestLocalEndpointRoundTrip(t *testing.T) { + npipeDialPointer := reflect.ValueOf(npipeDial).Pointer() + expectedDialerName := runtime.FuncForPC(npipeDialPointer).Name() + expectedAddress := "//./pipe/kubelet-pod-resources" + + fullPath, err := LocalEndpoint(`pod-resources`, "kubelet") + require.NoErrorf(t, err, "Failed to create the local endpoint path") + + address, dialer, err := GetAddressAndDialer(fullPath) + require.NoErrorf(t, err, "Failed to parse the endpoint path and get back address and dialer (path=%q)", fullPath) + + dialerPointer := reflect.ValueOf(dialer).Pointer() + actualDialerName := runtime.FuncForPC(dialerPointer).Name() + + assert.Equalf(t, npipeDialPointer, dialerPointer, + "Expected dialer %s, but get %s", expectedDialerName, actualDialerName) + + assert.Equal(t, expectedAddress, address) +}