apiserver: watch gets notified on server shutdown signal

This commit is contained in:
Abu Kashem 2023-01-13 18:02:41 -05:00
parent d49bff855f
commit 697d967108
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
3 changed files with 82 additions and 2 deletions

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/wsstream"
)
@ -105,6 +106,11 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
var serverShuttingDownCh <-chan struct{}
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
serverShuttingDownCh = signals.ShuttingDown()
}
ctx := req.Context()
server := &WatchServer{
@ -132,7 +138,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh,
}
server.ServeHTTP(w, req)
@ -156,7 +163,8 @@ type WatchServer struct {
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object
TimeoutFactory TimeoutFactory
TimeoutFactory TimeoutFactory
ServerShuttingDownCh <-chan struct{}
}
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
@ -230,6 +238,15 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for {
select {
case <-s.ServerShuttingDownCh:
// the server has signaled that it is shutting down (not accepting
// any new request), all active watch request(s) should return
// immediately here. The WithWatchTerminationDuringShutdown server
// filter will ensure that the response to the client is rate
// limited in order to avoid any thundering herd issue when the
// client(s) try to reestablish the WATCH on the other
// available apiserver instance(s).
return
case <-done:
return
case <-timeoutCh:

View File

@ -0,0 +1,55 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"context"
)
// The serverShutdownSignalKeyType type is unexported to prevent collisions
type serverShutdownSignalKeyType int
// serverShutdownSignalKey is the context key for storing the
// watch termination interface instance for a WATCH request.
const serverShutdownSignalKey serverShutdownSignalKeyType = iota
// ServerShutdownSignal is associated with the request context so
// the request handler logic has access to signals rlated to
// the server shutdown events
type ServerShutdownSignal interface {
// Signaled when the apiserver is not receiving any new request
ShuttingDown() <-chan struct{}
}
// ServerShutdownSignalFrom returns the ServerShutdownSignal instance
// associated with the request context.
// If there is no ServerShutdownSignal asscoaied with the context,
// nil is returned.
func ServerShutdownSignalFrom(ctx context.Context) ServerShutdownSignal {
ev, _ := ctx.Value(serverShutdownSignalKey).(ServerShutdownSignal)
return ev
}
// WithServerShutdownSignal returns a new context that stores
// the ServerShutdownSignal interface instance.
func WithServerShutdownSignal(parent context.Context, window ServerShutdownSignal) context.Context {
if ServerShutdownSignalFrom(parent) != nil {
return parent // Avoid double registering.
}
return context.WithValue(parent, serverShutdownSignalKey, window)
}

View File

@ -146,6 +146,14 @@ type lifecycleSignals struct {
MuxAndDiscoveryComplete lifecycleSignal
}
// ShuttingDown returns the lifecycle signal that is signaled when
// the server is not accepting any new requests.
// this is the lifecycle event that is exported to the request handler
// logic to indicate that the server is shutting down.
func (s lifecycleSignals) ShuttingDown() <-chan struct{} {
return s.NotAcceptingNewRequest.Signaled()
}
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
// to coordinate lifecycle of the apiserver
func newLifecycleSignals() lifecycleSignals {