1
0
mirror of https://github.com/rancher/steve.git synced 2025-09-18 00:08:17 +00:00

Add shared rate-limiting option.

This commit is contained in:
Kevin McDermott
2025-06-11 13:51:17 +01:00
parent 819c974c5b
commit 94fddd2481
2 changed files with 162 additions and 5 deletions

View File

@@ -2,7 +2,6 @@ package client
import (
"fmt"
"log"
"net/http"
"os"
"strconv"
@@ -17,6 +16,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
)
type Factory struct {
@@ -219,9 +219,9 @@ func updateConfigFromEnvironment(cfg *rest.Config) {
if v := os.Getenv("RANCHER_CLIENT_QPS"); v != "" {
qps, err := strconv.ParseFloat(v, 32)
if err != nil {
log.Printf("steve: configuring client failed to parse RANCHER_CLIENT_QPS: %s", err)
logrus.Infof("steve: configuring client failed to parse RANCHER_CLIENT_QPS: %s", err)
} else {
log.Printf("steve: configuring client.QPS = %v", qps)
logrus.Infof("steve: configuring client.QPS = %v", qps)
cfg.QPS = float32(qps)
}
}
@@ -230,10 +230,24 @@ func updateConfigFromEnvironment(cfg *rest.Config) {
if v := os.Getenv("RANCHER_CLIENT_BURST"); v != "" {
burst, err := strconv.Atoi(v)
if err != nil {
log.Printf("steve: configuring client failed to parse RANCHER_CLIENT_QPS: %s", err)
logrus.Infof("steve: configuring client failed to parse RANCHER_CLIENT_QPS: %s", err)
} else {
log.Printf("steve: configuring client.Burst = %v", burst)
logrus.Infof("steve: configuring client.Burst = %v", burst)
cfg.Burst = burst
}
}
if v := os.Getenv("RANCHER_CLIENT_SHARED_RATELIMIT"); v != "" {
parsed, err := strconv.ParseBool(v)
if err != nil {
logrus.Infof("steve: configuring client failed to parse RANCHER_CLIENT_SHARED_RATELIMIT: %s", err)
} else {
if parsed {
logrus.Info("steve: configuring client.RateLimiter as shared")
// This will prevent new clients being created with the same
// top-level QPS/Burst values as the RateLimiter already exists.
cfg.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(cfg.QPS, cfg.Burst)
}
}
}
}

143
pkg/client/factory_test.go Normal file
View File

@@ -0,0 +1,143 @@
package client
import (
"context"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/rancher/apiserver/pkg/types"
"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
)
// TestFactoryBalancing verifies the RANCHER_CLIENT_SHARED_RATELIMIT option:
// without a shared limiter each client gets its own QPS budget, so several
// concurrent clients collectively exceed the server's rate limit; with a
// shared limiter the configured QPS is distributed across all clients and
// stays under the server's limit — unless the configured QPS itself exceeds
// the server limit, in which case errors are expected again.
func TestFactoryBalancing(t *testing.T) {
	// queryAPI issues 20 GETs for the "testing" namespace through a dynamic
	// client from the factory. When failErrors is true, any request error
	// fails the test (t.Error is safe to call from goroutines).
	queryAPI := func(t *testing.T, wg *sync.WaitGroup, f *Factory, failErrors bool) {
		defer wg.Done()
		dc, err := f.DynamicClient(&types.APIRequest{}, nil)
		// assert.NoError does not stop execution; return explicitly so a
		// nil client is never dereferenced below.
		if !assert.NoError(t, err) {
			return
		}
		for i := 0; i < 20; i++ {
			_, err = dc.Resource(schema.GroupVersionResource{
				Resource: "namespaces",
				Version:  "v1",
			}).Get(context.TODO(), "testing", metav1.GetOptions{})
			if err != nil && failErrors {
				t.Error(err)
			}
		}
	}

	t.Run("Without distributing the QPS across clients", func(t *testing.T) {
		var errorCount int32
		srv := startTestServer(t, rate.Limit(10), &errorCount)
		cfg := &rest.Config{Host: srv.URL}
		t.Setenv("RANCHER_CLIENT_QPS", "9.0")
		t.Setenv("RANCHER_CLIENT_BURST", "1")

		f, err := NewFactory(cfg, false)
		// Bail out before spawning goroutines that would panic on a nil
		// factory; assert.NoError alone does not abort the test.
		if !assert.NoError(t, err) {
			return
		}

		wg := &sync.WaitGroup{}
		concurrency := 5
		wg.Add(concurrency)
		for i := 0; i < concurrency; i++ {
			go queryAPI(t, wg, f, false)
		}
		wg.Wait()

		// 5 clients x 9 QPS each overwhelms the server's 10 req/s limit.
		assert.NotZero(t, errorCount)
	})

	t.Run("When distributing the QPS across clients", func(t *testing.T) {
		var errorCount int32
		srv := startTestServer(t, rate.Limit(10), &errorCount)
		cfg := &rest.Config{Host: srv.URL}
		t.Setenv("RANCHER_CLIENT_QPS", "9.0")
		t.Setenv("RANCHER_CLIENT_BURST", "1")
		t.Setenv("RANCHER_CLIENT_SHARED_RATELIMIT", "true")

		f, err := NewFactory(cfg, false)
		if !assert.NoError(t, err) {
			return
		}

		wg := &sync.WaitGroup{}
		concurrency := 5
		wg.Add(concurrency)
		for i := 0; i < concurrency; i++ {
			go queryAPI(t, wg, f, true)
		}
		wg.Wait()

		// The shared limiter keeps the aggregate rate at 9 QPS, below the
		// server's 10 req/s limit, so no 429s should occur.
		assert.Equal(t, int32(0), errorCount)
	})

	t.Run("When the Distributed QPS is above the rate limit", func(t *testing.T) {
		var errorCount int32
		srv := startTestServer(t, rate.Limit(10), &errorCount)
		cfg := &rest.Config{Host: srv.URL}
		t.Setenv("RANCHER_CLIENT_QPS", "11.0")
		t.Setenv("RANCHER_CLIENT_BURST", "1")
		t.Setenv("RANCHER_CLIENT_SHARED_RATELIMIT", "true")

		f, err := NewFactory(cfg, false)
		if !assert.NoError(t, err) {
			return
		}

		wg := &sync.WaitGroup{}
		concurrency := 5
		wg.Add(concurrency)
		for i := 0; i < concurrency; i++ {
			go queryAPI(t, wg, f, false)
		}
		wg.Wait()

		// Even shared, 11 QPS exceeds the server's 10 req/s limit.
		assert.NotZero(t, errorCount)
	})
}
// startTestServer provides a fake K8s API server that uses the provided
// rate.Limit to rate-limit requests, responding with 429 if the rate-limiter
// is limiting requests.
//
// It only allows getting a "testing" namespace and responds with a hard-coded
// Namespace resource in JSON format.
//
// The errors value passed in will be incremented atomically every time a 429
// response is returned to the client (client-go will consume some 429
// responses). The server is closed automatically via t.Cleanup.
func startTestServer(t *testing.T, limit rate.Limit, errors *int32) *httptest.Server {
	start := time.Now()
	rl := rate.NewLimiter(limit, 1)
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/api/v1/namespaces/testing" {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		if !rl.Allow() {
			w.WriteHeader(http.StatusTooManyRequests)
			atomic.AddInt32(errors, 1)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"apiVersion":"v1","kind":"Namespace","metadata":{"creationTimestamp":"2025-03-27T10:02:44Z","labels":{"kubernetes.io/metadata.name":"testing"},"name":"testing","resourceVersion":"3319","uid":"5b59c95b-6a85-4107-a57e-2240e46086e8"},"spec":{"finalizers":["kubernetes"]},"status": {"phase": "Active"}}`))
	}))
	t.Cleanup(func() {
		// Handler goroutines may still be running until ts.Close() returns,
		// and they increment errors atomically — read it atomically too, to
		// avoid a data race under `go test -race`.
		t.Logf("%v errors in %v", atomic.LoadInt32(errors), time.Since(start))
		ts.Close()
	})
	return ts
}