Merge pull request #115133 from ffromani/podresources-windows

node: create podresources endpoint also on windows
This commit is contained in:
Kubernetes Prow Robot 2023-04-11 15:35:19 -07:00 committed by GitHub
commit 0c969ad660
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 11 deletions

View File

@ -861,10 +861,6 @@ function construct-windows-kubelet-flags {
# Turn off kernel memory cgroup notification.
flags+=" --kernel-memcg-notification=false"
# TODO(#78628): Re-enable KubeletPodResources when the issue is fixed.
# Force disable KubeletPodResources feature on Windows until #78628 is fixed.
flags+=" --feature-gates=KubeletPodResources=false"
WINDOWS_CONTAINER_RUNTIME_ENDPOINT=${KUBE_WINDOWS_CONTAINER_RUNTIME_ENDPOINT:-npipe:////./pipe/containerd-containerd}
flags+=" --container-runtime-endpoint=${WINDOWS_CONTAINER_RUNTIME_ENDPOINT}"

View File

@ -123,7 +123,6 @@ e2e test configuration for the latest environment variables.
```bash
KUBE_GCE_ENABLE_IP_ALIASES=true KUBERNETES_NODE_PLATFORM=windows \
KUBELET_TEST_ARGS=--feature-gates=KubeletPodResources=false \
LOGGING_STACKDRIVER_RESOURCE_TYPES=new NUM_NODES=2 \
NUM_WINDOWS_NODES=3 WINDOWS_NODE_OS_DISTRIBUTION=win2019 \
./hack/e2e-internal/e2e-up.sh

View File

@ -2805,7 +2805,7 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
// ListenAndServePodResources runs the kubelet podresources grpc service
func (kl *Kubelet) ListenAndServePodResources() {
socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
return
@ -2819,7 +2819,7 @@ func (kl *Kubelet) ListenAndServePodResources() {
DynamicResources: kl.containerManager,
}
server.ListenAndServePodResources(socket, providers)
server.ListenAndServePodResources(endpoint, providers)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.

View File

@ -218,18 +218,19 @@ type PodResourcesProviders struct {
}
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
func ListenAndServePodResources(socket string, providers podresources.PodResourcesProviders) {
func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
l, err := util.CreateListener(socket)
l, err := util.CreateListener(endpoint)
if err != nil {
klog.ErrorS(err, "Failed to create listener for podResources endpoint")
os.Exit(1)
}
klog.InfoS("Starting to serve the podresources API", "endpoint", endpoint)
if err := server.Serve(l); err != nil {
klog.ErrorS(err, "Failed to serve")
os.Exit(1)

View File

@ -24,6 +24,7 @@ import (
"fmt"
"net"
"net/url"
"path/filepath"
"strings"
"syscall"
"time"
@ -117,9 +118,27 @@ func parseEndpoint(endpoint string) (string, string, error) {
}
}
// LocalEndpoint empty implementation
// LocalEndpoint returns the full path to a named pipe at the given endpoint - unlike on unix, we can't use sockets.
func LocalEndpoint(path, file string) (string, error) {
return "", fmt.Errorf("LocalEndpoints are unsupported in this build")
// extract the podresources config name from the path. We only need this on windows because the preferred layout of pipes,
// this is why we have the extra logic in here instead of changing the function signature. Join the file to make sure the
// last path component is a file, so the operation chain works..
podResourcesDir := filepath.Base(filepath.Dir(filepath.Join(path, file)))
if podResourcesDir == "" {
// should not happen because the user can configure a root directory, and we expected a subdirectory inside
// the user supplied root directory named like "pod-resources" or so.
return "", fmt.Errorf("cannot infer the podresources directory from path %q", path)
}
// windows pipes are expected to use forward slashes: https://learn.microsoft.com/windows/win32/ipc/pipe-names
// so using `url` like we do on unix gives us unclear benefits - see https://github.com/kubernetes/kubernetes/issues/78628
// So we just construct the path from scratch.
// Format: \\ServerName\pipe\PipeName
// Where where ServerName is either the name of a remote computer or a period, to specify the local computer.
// We only consider PipeName as regular windows path, while the pipe path components are fixed, hence we use constants.
serverPart := `\\.`
pipePart := "pipe"
pipeName := "kubelet-" + podResourcesDir
return npipeProtocol + "://" + filepath.Join(serverPart, pipePart, pipeName), nil
}
var tickCount = syscall.NewLazyDLL("kernel32.dll").NewProc("GetTickCount64")

View File

@ -277,3 +277,48 @@ func TestNormalizePath(t *testing.T) {
assert.Equal(t, test.normalizedPath, NormalizePath(test.originalpath))
}
}
func TestLocalEndpoint(t *testing.T) {
tests := []struct {
path string
file string
expectError bool
expectedFullPath string
}{
{
path: "/var/lib/kubelet/pod-resources",
file: "kube.sock", // this is not the default, but it's not relevant here
expectError: false,
expectedFullPath: `npipe://\\.\pipe\kubelet-pod-resources`,
},
}
for _, test := range tests {
fullPath, err := LocalEndpoint(test.path, test.file)
if test.expectError {
assert.NotNil(t, err, "expected error")
continue
}
assert.Nil(t, err, "expected no error")
assert.Equal(t, test.expectedFullPath, fullPath)
}
}
func TestLocalEndpointRoundTrip(t *testing.T) {
npipeDialPointer := reflect.ValueOf(npipeDial).Pointer()
expectedDialerName := runtime.FuncForPC(npipeDialPointer).Name()
expectedAddress := "//./pipe/kubelet-pod-resources"
fullPath, err := LocalEndpoint(`pod-resources`, "kubelet")
require.NoErrorf(t, err, "Failed to create the local endpoint path")
address, dialer, err := GetAddressAndDialer(fullPath)
require.NoErrorf(t, err, "Failed to parse the endpoint path and get back address and dialer (path=%q)", fullPath)
dialerPointer := reflect.ValueOf(dialer).Pointer()
actualDialerName := runtime.FuncForPC(dialerPointer).Name()
assert.Equalf(t, npipeDialPointer, dialerPointer,
"Expected dialer %s, but get %s", expectedDialerName, actualDialerName)
assert.Equal(t, expectedAddress, address)
}