From 1873915be6be40df20fe35a8e0c7e8e0a620111a Mon Sep 17 00:00:00 2001 From: Ben Luddy Date: Wed, 6 Oct 2021 10:16:46 -0400 Subject: [PATCH] Free APF seats for watches handled by an aggregated apiserver. --- .../pkg/util/flowcontrol/apf_context.go | 11 ++ .../pkg/apiserver/handler_proxy.go | 2 + .../pkg/apiserver/handler_proxy_test.go | 130 ++++++++++++++++++ 3 files changed, 143 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go index 6497e3fff5e..1cd59049de3 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go @@ -52,6 +52,17 @@ func WatchInitialized(ctx context.Context) { } } +// RequestDelegated informs the priority and fairness dispatcher that +// a given request has been delegated to an aggregated API +// server. No-op when priority and fairness is disabled. +func RequestDelegated(ctx context.Context) { + // The watch initialization signal doesn't traverse request + // boundaries, so we generously fire it as soon as we know + // that the request won't be serviced locally. Safe to call + // for non-watch requests. + WatchInitialized(ctx) +} + // InitializationSignal is an interface that allows sending and handling // initialization signals. type InitializationSignal interface { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 22b7562f9c9..b2122ae6a08 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -35,6 +35,7 @@ import ( genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server/egressselector" utilfeature "k8s.io/apiserver/pkg/util/feature" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/apiserver/pkg/util/x509metrics" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" @@ -175,6 +176,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w}) handler.InterceptRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects) handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects) + utilflowcontrol.RequestDelegated(req.Context()) handler.ServeHTTP(w, newReq) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index def125a528f..f5a832bf7ed 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -46,6 +46,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/egressselector" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -899,6 +900,135 @@ func TestProxyCertReload(t *testing.T) { } } +type fcInitSignal struct { + nSignals int32 +} + +func (s *fcInitSignal) SignalCount() int { + return int(atomic.SwapInt32(&s.nSignals, 0)) +} + +func (s *fcInitSignal) Signal() { + atomic.AddInt32(&s.nSignals, 1) +} + +func (s *fcInitSignal) Wait() { +} + +type hookedListener struct { + l net.Listener + onAccept func() +} + +func (wl *hookedListener) Accept() (net.Conn, error) { + wl.onAccept() + return wl.l.Accept() +} + +func (wl *hookedListener) Close() error { + return wl.l.Close() +} + +func (wl *hookedListener) Addr() net.Addr { + return wl.l.Addr() +} + +func TestFlowControlSignal(t *testing.T) { + for _, tc := range []struct { + Name string + Local bool + Available bool + Request http.Request + SignalExpected bool + }{ + { + Name: "local", + Local: true, + SignalExpected: false, + }, + { + Name: "unavailable", + Local: false, + Available: false, + SignalExpected: false, + }, + { + Name: "request performed", + Local: false, + Available: true, + SignalExpected: true, + }, + { + Name: "upgrade request performed", + Local: false, + Available: true, + Request: http.Request{ + Header: http.Header{"Connection": []string{"Upgrade"}}, + }, + SignalExpected: true, + }, + } { + t.Run(tc.Name, func(t *testing.T) { + okh := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + var sig fcInitSignal + + var signalCountOnAccept int32 + backend := httptest.NewUnstartedServer(okh) + backend.Listener = &hookedListener{ + l: backend.Listener, + onAccept: func() { + atomic.StoreInt32(&signalCountOnAccept, int32(sig.SignalCount())) + }, + } + backend.Start() + defer backend.Close() + + p := proxyHandler{ + localDelegate: okh, + serviceResolver: &mockedRouter{destinationHost: backend.Listener.Addr().String()}, + } + + server := httptest.NewServer(contextHandler( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + p.ServeHTTP(w, r.WithContext(utilflowcontrol.WithInitializationSignal(r.Context(), &sig))) + }), + &user.DefaultInfo{ + Name: "username", + Groups: []string{"one", "two"}, + }, + )) + defer server.Close() + + p.handlingInfo.Store(proxyHandlingInfo{ + local: tc.Local, + serviceAvailable: tc.Available, + proxyRoundTripper: backend.Client().Transport, + }) + + surl, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + req := tc.Request + req.URL = surl + _, err = server.Client().Do(&req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if fired := (atomic.LoadInt32(&signalCountOnAccept) > 0); tc.SignalExpected && !fired { + t.Errorf("flow control signal expected but not fired") + } else if fired && !tc.SignalExpected { + t.Errorf("flow control signal fired but not expected") + } + }) + } +} + func getCertAndKeyPaths(t *testing.T) (string, string, string) { dir, err := ioutil.TempDir(os.TempDir(), "k8s-test-handler-proxy-cert") if err != nil {