diff --git a/pkg/kubelet/apis/podresources/grpc/ratelimit.go b/pkg/kubelet/apis/grpc/ratelimit.go
similarity index 72%
rename from pkg/kubelet/apis/podresources/grpc/ratelimit.go
rename to pkg/kubelet/apis/grpc/ratelimit.go
index b9214b13c16..149bcf279b2 100644
--- a/pkg/kubelet/apis/podresources/grpc/ratelimit.go
+++ b/pkg/kubelet/apis/grpc/ratelimit.go
@@ -27,17 +27,6 @@ import (
 	"google.golang.org/grpc/status"
 )
 
-const (
-	// DefaultQPS is determined by empirically reviewing known consumers of the API.
-	// It's at least unlikely that there is a legitimate need to query podresources
-	// more than 100 times per second, the other subsystems are not guaranteed to react
-	// so fast in the first place.
-	DefaultQPS = 100
-	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
-	// See the documentation of DefaultQPS, same caveats apply.
-	DefaultBurstTokens = 10
-)
-
 var (
 	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
 )
@@ -59,9 +48,10 @@ func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor
 	}
 }
 
-func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
+// WithRateLimiter creates a new rate limiter with a unary interceptor.
+func WithRateLimiter(serviceName string, qps, burstTokens int32) grpc.ServerOption {
 	qpsVal := gotimerate.Limit(qps)
 	burstVal := int(burstTokens)
-	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
+	klog.InfoS("Setting rate limiting for endpoint", "service", serviceName, "qps", qpsVal, "burstTokens", burstVal)
 	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
 }
diff --git a/pkg/kubelet/apis/podresources/constants.go b/pkg/kubelet/apis/podresources/constants.go
index 6cc4c6a261a..97c4b930fd2 100644
--- a/pkg/kubelet/apis/podresources/constants.go
+++ b/pkg/kubelet/apis/podresources/constants.go
@@ -19,4 +19,14 @@ package podresources
 const (
 	// Socket is the name of the podresources server socket
 	Socket = "kubelet"
+
+	// DefaultQPS is determined by empirically reviewing known consumers of the API.
+	// It's at least unlikely that there is a legitimate need to query podresources
+	// more than 100 times per second, the other subsystems are not guaranteed to react
+	// so fast in the first place.
+	DefaultQPS = 100
+
+	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
+	// See the documentation of DefaultQPS, same caveats apply.
+	DefaultBurstTokens = 10
 )
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index f66505ff883..87a017f9ccc 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -78,8 +78,8 @@ import (
 	"k8s.io/kubernetes/pkg/apis/core/v1/validation"
 	"k8s.io/kubernetes/pkg/features"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
+	apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
-	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
 	servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
@@ -219,7 +219,7 @@ type PodResourcesProviders struct {
 
 // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
 func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
-	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
+	server := grpc.NewServer(apisgrpc.WithRateLimiter("podresources", podresources.DefaultQPS, podresources.DefaultBurstTokens))
 
 	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
 	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go
index fbcd5931dbd..d1333a60cc3 100644
--- a/test/e2e_node/podresources_test.go
+++ b/test/e2e_node/podresources_test.go
@@ -31,8 +31,8 @@ import (
 	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
+	apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
-	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/util"
 	testutils "k8s.io/kubernetes/test/utils"
@@ -974,7 +974,7 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 	framework.ExpectNoError(err, "GetV1Client() failed err %v", err)
 	defer conn.Close()
 
-	tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
+	tries := podresources.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
 	errs := []error{}
 
 	ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
@@ -994,7 +994,7 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 	// constraints than to risk flakes at this stage.
 	errLimitExceededCount := 0
 	for _, err := range errs[1:] {
-		if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+		if errors.Is(err, apisgrpc.ErrorLimitExceeded) {
 			errLimitExceededCount++
 		}
 	}