handlers/watch: calculate and record WatchList latency metric.

This commit is contained in:
Lukasz Szaszkiewicz 2023-09-19 03:05:00 +02:00
parent 9410de78b2
commit 772b1f4cd8
2 changed files with 29 additions and 2 deletions

View File

@ -267,7 +267,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
serveWatch(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts))
})
return
}

View File

@ -34,6 +34,9 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
// nothing will ever be sent down this channel
@ -61,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
// serveWatch will serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) {
defer watcher.Stop()
options, err := optionsForTransform(mediaTypeOptions, req)
@ -153,6 +156,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh,
metricsScope: metricsScope,
}
server.ServeHTTP(w, req)
@ -176,6 +181,8 @@ type WatchServer struct {
TimeoutFactory TimeoutFactory
ServerShuttingDownCh <-chan struct{}
metricsScope string
}
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
@ -247,6 +254,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
// unexpected error
@ -280,6 +288,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if len(ch) == 0 {
flusher.Flush()
}
if isWatchListLatencyRecordingRequired {
metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope)
}
buf.Reset()
}
@ -360,3 +371,19 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
}
}
}
func shouldRecordWatchListLatency(event watch.Event) bool {
if event.Type != watch.Bookmark || !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return false
}
// as of today the initial-events-end annotation is added only to a single event
// by the watch cache and only when certain conditions are met
//
// for more please read https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list
hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to determine if the obj has the required annotation for measuring watchlist latency, obj %T: %v", event.Object, err))
return false
}
return hasAnnotation
}