Merge pull request #105511 from benluddy/apf-delegated-signal

Free APF seats for watches handled by an aggregated apiserver.
This commit is contained in:
Kubernetes Prow Robot 2021-10-21 16:50:37 -07:00 committed by GitHub
commit 313b43a8cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 143 additions and 0 deletions

View File

@ -52,6 +52,17 @@ func WatchInitialized(ctx context.Context) {
}
}
// RequestDelegated informs the priority and fairness dispatcher that
// a given request has been delegated to an aggregated API
// server. No-op when priority and fairness is disabled.
func RequestDelegated(ctx context.Context) {
// The watch initialization signal doesn't traverse request
// boundaries, so we generously fire it as soon as we know
// that the request won't be serviced locally. Safe to call
// for non-watch requests.
WatchInitialized(ctx)
}
// InitializationSignal is an interface that allows sending and handling
// initialization signals.
type InitializationSignal interface {

View File

@ -35,6 +35,7 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server/egressselector"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/x509metrics"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
@ -175,6 +176,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
handler.InterceptRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
utilflowcontrol.RequestDelegated(req.Context())
handler.ServeHTTP(w, newReq)
}

View File

@ -46,6 +46,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@ -899,6 +900,135 @@ func TestProxyCertReload(t *testing.T) {
}
}
type fcInitSignal struct {
nSignals int32
}
func (s *fcInitSignal) SignalCount() int {
return int(atomic.SwapInt32(&s.nSignals, 0))
}
func (s *fcInitSignal) Signal() {
atomic.AddInt32(&s.nSignals, 1)
}
func (s *fcInitSignal) Wait() {
}
type hookedListener struct {
l net.Listener
onAccept func()
}
func (wl *hookedListener) Accept() (net.Conn, error) {
wl.onAccept()
return wl.l.Accept()
}
func (wl *hookedListener) Close() error {
return wl.l.Close()
}
func (wl *hookedListener) Addr() net.Addr {
return wl.l.Addr()
}
func TestFlowControlSignal(t *testing.T) {
for _, tc := range []struct {
Name string
Local bool
Available bool
Request http.Request
SignalExpected bool
}{
{
Name: "local",
Local: true,
SignalExpected: false,
},
{
Name: "unavailable",
Local: false,
Available: false,
SignalExpected: false,
},
{
Name: "request performed",
Local: false,
Available: true,
SignalExpected: true,
},
{
Name: "upgrade request performed",
Local: false,
Available: true,
Request: http.Request{
Header: http.Header{"Connection": []string{"Upgrade"}},
},
SignalExpected: true,
},
} {
t.Run(tc.Name, func(t *testing.T) {
okh := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
var sig fcInitSignal
var signalCountOnAccept int32
backend := httptest.NewUnstartedServer(okh)
backend.Listener = &hookedListener{
l: backend.Listener,
onAccept: func() {
atomic.StoreInt32(&signalCountOnAccept, int32(sig.SignalCount()))
},
}
backend.Start()
defer backend.Close()
p := proxyHandler{
localDelegate: okh,
serviceResolver: &mockedRouter{destinationHost: backend.Listener.Addr().String()},
}
server := httptest.NewServer(contextHandler(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p.ServeHTTP(w, r.WithContext(utilflowcontrol.WithInitializationSignal(r.Context(), &sig)))
}),
&user.DefaultInfo{
Name: "username",
Groups: []string{"one", "two"},
},
))
defer server.Close()
p.handlingInfo.Store(proxyHandlingInfo{
local: tc.Local,
serviceAvailable: tc.Available,
proxyRoundTripper: backend.Client().Transport,
})
surl, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
req := tc.Request
req.URL = surl
_, err = server.Client().Do(&req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if fired := (atomic.LoadInt32(&signalCountOnAccept) > 0); tc.SignalExpected && !fired {
t.Errorf("flow control signal expected but not fired")
} else if fired && !tc.SignalExpected {
t.Errorf("flow control signal fired but not expected")
}
})
}
}
func getCertAndKeyPaths(t *testing.T) (string, string, string) {
dir, err := ioutil.TempDir(os.TempDir(), "k8s-test-handler-proxy-cert")
if err != nil {