mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 13:02:14 +00:00
Move grpc rate limitter from podresource folder
Rate limitter.go file is a generic file implementing grpc Limiter interface. This file can be reuse by other gRPC API not only by podresource. Change-Id: I905a46b5b605fbb175eb9ad6c15019ffdc7f2563
This commit is contained in:
parent
d0dfe64334
commit
122ff5a212
@ -27,17 +27,6 @@ import (
|
|||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// DefaultQPS is determined by empirically reviewing known consumers of the API.
|
|
||||||
// It's at least unlikely that there is a legitimate need to query podresources
|
|
||||||
// more than 100 times per second, the other subsystems are not guaranteed to react
|
|
||||||
// so fast in the first place.
|
|
||||||
DefaultQPS = 100
|
|
||||||
// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
|
|
||||||
// See the documentation of DefaultQPS, same caveats apply.
|
|
||||||
DefaultBurstTokens = 10
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
|
ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
|
||||||
)
|
)
|
||||||
@ -59,9 +48,10 @@ func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
|
// WithRateLimiter creates new rate limiter with unary interceptor.
|
||||||
|
func WithRateLimiter(serviceName string, qps, burstTokens int32) grpc.ServerOption {
|
||||||
qpsVal := gotimerate.Limit(qps)
|
qpsVal := gotimerate.Limit(qps)
|
||||||
burstVal := int(burstTokens)
|
burstVal := int(burstTokens)
|
||||||
klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
|
klog.InfoS("Setting rate limiting for endpoint", "service", serviceName, "qps", qpsVal, "burstTokens", burstVal)
|
||||||
return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
|
return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
|
||||||
}
|
}
|
@ -19,4 +19,14 @@ package podresources
|
|||||||
const (
|
const (
|
||||||
// Socket is the name of the podresources server socket
|
// Socket is the name of the podresources server socket
|
||||||
Socket = "kubelet"
|
Socket = "kubelet"
|
||||||
|
|
||||||
|
// DefaultQPS is determined by empirically reviewing known consumers of the API.
|
||||||
|
// It's at least unlikely that there is a legitimate need to query podresources
|
||||||
|
// more than 100 times per second, the other subsystems are not guaranteed to react
|
||||||
|
// so fast in the first place.
|
||||||
|
DefaultQPS = 100
|
||||||
|
|
||||||
|
// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
|
||||||
|
// See the documentation of DefaultQPS, same caveats apply.
|
||||||
|
DefaultBurstTokens = 10
|
||||||
)
|
)
|
||||||
|
@ -78,8 +78,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
|
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||||
|
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||||
podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
|
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/prober"
|
"k8s.io/kubernetes/pkg/kubelet/prober"
|
||||||
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
|
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
|
||||||
@ -219,7 +219,7 @@ type PodResourcesProviders struct {
|
|||||||
|
|
||||||
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
|
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
|
||||||
func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
|
func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
|
||||||
server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
|
server := grpc.NewServer(apisgrpc.WithRateLimiter("podresources", podresources.DefaultQPS, podresources.DefaultBurstTokens))
|
||||||
|
|
||||||
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
|
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
|
||||||
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
|
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
|
||||||
|
@ -31,8 +31,8 @@ import (
|
|||||||
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
|
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
|
||||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||||
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||||
|
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||||
podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||||
testutils "k8s.io/kubernetes/test/utils"
|
testutils "k8s.io/kubernetes/test/utils"
|
||||||
@ -974,7 +974,7 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
|
|||||||
framework.ExpectNoError(err, "GetV1Client() failed err %v", err)
|
framework.ExpectNoError(err, "GetV1Client() failed err %v", err)
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
|
tries := podresources.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
|
||||||
errs := []error{}
|
errs := []error{}
|
||||||
|
|
||||||
ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
|
ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
|
||||||
@ -994,7 +994,7 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
|
|||||||
// constraints than to risk flakes at this stage.
|
// constraints than to risk flakes at this stage.
|
||||||
errLimitExceededCount := 0
|
errLimitExceededCount := 0
|
||||||
for _, err := range errs[1:] {
|
for _, err := range errs[1:] {
|
||||||
if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
|
if errors.Is(err, apisgrpc.ErrorLimitExceeded) {
|
||||||
errLimitExceededCount++
|
errLimitExceededCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user