Merge pull request #116459 from ffromani/podresources-ratelimit-minimal

add podresources DOS prevention using rate limit
Authored by Kubernetes Prow Robot on 2023-03-14 08:36:45 -07:00, committed by GitHub
commit 204a9a1f17
4 changed files with 132 additions and 5 deletions

@@ -0,0 +1,67 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc

import (
	"context"

	gotimerate "golang.org/x/time/rate"

	"k8s.io/klog/v2"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
const (
	// DefaultQPS is determined by empirically reviewing known consumers of the API.
	// It is unlikely that there is a legitimate need to query podresources more than
	// 100 times per second; the other subsystems are not guaranteed to react that
	// fast in the first place.
	DefaultQPS = 100

	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
	// See the documentation of DefaultQPS; the same caveats apply.
	DefaultBurstTokens = 10
)
var (
	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
)

// Limiter defines the interface to perform request rate limiting,
// based on the interface exposed by https://pkg.go.dev/golang.org/x/time/rate#Limiter
type Limiter interface {
	// Allow reports whether an event may happen now.
	Allow() bool
}
// LimiterUnaryServerInterceptor returns a new unary server interceptor that performs request rate limiting.
func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !limiter.Allow() {
			return nil, ErrorLimitExceeded
		}
		return handler(ctx, req)
	}
}

// WithRateLimiter returns a grpc.ServerOption that rate limits every unary call
// using a token bucket with the given QPS and burst size.
func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
	qpsVal := gotimerate.Limit(qps)
	burstVal := int(burstTokens)
	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
}
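
To see the interceptor in isolation, here is a minimal white-box test sketch; it is illustrative only and not part of this diff, and fixedLimiter is a hypothetical stand-in for *rate.Limiter that keeps the outcome deterministic:

package grpc

import (
	"context"
	"errors"
	"testing"

	"google.golang.org/grpc"
)

// fixedLimiter allows a fixed number of calls and then rejects everything,
// mimicking an exhausted token bucket.
type fixedLimiter struct{ remaining int }

func (f *fixedLimiter) Allow() bool {
	if f.remaining == 0 {
		return false
	}
	f.remaining--
	return true
}

func TestLimiterUnaryServerInterceptor(t *testing.T) {
	intercept := LimiterUnaryServerInterceptor(&fixedLimiter{remaining: 2})
	handler := func(ctx context.Context, req interface{}) (interface{}, error) { return "ok", nil }
	info := &grpc.UnaryServerInfo{FullMethod: "/v1.PodResourcesLister/List"}

	for i := 0; i < 3; i++ {
		_, err := intercept(context.Background(), nil, info, handler)
		if i < 2 && err != nil {
			t.Fatalf("call %d unexpectedly rejected: %v", i, err)
		}
		if i == 2 && !errors.Is(err, ErrorLimitExceeded) {
			t.Fatalf("call %d should have been rejected, got err=%v", i, err)
		}
	}
}

In the real server the limiter is the *rate.Limiter built by WithRateLimiter, so the number of back-to-back calls that succeed is bounded by the burst size, with tokens refilling at the configured QPS.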

@@ -2751,7 +2751,15 @@ func (kl *Kubelet) ListenAndServePodResources() {
		klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
		return
	}
-	server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager)
+	providers := server.PodResourcesProviders{
+		Pods:    kl.podManager,
+		Devices: kl.containerManager,
+		Cpus:    kl.containerManager,
+		Memory:  kl.containerManager,
+	}
+	server.ListenAndServePodResources(socket, providers)
}

// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
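
Note that three of the four provider fields above receive the same kl.containerManager value: each field is a narrow interface, and a single concrete type can satisfy several of them. A generic sketch of that pattern, using hypothetical stand-in interfaces rather than the real kubelet providers:

package main

import "fmt"

// Hypothetical narrow provider interfaces; the real PodsProvider, DevicesProvider,
// CPUsProvider and MemoryProvider live in k8s.io/kubernetes/pkg/kubelet/apis/podresources.
type devicesProvider interface{ Devices() []string }
type cpusProvider interface{ CPUs() []int64 }

// fakeManager plays the role of the container manager: one concrete type
// implementing several of the narrow interfaces.
type fakeManager struct{}

func (fakeManager) Devices() []string { return []string{"dev0"} }
func (fakeManager) CPUs() []int64     { return []int64{0, 1} }

type providers struct {
	Devices devicesProvider
	Cpus    cpusProvider
}

func main() {
	m := fakeManager{}
	p := providers{Devices: m, Cpus: m} // the same value satisfies both fields
	fmt.Println(p.Devices.Devices(), p.Cpus.CPUs())
}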

@@ -75,6 +75,7 @@ import (
	"k8s.io/kubernetes/pkg/features"
	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
	"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -209,11 +210,20 @@ func ListenAndServeKubeletReadOnlyServer(
	}
}

+type PodResourcesProviders struct {
+	Pods    podresources.PodsProvider
+	Devices podresources.DevicesProvider
+	Cpus    podresources.CPUsProvider
+	Memory  podresources.MemoryProvider
+}
+
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
-func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider, memoryProvider podresources.MemoryProvider) {
-	server := grpc.NewServer()
-	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider))
-	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider, memoryProvider))
+func ListenAndServePodResources(socket string, providers PodResourcesProviders) {
+	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
+	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers.Pods, providers.Devices))
+	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers.Pods, providers.Devices, providers.Cpus, providers.Memory))
	l, err := util.CreateListener(socket)
	if err != nil {
		klog.ErrorS(err, "Failed to create listener for podResources endpoint")
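
Because every unary call to the podresources socket now passes through the limiter, aggressive pollers will start receiving codes.ResourceExhausted. A rough client-side sketch of how a consumer could back off instead of tight-looping; the socket path, timeout, message size, backoff interval, and the k8s.io/kubelet/pkg/apis/podresources/v1 import path are assumptions for illustration, not values defined by this change:

package main

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
)

// listWithBackoff retries List whenever the kubelet rejects the call with
// ResourceExhausted (the rate limit added here), instead of tight-looping.
func listWithBackoff(ctx context.Context, cli podresourcesapi.PodResourcesListerClient) (*podresourcesapi.ListPodResourcesResponse, error) {
	for {
		resp, err := cli.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
		if status.Code(err) == codes.ResourceExhausted {
			time.Sleep(500 * time.Millisecond) // assumed backoff, tune for your consumer
			continue
		}
		return resp, err
	}
}

func main() {
	// The endpoint string is an assumption; see the e2e test below for how the
	// kubelet tree builds it from defaultPodResourcesPath and podresources.Socket.
	cli, conn, err := podresources.GetV1Client("unix:///var/lib/kubelet/pod-resources/kubelet.sock", 10*time.Second, 1024*1024)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	if _, err := listWithBackoff(context.Background(), cli); err != nil {
		panic(err)
	}
}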

@@ -18,6 +18,7 @@ package e2enode
import (
	"context"
+	"errors"
	"fmt"
	"os"
	"strings"
@@ -31,6 +32,7 @@ import (
	kubefeatures "k8s.io/kubernetes/pkg/features"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -861,7 +863,47 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
			ginkgo.By("Ensuring the metrics match the expectations a few more times")
			gomega.Consistently(ctx, getPodResourcesMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
		})
	})
+	ginkgo.Context("with the builtin rate limit values", func() {
+		ginkgo.It("should hit throttling when calling podresources List in a tight loop", func(ctx context.Context) {
+			// ensure APIs have been called at least once
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Connecting to the kubelet endpoint")
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
+			errs := []error{}
+
+			ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
+			startTime := time.Now()
+			for try := 0; try < tries; try++ {
+				_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+				errs = append(errs, err)
+			}
+			elapsed := time.Since(startTime)
+
+			ginkgo.By(fmt.Sprintf("Checking return codes for %d List() calls in %v", tries, elapsed))
+			framework.ExpectNoError(errs[0], "the first List() call unexpectedly failed with %v", errs[0])
+			// On a clean test environment with enough CPU power we would expect (burst) successes
+			// followed by (tries-burst) errors. CI is usually harsher, so we relax the constraint and
+			// expect at least one failure, while we are likely to get many more. We cannot yet predict
+			// how many more, so we prefer relaxed constraints over the risk of flakes at this stage.
+			errLimitExceededCount := 0
+			for _, err := range errs[1:] {
+				if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+					errLimitExceededCount++
+				}
+			}
+			gomega.Expect(errLimitExceededCount).ToNot(gomega.BeZero(), "never hit the rate limit trying %d calls in %v", tries, elapsed)
+			framework.Logf("got %d/%d rate limit errors, at least one needed, the more the better", errLimitExceededCount, tries)
+		})
+	})
})
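
For intuition on why the test only asserts at least one rejection, here is a standalone sketch (not part of the PR) that runs a similar tight loop directly against golang.org/x/time/rate with the default QPS and burst values:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Mirror DefaultQPS, DefaultBurstTokens and the test's tries = DefaultQPS * 2.
	const qps, burst, tries = 100, 10, 200

	lim := rate.NewLimiter(rate.Limit(qps), burst)
	allowed, rejected := 0, 0

	start := time.Now()
	for i := 0; i < tries; i++ {
		if lim.Allow() {
			allowed++
		} else {
			rejected++
		}
	}
	elapsed := time.Since(start)

	// Roughly allowed ≈ burst + qps*elapsed.Seconds(): the faster the loop, the
	// fewer tokens refill, so the exact split depends on how busy the machine is.
	fmt.Printf("allowed=%d rejected=%d in %v\n", allowed, rejected, elapsed)
}

In the e2e test each iteration also pays a gRPC round trip, which gives the bucket extra time to refill; that is why the expected error count cannot be pinned down and the assertion stops at errLimitExceededCount > 0.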