mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
Merge pull request #120490 from p0lyn0mial/upstream-watch-list-cache-metrics
collect watch-list requests latency metric
This commit is contained in:
commit
bdedc21fd9
@ -267,7 +267,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
|
|||||||
}
|
}
|
||||||
requestInfo, _ := request.RequestInfoFrom(ctx)
|
requestInfo, _ := request.RequestInfoFrom(ctx)
|
||||||
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
|
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
|
||||||
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
|
serveWatch(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts))
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,9 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
||||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/features"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nothing will ever be sent down this channel
|
// nothing will ever be sent down this channel
|
||||||
@ -61,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
|||||||
|
|
||||||
// serveWatch will serve a watch response.
|
// serveWatch will serve a watch response.
|
||||||
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
|
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
|
||||||
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
|
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) {
|
||||||
defer watcher.Stop()
|
defer watcher.Stop()
|
||||||
|
|
||||||
options, err := optionsForTransform(mediaTypeOptions, req)
|
options, err := optionsForTransform(mediaTypeOptions, req)
|
||||||
@ -153,6 +156,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
|
|
||||||
TimeoutFactory: &realTimeoutFactory{timeout},
|
TimeoutFactory: &realTimeoutFactory{timeout},
|
||||||
ServerShuttingDownCh: serverShuttingDownCh,
|
ServerShuttingDownCh: serverShuttingDownCh,
|
||||||
|
|
||||||
|
metricsScope: metricsScope,
|
||||||
}
|
}
|
||||||
|
|
||||||
server.ServeHTTP(w, req)
|
server.ServeHTTP(w, req)
|
||||||
@ -176,6 +181,8 @@ type WatchServer struct {
|
|||||||
|
|
||||||
TimeoutFactory TimeoutFactory
|
TimeoutFactory TimeoutFactory
|
||||||
ServerShuttingDownCh <-chan struct{}
|
ServerShuttingDownCh <-chan struct{}
|
||||||
|
|
||||||
|
metricsScope string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
|
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
|
||||||
@ -247,6 +254,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
||||||
|
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
|
||||||
|
|
||||||
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
||||||
// unexpected error
|
// unexpected error
|
||||||
@ -280,6 +288,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
if len(ch) == 0 {
|
if len(ch) == 0 {
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
|
if isWatchListLatencyRecordingRequired {
|
||||||
|
metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope)
|
||||||
|
}
|
||||||
|
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
}
|
}
|
||||||
@ -360,3 +371,19 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func shouldRecordWatchListLatency(event watch.Event) bool {
|
||||||
|
if event.Type != watch.Bookmark || !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// as of today the initial-events-end annotation is added only to a single event
|
||||||
|
// by the watch cache and only when certain conditions are met
|
||||||
|
//
|
||||||
|
// for more please read https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list
|
||||||
|
hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unable to determine if the obj has the required annotation for measuring watchlist latency, obj %T: %v", event.Object, err))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return hasAnnotation
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ package metrics
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -26,8 +27,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
restful "github.com/emicklei/go-restful/v3"
|
restful "github.com/emicklei/go-restful/v3"
|
||||||
|
|
||||||
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
utilsets "k8s.io/apimachinery/pkg/util/sets"
|
utilsets "k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/pkg/audit"
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
@ -280,6 +285,17 @@ var (
|
|||||||
[]string{"code_path"},
|
[]string{"code_path"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
watchListLatencies = compbasemetrics.NewHistogramVec(
|
||||||
|
&compbasemetrics.HistogramOpts{
|
||||||
|
Subsystem: APIServerComponent,
|
||||||
|
Name: "watch_list_duration_seconds",
|
||||||
|
Help: "Response latency distribution in seconds for watch list requests broken by group, version, resource and scope.",
|
||||||
|
Buckets: []float64{0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 2, 4, 6, 8, 10, 15, 20, 30, 45, 60},
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{"group", "version", "resource", "scope"},
|
||||||
|
)
|
||||||
|
|
||||||
metrics = []resettableCollector{
|
metrics = []resettableCollector{
|
||||||
deprecatedRequestGauge,
|
deprecatedRequestGauge,
|
||||||
requestCounter,
|
requestCounter,
|
||||||
@ -300,6 +316,7 @@ var (
|
|||||||
requestAbortsTotal,
|
requestAbortsTotal,
|
||||||
requestPostTimeoutTotal,
|
requestPostTimeoutTotal,
|
||||||
requestTimestampComparisonDuration,
|
requestTimestampComparisonDuration,
|
||||||
|
watchListLatencies,
|
||||||
}
|
}
|
||||||
|
|
||||||
// these are the valid request methods which we report in our metrics. Any other request methods
|
// these are the valid request methods which we report in our metrics. Any other request methods
|
||||||
@ -511,6 +528,18 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, comp
|
|||||||
fn()
|
fn()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecordWatchListLatency simply records response latency for watch list requests.
|
||||||
|
func RecordWatchListLatency(ctx context.Context, gvr schema.GroupVersionResource, metricsScope string) {
|
||||||
|
requestReceivedTimestamp, ok := request.ReceivedTimestampFrom(ctx)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unable to measure watchlist latency because no received ts found in the ctx, gvr: %s", gvr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
elapsedSeconds := time.Since(requestReceivedTimestamp).Seconds()
|
||||||
|
|
||||||
|
watchListLatencies.WithContext(ctx).WithLabelValues(gvr.Group, gvr.Version, gvr.Resource, metricsScope).Observe(elapsedSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
|
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
|
||||||
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
|
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
|
||||||
func MonitorRequest(req *http.Request, verb, group, version, resource, subresource, scope, component string, deprecated bool, removedRelease string, httpCode, respSize int, elapsed time.Duration) {
|
func MonitorRequest(req *http.Request, verb, group, version, resource, subresource, scope, component string, deprecated bool, removedRelease string, httpCode, respSize int, elapsed time.Duration) {
|
||||||
@ -621,6 +650,26 @@ func CleanScope(requestInfo *request.RequestInfo) string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CleanListScope computes the request scope for metrics.
|
||||||
|
//
|
||||||
|
// Note that normally we would use CleanScope for computation.
|
||||||
|
// But due to the same reasons mentioned in determineRequestNamespaceAndName we cannot.
|
||||||
|
func CleanListScope(ctx context.Context, opts *metainternalversion.ListOptions) string {
|
||||||
|
namespace, name := determineRequestNamespaceAndName(ctx, opts)
|
||||||
|
if len(name) > 0 {
|
||||||
|
return "resource"
|
||||||
|
}
|
||||||
|
if len(namespace) > 0 {
|
||||||
|
return "namespace"
|
||||||
|
}
|
||||||
|
if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
|
||||||
|
if requestInfo.IsResourceRequest {
|
||||||
|
return "cluster"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
// CanonicalVerb distinguishes LISTs from GETs (and HEADs). It assumes verb is
|
// CanonicalVerb distinguishes LISTs from GETs (and HEADs). It assumes verb is
|
||||||
// UPPERCASE.
|
// UPPERCASE.
|
||||||
func CanonicalVerb(verb string, scope string) string {
|
func CanonicalVerb(verb string, scope string) string {
|
||||||
@ -655,6 +704,30 @@ func CleanVerb(verb string, request *http.Request, requestInfo *request.RequestI
|
|||||||
return reportedVerb
|
return reportedVerb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// determineRequestNamespaceAndName computes name and namespace for the given requests
|
||||||
|
//
|
||||||
|
// note that the logic of this function was copy&pasted from cacher.go
|
||||||
|
// after an unsuccessful attempt of moving it to RequestInfo
|
||||||
|
//
|
||||||
|
// see: https://github.com/kubernetes/kubernetes/pull/120520
|
||||||
|
func determineRequestNamespaceAndName(ctx context.Context, opts *metainternalversion.ListOptions) (namespace, name string) {
|
||||||
|
if requestNamespace, ok := request.NamespaceFrom(ctx); ok && len(requestNamespace) > 0 {
|
||||||
|
namespace = requestNamespace
|
||||||
|
} else if opts != nil && opts.FieldSelector != nil {
|
||||||
|
if selectorNamespace, ok := opts.FieldSelector.RequiresExactMatch("metadata.namespace"); ok {
|
||||||
|
namespace = selectorNamespace
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if requestInfo, ok := request.RequestInfoFrom(ctx); ok && requestInfo != nil && len(requestInfo.Name) > 0 {
|
||||||
|
name = requestInfo.Name
|
||||||
|
} else if opts != nil && opts.FieldSelector != nil {
|
||||||
|
if selectorName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name"); ok {
|
||||||
|
name = selectorName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// cleanVerb additionally ensures that unknown verbs don't clog up the metrics.
|
// cleanVerb additionally ensures that unknown verbs don't clog up the metrics.
|
||||||
func cleanVerb(verb, suggestedVerb string, request *http.Request, requestInfo *request.RequestInfo) string {
|
func cleanVerb(verb, suggestedVerb string, request *http.Request, requestInfo *request.RequestInfo) string {
|
||||||
// CanonicalVerb (being an input for this function) doesn't handle correctly the
|
// CanonicalVerb (being an input for this function) doesn't handle correctly the
|
||||||
|
@ -17,11 +17,14 @@ limitations under the License.
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/endpoints/responsewriter"
|
"k8s.io/apiserver/pkg/endpoints/responsewriter"
|
||||||
"k8s.io/component-base/metrics/legacyregistry"
|
"k8s.io/component-base/metrics/legacyregistry"
|
||||||
@ -466,3 +469,61 @@ func TestRecordDroppedRequests(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCleanListScope(t *testing.T) {
|
||||||
|
scenarios := []struct {
|
||||||
|
name string
|
||||||
|
ctx context.Context
|
||||||
|
opts *metainternalversion.ListOptions
|
||||||
|
expectedScope string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty scope",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty scope with empty request info",
|
||||||
|
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespace from ctx",
|
||||||
|
ctx: request.WithNamespace(context.TODO(), "foo"),
|
||||||
|
expectedScope: "namespace",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespace from field selector",
|
||||||
|
opts: &metainternalversion.ListOptions{
|
||||||
|
FieldSelector: fields.ParseSelectorOrDie("metadata.namespace=foo"),
|
||||||
|
},
|
||||||
|
expectedScope: "namespace",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "name from request info",
|
||||||
|
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Name: "bar"}),
|
||||||
|
expectedScope: "resource",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "name from field selector",
|
||||||
|
opts: &metainternalversion.ListOptions{
|
||||||
|
FieldSelector: fields.ParseSelectorOrDie("metadata.name=bar"),
|
||||||
|
},
|
||||||
|
expectedScope: "resource",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cluster scope request",
|
||||||
|
ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{IsResourceRequest: true}),
|
||||||
|
expectedScope: "cluster",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, scenario := range scenarios {
|
||||||
|
t.Run(scenario.name, func(t *testing.T) {
|
||||||
|
if scenario.ctx == nil {
|
||||||
|
scenario.ctx = context.TODO()
|
||||||
|
}
|
||||||
|
actualScope := CleanListScope(scenario.ctx, scenario.opts)
|
||||||
|
if actualScope != scenario.expectedScope {
|
||||||
|
t.Errorf("unexpected scope = %s, expected = %s", actualScope, scenario.expectedScope)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user