Merge pull request #114925 from tkashem/watch-termination

apiserver: terminate watch with rate limiting during shutdown
Kubernetes Prow Robot 2023-02-27 10:26:38 -08:00 committed by GitHub
commit a16fd5467e
10 changed files with 606 additions and 61 deletions

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/wsstream" "k8s.io/apiserver/pkg/util/wsstream"
) )
@ -105,6 +106,11 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
var serverShuttingDownCh <-chan struct{}
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
serverShuttingDownCh = signals.ShuttingDown()
}
ctx := req.Context()
server := &WatchServer{
@ -132,7 +138,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh,
}
server.ServeHTTP(w, req)
@ -156,7 +163,8 @@ type WatchServer struct {
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object
TimeoutFactory TimeoutFactory
ServerShuttingDownCh <-chan struct{}
}
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
@ -230,6 +238,15 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for {
select {
case <-s.ServerShuttingDownCh:
// the server has signaled that it is shutting down (not accepting
// any new request), all active watch request(s) should return
// immediately here. The WithWatchTerminationDuringShutdown server
// filter will ensure that the response to the client is rate
// limited in order to avoid any thundering herd issue when the
// client(s) try to reestablish the WATCH on the other
// available apiserver instance(s).
return
case <-done:
return
case <-timeoutCh:
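
As a rough sketch of how any long-running handler can consume the same signal (the handler name and wiring below are hypothetical and not part of this change; only ServerShutdownSignalFrom and ShuttingDown come from this PR):

package example

import (
	"net/http"

	apirequest "k8s.io/apiserver/pkg/endpoints/request"
)

// streamUntilShutdown is a hypothetical long-running handler that ends its
// response as soon as the server signals that it is shutting down.
func streamUntilShutdown(done <-chan struct{}) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// nil when the server was not configured with a positive
		// ShutdownWatchTerminationGracePeriod; a nil channel never fires.
		var shuttingDownCh <-chan struct{}
		if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
			shuttingDownCh = signals.ShuttingDown()
		}
		select {
		case <-shuttingDownCh:
			// the server is draining; return so the client can retry elsewhere.
		case <-done:
			// normal completion path.
		}
	})
}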

View File

@ -0,0 +1,55 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"context"
)
// The serverShutdownSignalKeyType type is unexported to prevent collisions
type serverShutdownSignalKeyType int
// serverShutdownSignalKey is the context key for storing the
// watch termination interface instance for a WATCH request.
const serverShutdownSignalKey serverShutdownSignalKeyType = iota
// ServerShutdownSignal is associated with the request context so
// the request handler logic has access to signals related to
// the server shutdown events
type ServerShutdownSignal interface {
// Signaled when the apiserver is not receiving any new request
ShuttingDown() <-chan struct{}
}
// ServerShutdownSignalFrom returns the ServerShutdownSignal instance
// associated with the request context.
// If there is no ServerShutdownSignal associated with the context,
// nil is returned.
func ServerShutdownSignalFrom(ctx context.Context) ServerShutdownSignal {
ev, _ := ctx.Value(serverShutdownSignalKey).(ServerShutdownSignal)
return ev
}
// WithServerShutdownSignal returns a new context that stores
// the ServerShutdownSignal interface instance.
func WithServerShutdownSignal(parent context.Context, window ServerShutdownSignal) context.Context {
if ServerShutdownSignalFrom(parent) != nil {
return parent // Avoid double registering.
}
return context.WithValue(parent, serverShutdownSignalKey, window)
}
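
A minimal sketch of how these helpers round-trip a signal through a request context; the fakeSignal type is hypothetical and exists only for illustration:

package example

import (
	"context"
	"fmt"

	apirequest "k8s.io/apiserver/pkg/endpoints/request"
)

// fakeSignal is a stand-in ServerShutdownSignal implementation.
type fakeSignal struct{ ch chan struct{} }

func (f *fakeSignal) ShuttingDown() <-chan struct{} { return f.ch }

func demo() {
	sig := &fakeSignal{ch: make(chan struct{})}
	ctx := apirequest.WithServerShutdownSignal(context.Background(), sig)

	// a second registration is a no-op; the original signal is kept.
	ctx = apirequest.WithServerShutdownSignal(ctx, &fakeSignal{ch: make(chan struct{})})

	fmt.Println(apirequest.ServerShutdownSignalFrom(ctx) == sig) // true
}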

View File

@ -161,6 +161,10 @@ type Config struct {
// handlers associated with non long-running requests
// to complete while the server is shutting down.
NonLongRunningRequestWaitGroup *utilwaitgroup.SafeWaitGroup
// WatchRequestWaitGroup allows us to wait for all chain
// handlers associated with active watch requests to
// complete while the server is shutting down.
WatchRequestWaitGroup *utilwaitgroup.RateLimitedSafeWaitGroup
// DiscoveryAddresses is used to build the IPs passed to discovery. If nil, the ExternalAddress is
// always reported
DiscoveryAddresses discovery.Addresses
@ -272,6 +276,23 @@ type Config struct {
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
// ShutdownWatchTerminationGracePeriod, if set to a positive value,
// is the maximum duration the apiserver will wait for all active
// watch request(s) to drain.
// Once this grace period elapses, the apiserver will no longer
// wait for any active watch request(s) in flight to drain; it will
// proceed to the next step in the graceful server shutdown process.
// If set to a positive value, the apiserver will keep track of the
// number of active watch request(s) in flight and during shutdown
// it will wait, at most, for the specified duration and allow these
// active watch requests to drain with some rate limiting in effect.
// The default is zero, which implies the apiserver will not keep
// track of active watch request(s) in flight and will not wait
// for them to drain; this maintains backward compatibility.
// This grace period is orthogonal to other grace periods, and
// it is not overridden by any other grace period.
ShutdownWatchTerminationGracePeriod time.Duration
}
type RecommendedConfig struct {
@ -371,6 +392,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
NonLongRunningRequestWaitGroup: new(utilwaitgroup.SafeWaitGroup),
WatchRequestWaitGroup: &utilwaitgroup.RateLimitedSafeWaitGroup{},
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
PostStartHooks: map[string]PostStartHookConfigEntry{},
@ -408,9 +430,10 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: lifecycleSignals,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
ShutdownWatchTerminationGracePeriod: time.Duration(0),
APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),
@ -670,16 +693,18 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
delegationTarget: delegationTarget,
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
NonLongRunningRequestWaitGroup: c.NonLongRunningRequestWaitGroup,
WatchRequestWaitGroup: c.WatchRequestWaitGroup,
Handler: apiServerHandler,
UnprotectedDebugSocket: debugSocket,
listedPathProvider: apiServerHandler,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
ShutdownTimeout: c.RequestTimeout,
ShutdownDelayDuration: c.ShutdownDelayDuration,
ShutdownWatchTerminationGracePeriod: c.ShutdownWatchTerminationGracePeriod,
SecureServingInfo: c.SecureServing,
ExternalAddress: c.ExternalAddress,
openAPIConfig: c.OpenAPIConfig,
openAPIV3Config: c.OpenAPIV3Config,
@ -907,6 +932,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
if c.ShutdownWatchTerminationGracePeriod > 0 {
handler = genericfilters.WithWatchTerminationDuringShutdown(handler, c.lifecycleSignals, c.WatchRequestWaitGroup)
}
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
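
When the grace period is positive, DefaultBuildHandlerChain installs the watch-termination filter automatically, so an embedding server only needs to set the field. A minimal sketch of opting in (the helper name and the 15-second value are illustrative only):

package example

import (
	"time"

	"k8s.io/apimachinery/pkg/runtime/serializer"
	genericapiserver "k8s.io/apiserver/pkg/server"
)

// newConfigWithWatchDrain is a hypothetical helper; any positive grace
// period enables tracking and rate-limited draining of watch requests.
func newConfigWithWatchDrain(codecs serializer.CodecFactory) *genericapiserver.Config {
	c := genericapiserver.NewConfig(codecs)
	c.ShutdownWatchTerminationGracePeriod = 15 * time.Second
	return c
}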

View File

@ -0,0 +1,62 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"errors"
"net/http"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
)
func WithWatchTerminationDuringShutdown(handler http.Handler, termination apirequest.ServerShutdownSignal, wg RequestWaitGroup) http.Handler {
if termination == nil || wg == nil {
klog.Warningf("watch termination during shutdown not attached to the handler chain")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't set up correctly because there is no request info
responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
return
}
if !watchVerbs.Has(requestInfo.Verb) {
handler.ServeHTTP(w, req)
return
}
if err := wg.Add(1); err != nil {
// When apiserver is shutting down, signal clients to retry
// There is a good chance the client hit a different server, so a tight retry is good for client responsiveness.
waitGroupWriteRetryAfterToResponse(w)
return
}
// attach ServerShutdownSignal to the watch request so that the
// watch handler loop can return as soon as the server signals
// that it is shutting down.
ctx = apirequest.WithServerShutdownSignal(req.Context(), termination)
req = req.WithContext(ctx)
defer wg.Done()
handler.ServeHTTP(w, req)
})
}
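
Ordering matters if this filter is composed by hand: RequestInfo must already be attached to the context, so WithRequestInfo has to wrap it from the outside, which is what DefaultBuildHandlerChain does. A standalone sketch, with a hypothetical shutdownSignal type standing in for the server's lifecycle signals:

package example

import (
	"net/http"

	"k8s.io/apimachinery/pkg/util/sets"
	utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
	genericfilters "k8s.io/apiserver/pkg/server/filters"
)

// shutdownSignal is a hypothetical ServerShutdownSignal implementation.
type shutdownSignal struct{ ch chan struct{} }

func (s *shutdownSignal) ShuttingDown() <-chan struct{} { return s.ch }

// buildWatchAwareChain wraps an API handler so that watch requests are
// tracked in wg and terminated once signal fires.
func buildWatchAwareChain(apiHandler http.Handler, signal *shutdownSignal, wg *utilwaitgroup.RateLimitedSafeWaitGroup) http.Handler {
	handler := genericfilters.WithWatchTerminationDuringShutdown(apiHandler, signal, wg)
	// WithRequestInfo must run before (outside) the watch-termination filter,
	// otherwise the filter responds with an internal error.
	resolver := &apirequest.RequestInfoFactory{
		APIPrefixes:          sets.NewString("apis", "api"),
		GrouplessAPIPrefixes: sets.NewString("api"),
	}
	return genericapifilters.WithRequestInfo(handler, resolver)
}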

View File

@ -0,0 +1,166 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"k8s.io/apimachinery/pkg/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func TestWithWatchTerminationDuringShutdown(t *testing.T) {
tests := []struct {
name string
requestInfo *apirequest.RequestInfo
signal *fakeServerShutdownSignal
wg *fakeRequestWaitGroup
handlerInvoked int
statusCodeExpected int
retryAfterExpected bool
wgInvokedExpected int
signalAttachedToContext bool
}{
{
name: "no RequestInfo attached to request context",
handlerInvoked: 0,
statusCodeExpected: http.StatusInternalServerError,
},
{
name: "request is not a WATCH, not added into wait group",
requestInfo: &apirequest.RequestInfo{Verb: "get"},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
},
{
name: "request is a WATCH, wait group is in waiting mode",
requestInfo: &apirequest.RequestInfo{Verb: "watch"},
wg: &fakeRequestWaitGroup{waiting: true},
handlerInvoked: 0,
signalAttachedToContext: false,
wgInvokedExpected: 1,
retryAfterExpected: true,
statusCodeExpected: http.StatusServiceUnavailable,
},
{
name: "request is a WATCH, wait group is accepting",
requestInfo: &apirequest.RequestInfo{Verb: "watch"},
wg: &fakeRequestWaitGroup{},
signal: &fakeServerShutdownSignal{},
wgInvokedExpected: 1,
signalAttachedToContext: true,
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
handlerInvokedGot int
signalGot *fakeServerShutdownSignal
)
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
handlerInvokedGot++
if signal := apirequest.ServerShutdownSignalFrom(req.Context()); signal != nil {
signalGot, _ = signal.(*fakeServerShutdownSignal)
}
w.WriteHeader(http.StatusOK)
})
handler := WithWatchTerminationDuringShutdown(delegate, test.signal, test.wg)
req, err := http.NewRequest(http.MethodGet, "/apis/groups.k8s.io/v1/namespaces", nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
if test.requestInfo != nil {
req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo))
}
w := httptest.NewRecorder()
w.Code = 0
handler.ServeHTTP(w, req)
responseGot := w.Result()
if test.handlerInvoked != handlerInvokedGot {
t.Errorf("expected the handler to be invoked: %d timed, but got: %d", test.handlerInvoked, handlerInvokedGot)
}
if test.statusCodeExpected != responseGot.StatusCode {
t.Errorf("expected status code: %d, but got: %d", test.statusCodeExpected, w.Result().StatusCode)
}
retryAfterGot := retryAfterSent(responseGot)
if test.retryAfterExpected != retryAfterGot {
t.Errorf("expected retry-after: %t, but got: %t, response: %v#", test.retryAfterExpected, retryAfterGot, responseGot)
}
switch {
case test.signalAttachedToContext:
if test.signal == nil || test.signal != signalGot {
t.Errorf("expected request context to have server shutdown signal: %p, but got: %p", test.signal, signalGot)
}
default:
if signalGot != nil {
t.Errorf("expected request context to not have server shutdown signal: %p, but got: %p", test.signal, signalGot)
}
}
if test.wg == nil {
return
}
if test.wg.inflight != 0 {
t.Errorf("expected wait group inflight to be zero, but got: %d", test.wg.inflight)
}
if test.wgInvokedExpected != test.wg.invoked {
t.Errorf("expected wait group Add to be invoked: %d times, but got: %d", test.wgInvokedExpected, test.wg.invoked)
}
})
}
}
type fakeServerShutdownSignal struct{}
func (fakeServerShutdownSignal) ShuttingDown() <-chan struct{} { return nil }
type fakeRequestWaitGroup struct {
waiting bool
invoked, inflight int
}
func (f *fakeRequestWaitGroup) Add(delta int) error {
f.invoked++
if f.waiting {
return fmt.Errorf("waitgroup is in waiting mode")
}
f.inflight += delta
return nil
}
func (f *fakeRequestWaitGroup) Done() { f.inflight-- }
func retryAfterSent(resp *http.Response) bool {
switch {
case resp.StatusCode == http.StatusServiceUnavailable &&
resp.Header.Get("Retry-After") == "1" &&
resp.Header.Get("Content-Type") == runtime.ContentTypeJSON &&
resp.Header.Get("X-Content-Type-Options") == "nosniff":
return true
default:
return false
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"fmt"
"net/http"
gpath "path"
@ -26,6 +27,7 @@ import (
systemd "github.com/coreos/go-systemd/v22/daemon" systemd "github.com/coreos/go-systemd/v22/daemon"
"golang.org/x/time/rate"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -221,6 +223,10 @@ type GenericAPIServer struct {
// handlers associated with non long-running requests
// to complete while the server is shutting down.
NonLongRunningRequestWaitGroup *utilwaitgroup.SafeWaitGroup
// WatchRequestWaitGroup allows us to wait for all chain
// handlers associated with active watch requests to
// complete while the server is shutting down.
WatchRequestWaitGroup *utilwaitgroup.RateLimitedSafeWaitGroup
// ShutdownDelayDuration allows blocking shutdown for some time, e.g. until endpoints pointing to this API server
// have converged on all nodes. During this time, the API server keeps serving, /healthz will return 200,
@ -260,6 +266,23 @@ type GenericAPIServer struct {
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// ShutdownWatchTerminationGracePeriod, if set to a positive value,
// is the maximum duration the apiserver will wait for all active
// watch request(s) to drain.
// Once this grace period elapses, the apiserver will no longer
// wait for any active watch request(s) in flight to drain; it will
// proceed to the next step in the graceful server shutdown process.
// If set to a positive value, the apiserver will keep track of the
// number of active watch request(s) in flight and during shutdown
// it will wait, at most, for the specified duration and allow these
// active watch requests to drain with some rate limiting in effect.
// The default is zero, which implies the apiserver will not keep
// track of active watch request(s) in flight and will not wait
// for them to drain; this maintains backward compatibility.
// This grace period is orthogonal to other grace periods, and
// it is not overridden by any other grace period.
ShutdownWatchTerminationGracePeriod time.Duration
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -447,23 +470,27 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// | NotAcceptingNewRequest (notAcceptingNewRequestCh)
// |               |
// |               |
// |               |----------------------------------------------------------------------------------|
// |               |                                                                                   |
// |    [without                         [with                                                         |
// | ShutdownSendRetryAfter]   ShutdownSendRetryAfter]                                                  |
// |               |                         |                                                          |
// |               |                         ---------------|                                           |
// |               |                                         |                                          |
// |               |          |----------------|-------------|-----------|                              |
// |               |          |                                          |                              |
// |               | (NonLongRunningRequestWaitGroup::Wait)   (WatchRequestWaitGroup::Wait)             |
// |               |          |                                          |                              |
// |               |          |------------------|-----------------------|                              |
// |               |                             |                                                      |
// |               |             InFlightRequestsDrained (drainedCh)                                    |
// |               |                             |                                                      |
// |               |-------------------|---------|------------------------------------------------------|
// |                                   |
// |                 stopHttpServerCh  (AuditBackend::Shutdown())
// |                                   |
// |                       listenerStoppedCh
// |                                   |
// |          HTTPServerStoppedListening (httpServerStoppedListeningCh)
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
@ -576,9 +603,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
<-preShutdownHooksHasStoppedCh.Signaled()
}()
// wait for all in-flight non-long running requests to finish
nonLongRunningRequestDrainedCh := make(chan struct{})
go func() {
defer close(nonLongRunningRequestDrainedCh)
defer klog.V(1).Info("[graceful-termination] in-flight non long-running request(s) have drained")
// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-notAcceptingNewRequestCh.Signaled()
@ -599,6 +628,47 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
s.NonLongRunningRequestWaitGroup.Wait()
}()
// wait for all in-flight watches to finish
activeWatchesDrainedCh := make(chan struct{})
go func() {
defer close(activeWatchesDrainedCh)
<-notAcceptingNewRequestCh.Signaled()
if s.ShutdownWatchTerminationGracePeriod <= time.Duration(0) {
klog.V(1).InfoS("[graceful-termination] not going to wait for active watch request(s) to drain")
return
}
// Wait for all active watches to finish
grace := s.ShutdownWatchTerminationGracePeriod
activeBefore, activeAfter, err := s.WatchRequestWaitGroup.Wait(func(count int) (utilwaitgroup.RateLimiter, context.Context, context.CancelFunc) {
qps := float64(count) / grace.Seconds()
// TODO: we don't want the QPS (max requests drained per second) to
// get below a certain floor value, since we want the server to
// drain the active watch requests as soon as possible.
// For now, it's hard coded to 200, and it is subject to change
// based on the result from the scale testing.
if qps < 200 {
qps = 200
}
ctx, cancel := context.WithTimeout(context.Background(), grace)
// We don't expect more than one token to be consumed
// in a single Wait call, so setting burst to 1.
return rate.NewLimiter(rate.Limit(qps), 1), ctx, cancel
})
klog.V(1).InfoS("[graceful-termination] active watch request(s) have drained",
"duration", grace, "activeWatchesBefore", activeBefore, "activeWatchesAfter", activeAfter, "error", err)
}()
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
defer drainedCh.Signal()
<-nonLongRunningRequestDrainedCh
<-activeWatchesDrainedCh
}()
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated") klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
<-stopCh <-stopCh
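
To make the drain rate concrete: with a 5s grace period, 10,000 active watches are released at 2,000 per second, while 100 active watches would compute to 20 per second and be bumped up to the 200 per second floor. A small sketch of the same arithmetic (the helper is hypothetical; it simply mirrors the limiter QPS chosen above):

package example

import "time"

// drainQPS spreads the active watches evenly across the grace period,
// subject to the hard-coded floor of 200 responses per second.
func drainQPS(activeWatches int, grace time.Duration) float64 {
	qps := float64(activeWatches) / grace.Seconds()
	if qps < 200 {
		qps = 200
	}
	return qps
}

// drainQPS(10000, 5*time.Second) == 2000
// drainQPS(100, 5*time.Second)   == 200 (floored)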

View File

@ -38,6 +38,7 @@ import (
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/klog/v2"
@ -147,7 +148,7 @@ func newSignalInterceptingTestStep() *signalInterceptingTestStep {
// | close(stopHttpServerCh)         NonLongRunningRequestWaitGroup.Wait()
// |    |                               |
// | server.Shutdown(timeout=60s)       |
// |    |                            WatchRequestWaitGroup.Wait()
// | stop listener (net/http)           |
// |    |                               |
// |    |-------------------------------------|
@ -176,8 +177,10 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
connReusingClient := newClient(false)
doer := setupDoer(t, s.SecureServingInfo)
// handler for a non long-running and a watch request that
// we want to keep in flight through to the end.
inflightNonLongRunning := setupInFlightNonLongRunningRequestHandler(s)
inflightWatch := setupInFlightWatchRequestHandler(s)
// API calls from the pre-shutdown hook(s) must succeed up to
// the point where the HTTP server is shut down.
@ -204,10 +207,13 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}()
waitForAPIServerStarted(t, doer)
// fire the non long-running and the watch request so they are
// in-flight on the server now, and we will unblock them
// after ShutdownDelayDuration elapses.
inflightNonLongRunning.launch(doer, connReusingClient)
waitForeverUntil(t, inflightNonLongRunning.startedCh, "in-flight non long-running request did not reach the server")
inflightWatch.launch(doer, connReusingClient)
waitForeverUntil(t, inflightWatch.startedCh, "in-flight watch request did not reach the server")
// /readyz should return OK
resultGot := doer.Do(newClient(true), func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
@ -300,13 +306,21 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
t.Errorf("Expected error %v, but got: %v %v", syscall.ECONNREFUSED, resultGot.err, resultGot.response) t.Errorf("Expected error %v, but got: %v %v", syscall.ECONNREFUSED, resultGot.err, resultGot.response)
} }
// the server has stopped listening but we still have a request // the server has stopped listening but we still have a non long-running,
// in flight, let it unblock and we expect the request to succeed. // and a watch request in flight, unblock both of these, and we expect
inFlightResultGot := inflightRequest.unblockAndWaitForResult(t) // the requests to return appropriate response to the caller.
if err := assertResponseStatusCode(inFlightResultGot, http.StatusOK); err != nil { inflightNonLongRunningResultGot := inflightNonLongRunning.unblockAndWaitForResult(t)
if err := assertResponseStatusCode(inflightNonLongRunningResultGot, http.StatusOK); err != nil {
t.Errorf("%s", err.Error()) t.Errorf("%s", err.Error())
} }
if err := assertRequestAudited(inFlightResultGot, fakeAudit); err != nil { if err := assertRequestAudited(inflightNonLongRunningResultGot, fakeAudit); err != nil {
t.Errorf("%s", err.Error())
}
inflightWatchResultGot := inflightWatch.unblockAndWaitForResult(t)
if err := assertResponseStatusCode(inflightWatchResultGot, http.StatusOK); err != nil {
t.Errorf("%s", err.Error())
}
if err := assertRequestAudited(inflightWatchResultGot, fakeAudit); err != nil {
t.Errorf("%s", err.Error()) t.Errorf("%s", err.Error())
} }
@ -359,6 +373,8 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
// |                      |
// |     NonLongRunningRequestWaitGroup.Wait()
// |                      |
// |     WatchRequestWaitGroup.Wait()
// |                      |
// |     (InFlightRequestsDrained)
// |                      |
// |                      |
@ -384,8 +400,10 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
connReusingClient := newClient(false)
doer := setupDoer(t, s.SecureServingInfo)
// handler for a non long-running and a watch request that
// we want to keep in flight through to the end.
inflightNonLongRunning := setupInFlightNonLongRunningRequestHandler(s)
inflightWatch := setupInFlightWatchRequestHandler(s)
// API calls from the pre-shutdown hook(s) must succeed up to
// the point where the HTTP server is shut down.
@ -412,10 +430,13 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}()
waitForAPIServerStarted(t, doer)
// fire the non long-running and the watch request so they are
// in-flight on the server now, and we will unblock them
// after ShutdownDelayDuration elapses.
inflightNonLongRunning.launch(doer, connReusingClient)
waitForeverUntil(t, inflightNonLongRunning.startedCh, "in-flight request did not reach the server")
inflightWatch.launch(doer, connReusingClient)
waitForeverUntil(t, inflightWatch.startedCh, "in-flight watch request did not reach the server")
// /readyz should return OK
resultGot := doer.Do(newClient(true), func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
@ -487,12 +508,21 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
t.Errorf("%s", err.Error()) t.Errorf("%s", err.Error())
} }
// we still have a request in flight, let it unblock and we expect the request to succeed. // we still have a non long-running, and a watch request in flight,
inFlightResultGot := inflightRequest.unblockAndWaitForResult(t) // unblock both of these, and we expect the requests
if err := assertResponseStatusCode(inFlightResultGot, http.StatusOK); err != nil { // to return appropriate response to the caller.
inflightNonLongRunningResultGot := inflightNonLongRunning.unblockAndWaitForResult(t)
if err := assertResponseStatusCode(inflightNonLongRunningResultGot, http.StatusOK); err != nil {
t.Errorf("%s", err.Error()) t.Errorf("%s", err.Error())
} }
if err := assertRequestAudited(inFlightResultGot, fakeAudit); err != nil { if err := assertRequestAudited(inflightNonLongRunningResultGot, fakeAudit); err != nil {
t.Errorf("%s", err.Error())
}
inflightWatchResultGot := inflightWatch.unblockAndWaitForResult(t)
if err := assertResponseStatusCode(inflightWatchResultGot, http.StatusOK); err != nil {
t.Errorf("%s", err.Error())
}
if err := assertRequestAudited(inflightWatchResultGot, fakeAudit); err != nil {
t.Errorf("%s", err.Error()) t.Errorf("%s", err.Error())
} }
@ -663,12 +693,12 @@ type inFlightRequest struct {
url string
}
func setupInFlightNonLongRunningRequestHandler(s *GenericAPIServer) *inFlightRequest {
inflight := &inFlightRequest{
blockedCh: make(chan struct{}),
startedCh: make(chan struct{}),
resultCh: make(chan result),
url: "/in-flight-non-long-running-request-as-designed",
}
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
close(inflight.startedCh)
@ -680,6 +710,37 @@ func setupInFlightReuestHandler(s *GenericAPIServer) *inFlightRequest {
return inflight
}
func setupInFlightWatchRequestHandler(s *GenericAPIServer) *inFlightRequest {
inflight := &inFlightRequest{
blockedCh: make(chan struct{}),
startedCh: make(chan struct{}),
resultCh: make(chan result),
url: "/apis/watches.group/v1/namespaces/foo/bar?watch=true",
}
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
close(inflight.startedCh)
// this request handler blocks until we deliberately unblock it.
<-inflight.blockedCh
// this simulates a watch well enough for our test
signals := apirequest.ServerShutdownSignalFrom(req.Context())
if signals == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
select {
case <-signals.ShuttingDown():
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusInternalServerError)
})
s.Handler.NonGoRestfulMux.Handle("/apis/watches.group/v1/namespaces/foo/bar", handler)
return inflight
}
func (ifr *inFlightRequest) launch(doer doer, client *http.Client) {
go func() {
result := doer.Do(client, func(httptrace.GotConnInfo) {}, ifr.url, 0)
@ -950,6 +1011,8 @@ func newGenericAPIServer(t *testing.T, fAudit *fakeAudit, keepListening bool) *G
config, _ := setUp(t)
config.ShutdownDelayDuration = 100 * time.Millisecond
config.ShutdownSendRetryAfter = keepListening
// we enable watch draining, any positive value will do that
config.ShutdownWatchTerminationGracePeriod = 2 * time.Second
config.AuditPolicyRuleEvaluator = fAudit
config.AuditBackend = fAudit

View File

@ -146,6 +146,14 @@ type lifecycleSignals struct {
MuxAndDiscoveryComplete lifecycleSignal
}
// ShuttingDown returns the lifecycle signal that is signaled when
// the server is not accepting any new requests.
// this is the lifecycle event that is exported to the request handler
// logic to indicate that the server is shutting down.
func (s lifecycleSignals) ShuttingDown() <-chan struct{} {
return s.NotAcceptingNewRequest.Signaled()
}
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
// to coordinate lifecycle of the apiserver
func newLifecycleSignals() lifecycleSignals {

View File

@ -73,21 +73,39 @@ type ServerRunOptions struct {
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// ShutdownWatchTerminationGracePeriod, if set to a positive value,
// is the maximum duration the apiserver will wait for all active
// watch request(s) to drain.
// Once this grace period elapses, the apiserver will no longer
// wait for any active watch request(s) in flight to drain; it will
// proceed to the next step in the graceful server shutdown process.
// If set to a positive value, the apiserver will keep track of the
// number of active watch request(s) in flight and during shutdown
// it will wait, at most, for the specified duration and allow these
// active watch requests to drain with some rate limiting in effect.
// The default is zero, which implies the apiserver will not keep
// track of active watch request(s) in flight and will not wait
// for them to drain; this maintains backward compatibility.
// This grace period is orthogonal to other grace periods, and
// it is not overridden by any other grace period.
ShutdownWatchTerminationGracePeriod time.Duration
}
func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
}
}
@ -107,6 +125,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
c.PublicAddress = s.AdvertiseAddress
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
c.ShutdownWatchTerminationGracePeriod = s.ShutdownWatchTerminationGracePeriod
return nil
}
@ -160,6 +179,10 @@ func (s *ServerRunOptions) Validate() []error {
errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value"))
}
if s.ShutdownWatchTerminationGracePeriod < 0 {
errors = append(errors, fmt.Errorf("shutdown-watch-termination-grace-period, if provided, can not be a negative value"))
}
if s.JSONPatchMaxCopyBytes < 0 {
errors = append(errors, fmt.Errorf("ServerRunOptions.JSONPatchMaxCopyBytes can not be negative value"))
}
@ -315,5 +338,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+ "during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.") "in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
fs.DurationVar(&s.ShutdownWatchTerminationGracePeriod, "shutdown-watch-termination-grace-period", s.ShutdownWatchTerminationGracePeriod, ""+
"This option, if set, represents the maximum amount of grace period the apiserver will wait "+
"for active watch request(s) to drain during the graceful server shutdown window.")
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
}
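
For an embedding binary that exposes the generic server flags (as kube-apiserver does), enabling the behaviour is a single flag; the 15s value below is only illustrative, and the default of zero keeps watch draining disabled:

kube-apiserver --shutdown-watch-termination-grace-period=15s ...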

View File

@ -261,3 +261,52 @@ func TestValidateCorsAllowedOriginList(t *testing.T) {
}
}
}
func TestServerRunOptionsWithShutdownWatchTerminationGracePeriod(t *testing.T) {
tests := []struct {
name string
optionsFn func() *ServerRunOptions
errShouldContain string
}{
{
name: "default should be valid",
optionsFn: func() *ServerRunOptions {
return NewServerRunOptions()
},
},
{
name: "negative not allowed",
optionsFn: func() *ServerRunOptions {
o := NewServerRunOptions()
o.ShutdownWatchTerminationGracePeriod = -time.Second
return o
},
errShouldContain: "shutdown-watch-termination-grace-period, if provided, can not be a negative value",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
options := test.optionsFn()
errsGot := options.Validate()
switch {
case len(test.errShouldContain) == 0:
if len(errsGot) != 0 {
t.Errorf("expected no error, but got: %v", errsGot)
}
default:
if len(errsGot) == 0 ||
!strings.Contains(utilerrors.NewAggregate(errsGot).Error(), test.errShouldContain) {
t.Errorf("expected error to contain: %s, but got: %v", test.errShouldContain, errsGot)
}
}
})
}
t.Run("default should be zero", func(t *testing.T) {
options := NewServerRunOptions()
if options.ShutdownWatchTerminationGracePeriod != time.Duration(0) {
t.Errorf("expected default of ShutdownWatchTerminationGracePeriod to be zero, but got: %s", options.ShutdownWatchTerminationGracePeriod)
}
})
}