diff --git a/pkg/kubelet/apis/podresources/grpc/ratelimit.go b/pkg/kubelet/apis/podresources/grpc/ratelimit.go
new file mode 100644
index 00000000000..b9214b13c16
--- /dev/null
+++ b/pkg/kubelet/apis/podresources/grpc/ratelimit.go
@@ -0,0 +1,67 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package grpc
+
+import (
+	"context"
+
+	gotimerate "golang.org/x/time/rate"
+	"k8s.io/klog/v2"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	// DefaultQPS is determined by empirically reviewing known consumers of the API.
+	// It is unlikely that there is a legitimate need to query podresources
+	// more than 100 times per second; the other subsystems are not guaranteed to
+	// react that fast in the first place.
+	DefaultQPS = 100
+	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
+	// See the documentation of DefaultQPS; the same caveats apply.
+	DefaultBurstTokens = 10
+)
+
+var (
+	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
+)
+
+// Limiter defines the interface to perform request rate limiting,
+// based on the interface exposed by https://pkg.go.dev/golang.org/x/time/rate#Limiter
+type Limiter interface {
+	// Allow reports whether an event may happen now.
+	Allow() bool
+}
+
+// LimiterUnaryServerInterceptor returns a new unary server interceptor that performs request rate limiting.
+func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		if !limiter.Allow() {
+			return nil, ErrorLimitExceeded
+		}
+		return handler(ctx, req)
+	}
+}
+
+func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
+	qpsVal := gotimerate.Limit(qps)
+	burstVal := int(burstTokens)
+	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
+	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index beac8dbcfd8..302c70df015 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -2751,7 +2751,15 @@ func (kl *Kubelet) ListenAndServePodResources() {
 		klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
 		return
 	}
-	server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager)
+
+	providers := server.PodResourcesProviders{
+		Pods:    kl.podManager,
+		Devices: kl.containerManager,
+		Cpus:    kl.containerManager,
+		Memory:  kl.containerManager,
+	}
+
+	server.ListenAndServePodResources(socket, providers)
 }
 
 // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index e6d647efb3e..74c5dba9951 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -75,6 +75,7 @@ import (
 	"k8s.io/kubernetes/pkg/features"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -209,11 +210,20 @@ func ListenAndServeKubeletReadOnlyServer(
 	}
 }
 
+type PodResourcesProviders struct {
+	Pods    podresources.PodsProvider
+	Devices podresources.DevicesProvider
+	Cpus    podresources.CPUsProvider
+	Memory  podresources.MemoryProvider
+}
+
 // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
-func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider, memoryProvider podresources.MemoryProvider) {
-	server := grpc.NewServer()
-	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider))
-	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider, memoryProvider))
+func ListenAndServePodResources(socket string, providers PodResourcesProviders) {
+	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
+
+	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers.Pods, providers.Devices))
+	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers.Pods, providers.Devices, providers.Cpus, providers.Memory))
+
 	l, err := util.CreateListener(socket)
 	if err != nil {
 		klog.ErrorS(err, "Failed to create listener for podResources endpoint")
diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go
index 5cf2f834cec..086d99f6e94 100644
--- a/test/e2e_node/podresources_test.go
+++ b/test/e2e_node/podresources_test.go
@@ -18,6 +18,7 @@ package e2enode
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -31,6 +32,7 @@ import (
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -861,7 +863,47 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 			ginkgo.By("Ensuring the metrics match the expectations a few more times")
 			gomega.Consistently(ctx, getPodResourcesMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 		})
+	})
 
+	ginkgo.Context("with the builtin rate limit values", func() {
+		ginkgo.It("should hit throttling when calling podresources List in a tight loop", func(ctx context.Context) {
+			// ensure APIs have been called at least once
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Connecting to the kubelet endpoint")
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
+			errs := []error{}
+
+			ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
+			startTime := time.Now()
+			for try := 0; try < tries; try++ {
+				_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+				errs = append(errs, err)
+			}
+			elapsed := time.Since(startTime)
+
+			ginkgo.By(fmt.Sprintf("Checking return codes for %d List() calls in %v", tries, elapsed))
+
+			framework.ExpectNoError(errs[0], "the first List() call unexpectedly failed with %v", errs[0])
+			// we would expect (burst) successes and then (tries-burst) errors on a clean test environment running with
+			// enough CPU power. CI is usually harsher, so we relax the constraints, expecting at least _a_ failure while
+			// we are likely to get many more. But we can't yet predict how many more we should expect, so we prefer to
+			// relax the constraints rather than risk flakes at this stage.
+			errLimitExceededCount := 0
+			for _, err := range errs[1:] {
+				if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+					errLimitExceededCount++
+				}
+			}
+			gomega.Expect(errLimitExceededCount).ToNot(gomega.BeZero(), "never hit the rate limit trying %d calls in %v", tries, elapsed)
+
+			framework.Logf("got %d/%d rate limit errors, at least one needed, the more the better", errLimitExceededCount, tries)
+		})
 	})
 })
 
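Not part of the patch above: a minimal sketch of how a consumer of the podresources API could react to the new server-side throttling, which surfaces to clients as a gRPC ResourceExhausted status. It assumes the generated v1 client interface from k8s.io/kubelet/pkg/apis/podresources/v1 (imported by the e2e test above as kubeletpodresourcesv1); the helper name listWithBackoff, the attempt count, and the backoff values are illustrative only and are not defined by this change.

package podresourcesclient

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// listWithBackoff retries List when the kubelet rejects the call with
// codes.ResourceExhausted (the code used by the rate-limiting interceptor),
// waiting a little longer after each rejected attempt.
func listWithBackoff(ctx context.Context, cli podresourcesapi.PodResourcesListerClient) (*podresourcesapi.ListPodResourcesResponse, error) {
	var lastErr error
	for attempt := 0; attempt < 5; attempt++ {
		resp, err := cli.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
		if err == nil {
			return resp, nil
		}
		lastErr = err
		// Any error other than the rate-limit rejection is a real failure.
		if status.Code(err) != codes.ResourceExhausted {
			return nil, err
		}
		time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
	}
	return nil, fmt.Errorf("podresources List kept being throttled: %w", lastErr)
}

Checking status.Code rather than comparing against ErrorLimitExceeded keeps such a client independent of the kubelet-internal grpc package; the e2e test above can use errors.Is because it lives in the same repository.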