Merge pull request #107910 from tkashem/latency-breakdown

track latency incurred in various layers of apiserver

Commit 84bd51c7a1
@@ -154,6 +154,35 @@ func evaluatePolicyAndCreateAuditEvent(req *http.Request, policy audit.PolicyRul
 	}, nil
 }
 
+// writeLatencyToAnnotation writes the latency incurred in different
+// layers of the apiserver to the annotations of the audit object.
+// it should be invoked after ev.StageTimestamp has been set appropriately.
+func writeLatencyToAnnotation(ctx context.Context, ev *auditinternal.Event) {
+	// we will track latency in annotation only when the total latency
+	// of the given request exceeds 500ms, this is in keeping with the
+	// traces in rest/handlers for create, delete, update,
+	// get, list, and deletecollection.
+	const threshold = 500 * time.Millisecond
+	latency := ev.StageTimestamp.Time.Sub(ev.RequestReceivedTimestamp.Time)
+	if latency <= threshold {
+		return
+	}
+
+	// if we are tracking latency incurred inside different layers within
+	// the apiserver, add these as annotation to the audit event object.
+	layerLatencies := request.AuditAnnotationsFromLatencyTrackers(ctx)
+	if len(layerLatencies) == 0 {
+		// latency tracking is not enabled for this request
+		return
+	}
+
+	// record the total latency for this request, for convenience.
+	audit.LogAnnotation(ev, "apiserver.latency.k8s.io/total", latency.String())
+	for k, v := range layerLatencies {
+		audit.LogAnnotation(ev, k, v)
+	}
+}
+
 func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
 	for _, stage := range omitStages {
 		if ev.Stage == stage {
@@ -161,11 +190,16 @@ func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.E
 		}
 	}
 
-	if ev.Stage == auditinternal.StageRequestReceived {
+	switch {
+	case ev.Stage == auditinternal.StageRequestReceived:
 		ev.StageTimestamp = metav1.NewMicroTime(ev.RequestReceivedTimestamp.Time)
-	} else {
+	case ev.Stage == auditinternal.StageResponseComplete:
+		ev.StageTimestamp = metav1.NewMicroTime(time.Now())
+		writeLatencyToAnnotation(ctx, ev)
+	default:
 		ev.StageTimestamp = metav1.NewMicroTime(time.Now())
 	}
 
 	audit.ObserveEvent(ctx)
 	return sink.ProcessEvents(ev)
 }
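Note on the two hunks above: once the switch routes StageResponseComplete events through writeLatencyToAnnotation, any request slower than the 500ms threshold gets a per-layer breakdown in its audit event. For illustration only (the annotation keys are the ones defined later in this diff; the durations are invented), such an event might carry:

    apiserver.latency.k8s.io/total: 1.338s
    apiserver.latency.k8s.io/etcd: 1.2s
    apiserver.latency.k8s.io/transform-response-object: 52ms
    apiserver.latency.k8s.io/serialize-response-object: 71ms
    apiserver.latency.k8s.io/response-write: 15ms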
@@ -17,9 +17,18 @@ limitations under the License.
 package filters
 
 import (
+	"context"
+	"fmt"
 	"net/http"
+	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/endpoints/responsewriter"
+)
+
+var (
+	watchVerbs = sets.NewString("watch")
 )
 
 // WithLatencyTrackers adds a LatencyTrackers instance to the
@@ -28,7 +37,44 @@ import (
 func WithLatencyTrackers(handler http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 		ctx := req.Context()
+		requestInfo, ok := request.RequestInfoFrom(ctx)
+		if !ok {
+			handleError(w, req, http.StatusInternalServerError, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
+			return
+		}
+
+		if watchVerbs.Has(requestInfo.Verb) {
+			handler.ServeHTTP(w, req)
+			return
+		}
+
 		req = req.WithContext(request.WithLatencyTrackers(ctx))
+		w = responsewriter.WrapForHTTP1Or2(&writeLatencyTracker{
+			ResponseWriter: w,
+			ctx:            req.Context(),
+		})
+
 		handler.ServeHTTP(w, req)
 	})
 }
+
+var _ http.ResponseWriter = &writeLatencyTracker{}
+var _ responsewriter.UserProvidedDecorator = &writeLatencyTracker{}
+
+type writeLatencyTracker struct {
+	http.ResponseWriter
+	ctx context.Context
+}
+
+func (wt *writeLatencyTracker) Unwrap() http.ResponseWriter {
+	return wt.ResponseWriter
+}
+
+func (wt *writeLatencyTracker) Write(bs []byte) (int, error) {
+	startedAt := time.Now()
+	defer func() {
+		request.TrackResponseWriteLatency(wt.ctx, time.Since(startedAt))
+	}()
+
+	return wt.ResponseWriter.Write(bs)
+}
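Note: writeLatencyTracker is a plain ResponseWriter decorator; wrapping it with responsewriter.WrapForHTTP1Or2 keeps optional interfaces such as http.Flusher visible on the wrapped writer. A self-contained sketch of the same pattern outside the apiserver (all names below are illustrative, not part of this change; the real tracker stores the measurement in the request context rather than printing it):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// timingWriter delegates to the wrapped ResponseWriter and sums up
// the wall time spent inside Write, mirroring writeLatencyTracker.
type timingWriter struct {
	http.ResponseWriter
	spent time.Duration
}

func (t *timingWriter) Write(p []byte) (int, error) {
	startedAt := time.Now()
	defer func() { t.spent += time.Since(startedAt) }()
	return t.ResponseWriter.Write(p)
}

// withWriteTiming wraps a handler the way WithLatencyTrackers does,
// except it prints the measurement instead of recording it.
func withWriteTiming(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tw := &timingWriter{ResponseWriter: w}
		next.ServeHTTP(tw, r)
		fmt.Printf("response-write latency for %s: %v\n", r.URL.Path, tw.spent)
	})
}

func main() {
	h := withWriteTiming(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("hello"))
	}))
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/demo", nil))
}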
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
 	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
+	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
 	utiltrace "k8s.io/utils/trace"
 )
 
@@ -134,7 +135,13 @@ func transformResponseObject(ctx context.Context, scope *RequestScope, trace *ut
 		scope.err(err, w, req)
 		return
 	}
-	obj, err := transformObject(ctx, result, options, mediaType, scope, req)
+
+	var obj runtime.Object
+	do := func() {
+		obj, err = transformObject(ctx, result, options, mediaType, scope, req)
+	}
+	endpointsrequest.TrackTransformResponseObjectLatency(ctx, do)
+
 	if err != nil {
 		scope.err(err, w, req)
 		return
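Note: the do closure exists because obj and err must outlive the tracked call; the tracker only accepts a func(), so results come back through captured variables. A minimal standalone sketch of the pattern (track and the stand-in work below are illustrative):

package main

import (
	"fmt"
	"time"
)

// track times f; callers receive results through variables the
// closure captures, since track itself only sees a func().
func track(f func()) time.Duration {
	startedAt := time.Now()
	f()
	return time.Since(startedAt)
}

func main() {
	var obj string
	var err error
	elapsed := track(func() {
		obj, err = "transformed", nil // stand-in for transformObject
	})
	fmt.Println(obj, err, elapsed)
}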
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
 	"k8s.io/apiserver/pkg/endpoints/metrics"
 	"k8s.io/apiserver/pkg/endpoints/request"
+	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/rest"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/apiserver/pkg/util/flushwriter"
@@ -271,7 +272,9 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiat
 	audit.LogResponseObject(req.Context(), object, gv, s)
 
 	encoder := s.EncoderForVersion(serializer.Serializer, gv)
-	SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object)
+	endpointsrequest.TrackSerializeResponseObjectLatency(req.Context(), func() {
+		SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object)
+	})
 }
 
 // ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error.
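Note: as the LatencyTrackers comments further down spell out, the serialization measurement inevitably absorbs write time, because encoders stream into the ResponseWriter while they serialize. A toy illustration of that interleaving, using encoding/json's streaming encoder (slowWriter is invented for the demo):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// slowWriter simulates a ResponseWriter whose Write calls are slow,
// e.g. because the client drains the response over a slow link.
type slowWriter struct{ http.ResponseWriter }

func (s slowWriter) Write(p []byte) (int, error) {
	time.Sleep(5 * time.Millisecond)
	return s.ResponseWriter.Write(p)
}

func main() {
	w := slowWriter{httptest.NewRecorder()}
	startedAt := time.Now()
	// The encoder writes into w as it serializes, so the write delay
	// is indistinguishable from serialization time here.
	_ = json.NewEncoder(w).Encode(map[string]string{"kind": "Pod"})
	fmt.Println("serialize+write took:", time.Since(startedAt))
}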
@@ -35,9 +35,20 @@ func maxDuration(d1 time.Duration, d2 time.Duration) time.Duration {
 	return d2
 }
 
-// DurationTracker is a simple interface for tracking functions duration
+// DurationTracker is a simple interface for tracking functions duration,
+// it is safe for concurrent use by multiple goroutines.
 type DurationTracker interface {
-	Track(func())
+	// Track measures time spent in the given function f and
+	// aggregates measured duration using aggregateFunction.
+	// if Track is invoked with f from multiple goroutines concurrently,
+	// then f must be safe to be invoked concurrently by multiple goroutines.
+	Track(f func())
+
+	// TrackDuration tracks latency from the specified duration
+	// and aggregate it using aggregateFunction
+	TrackDuration(time.Duration)
+
+	// GetLatency returns the total latency incurred so far
 	GetLatency() time.Duration
 }
 
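Note: the diff only shows the widened interface here. For intuition, a simplified tracker satisfying it could look like the sketch below (illustrative; the real durationTracker also takes an injectable clock, and a max variant would aggregate with maxDuration instead of addition):

package main

import (
	"fmt"
	"sync"
	"time"
)

// sumTracker is an illustrative DurationTracker that aggregates by
// summing; the mutex makes it safe for concurrent use.
type sumTracker struct {
	mu      sync.Mutex
	latency time.Duration
}

func (t *sumTracker) Track(f func()) {
	startedAt := time.Now()
	defer func() {
		t.TrackDuration(time.Since(startedAt))
	}()
	f()
}

func (t *sumTracker) TrackDuration(d time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.latency += d
}

func (t *sumTracker) GetLatency() time.Duration {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.latency
}

func main() {
	var tr sumTracker
	tr.Track(func() { time.Sleep(10 * time.Millisecond) })
	tr.TrackDuration(5 * time.Millisecond)
	fmt.Println(tr.GetLatency()) // roughly 15ms
}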
@@ -64,6 +75,14 @@ func (t *durationTracker) Track(f func()) {
 	f()
 }
 
+// TrackDuration tracks latency from the given duration
+// using aggregateFunction
+func (t *durationTracker) TrackDuration(d time.Duration) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.latency = t.aggregateFunction(t.latency, d)
+}
+
 // GetLatency returns aggregated latency tracked by a tracker
 func (t *durationTracker) GetLatency() time.Duration {
 	t.mu.Lock()
@@ -96,6 +115,39 @@ type LatencyTrackers struct {
 	// ValidatingWebhookTracker tracks the latency incurred in validating webhook(s).
 	// Validate webhooks are done in parallel, so max function is used.
 	ValidatingWebhookTracker DurationTracker
+
+	// StorageTracker tracks the latency incurred inside the storage layer,
+	// it accounts for the time it takes to send data to the underlying
+	// storage layer (etcd) and get the complete response back.
+	// If a request involves N (N>=1) round trips to the underlying
+	// storage layer, the latency will account for the total duration
+	// from these N round trips.
+	// It does not include the time incurred in admission, or validation.
+	StorageTracker DurationTracker
+
+	// TransformTracker tracks the latency incurred in transforming the
+	// response object(s) returned from the underlying storage layer.
+	// This includes transforming the object to user's desired form
+	// (ie. as Table), and also setting appropriate API level fields.
+	// This does not include the latency incurred in serialization
+	// (json or protobuf) of the response object or writing
+	// of it to the http ResponseWriter object.
+	TransformTracker DurationTracker
+
+	// SerializationTracker tracks the latency incurred in serialization
+	// (json or protobuf) of the response object.
+	// NOTE: serialization and writing of the serialized raw bytes to the
+	// associated http ResponseWriter object are interleaved, and hence
+	// the latency measured here will include the time spent writing the
+	// serialized raw bytes to the http ResponseWriter object.
+	SerializationTracker DurationTracker
+
+	// ResponseWriteTracker tracks the latency incurred in writing the
+	// serialized raw bytes to the http ResponseWriter object (via the
+	// Write method) associated with the request.
+	// The Write method can be invoked multiple times, so we use a
+	// latency tracker that sums up the duration from each call.
+	ResponseWriteTracker DurationTracker
 }
 
 type latencyTrackersKeyType int
@@ -116,6 +168,10 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co
 	return WithValue(parent, latencyTrackersKey, &LatencyTrackers{
 		MutatingWebhookTracker:   newSumLatencyTracker(c),
 		ValidatingWebhookTracker: newMaxLatencyTracker(c),
+		StorageTracker:           newSumLatencyTracker(c),
+		TransformTracker:         newSumLatencyTracker(c),
+		SerializationTracker:     newSumLatencyTracker(c),
+		ResponseWriteTracker:     newSumLatencyTracker(c),
 	})
 }
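Note: the injectable clock makes the trackers unit-testable. A hedged sketch of a deterministic test, assuming the clock argument is k8s.io/utils/clock.Clock so the fake clock from k8s.io/utils/clock/testing can be passed in:

package main

import (
	"context"
	"fmt"
	"time"

	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	// The fake clock only advances when stepped, making the measured
	// duration fully deterministic.
	fakeClock := testingclock.NewFakeClock(time.Now())
	ctx := endpointsrequest.WithLatencyTrackersAndCustomClock(context.Background(), fakeClock)

	tracker, _ := endpointsrequest.LatencyTrackersFrom(ctx)
	tracker.StorageTracker.Track(func() {
		fakeClock.Step(125 * time.Millisecond) // pretend the call took 125ms
	})
	fmt.Println(tracker.StorageTracker.GetLatency()) // 125ms
}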
@@ -125,3 +181,92 @@ func LatencyTrackersFrom(ctx context.Context) (*LatencyTrackers, bool) {
 	wd, ok := ctx.Value(latencyTrackersKey).(*LatencyTrackers)
 	return wd, ok && wd != nil
 }
+
+// TrackTransformResponseObjectLatency is used to track latency incurred
+// inside the function that takes an object returned from the underlying
+// storage layer (etcd) and performs any necessary transformations
+// of the response object. This does not include the latency incurred in
+// serialization (json or protobuf) of the response object or writing of
+// it to the http ResponseWriter object.
+// When called multiple times, the latency incurred inside the
+// transform func each time will be summed up.
+func TrackTransformResponseObjectLatency(ctx context.Context, transform func()) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.TransformTracker.Track(transform)
+		return
+	}
+
+	transform()
+}
+
+// TrackStorageLatency is used to track latency incurred
+// inside the underlying storage layer.
+// When called multiple times, the latency provided will be summed up.
+func TrackStorageLatency(ctx context.Context, d time.Duration) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.StorageTracker.TrackDuration(d)
+	}
+}
+
+// TrackSerializeResponseObjectLatency is used to track latency incurred in
+// serialization (json or protobuf) of the response object.
+// When called multiple times, the latency provided will be summed up.
+func TrackSerializeResponseObjectLatency(ctx context.Context, f func()) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.SerializationTracker.Track(f)
+		return
+	}
+
+	f()
+}
+
+// TrackResponseWriteLatency is used to track latency incurred in writing
+// the serialized raw bytes to the http ResponseWriter object (via the
+// Write method) associated with the request.
+// When called multiple times, the latency provided will be summed up.
+func TrackResponseWriteLatency(ctx context.Context, d time.Duration) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.ResponseWriteTracker.TrackDuration(d)
+	}
+}
+
+// AuditAnnotationsFromLatencyTrackers will inspect each latency tracker
+// associated with the request context and return a set of audit
+// annotations that can be added to the API audit entry.
+func AuditAnnotationsFromLatencyTrackers(ctx context.Context) map[string]string {
+	const (
+		transformLatencyKey         = "apiserver.latency.k8s.io/transform-response-object"
+		storageLatencyKey           = "apiserver.latency.k8s.io/etcd"
+		serializationLatencyKey     = "apiserver.latency.k8s.io/serialize-response-object"
+		responseWriteLatencyKey     = "apiserver.latency.k8s.io/response-write"
+		mutatingWebhookLatencyKey   = "apiserver.latency.k8s.io/mutating-webhook"
+		validatingWebhookLatencyKey = "apiserver.latency.k8s.io/validating-webhook"
+	)
+
+	tracker, ok := LatencyTrackersFrom(ctx)
+	if !ok {
+		return nil
+	}
+
+	annotations := map[string]string{}
+	if latency := tracker.TransformTracker.GetLatency(); latency != 0 {
+		annotations[transformLatencyKey] = latency.String()
+	}
+	if latency := tracker.StorageTracker.GetLatency(); latency != 0 {
+		annotations[storageLatencyKey] = latency.String()
+	}
+	if latency := tracker.SerializationTracker.GetLatency(); latency != 0 {
+		annotations[serializationLatencyKey] = latency.String()
+	}
+	if latency := tracker.ResponseWriteTracker.GetLatency(); latency != 0 {
+		annotations[responseWriteLatencyKey] = latency.String()
+	}
+	if latency := tracker.MutatingWebhookTracker.GetLatency(); latency != 0 {
+		annotations[mutatingWebhookLatencyKey] = latency.String()
+	}
+	if latency := tracker.ValidatingWebhookTracker.GetLatency(); latency != 0 {
+		annotations[validatingWebhookLatencyKey] = latency.String()
+	}
+
+	return annotations
+}
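Note: the helpers above degrade gracefully; each checks LatencyTrackersFrom, and the Track variants still run the supplied function when no trackers are attached. A hedged end-to-end sketch against the request package as shown in this diff (assuming a module that vendors k8s.io/apiserver at a revision containing this change):

package main

import (
	"context"
	"fmt"
	"time"

	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
)

func main() {
	// WithLatencyTrackers attaches the trackers, as the filter does
	// once per request.
	ctx := endpointsrequest.WithLatencyTrackers(context.Background())

	// Layers report latency as the request flows through them.
	endpointsrequest.TrackStorageLatency(ctx, 300*time.Millisecond)
	endpointsrequest.TrackStorageLatency(ctx, 200*time.Millisecond) // summed: 500ms
	endpointsrequest.TrackSerializeResponseObjectLatency(ctx, func() {
		time.Sleep(10 * time.Millisecond)
	})

	// At audit time the aggregated values come back as annotations.
	for k, v := range endpointsrequest.AuditAnnotationsFromLatencyTrackers(ctx) {
		fmt.Printf("%s=%s\n", k, v)
	}
}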
@@ -765,8 +765,7 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
 }
 
 func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
-	handler := genericapifilters.WithLatencyTrackers(apiHandler)
-	handler = filterlatency.TrackCompleted(handler)
+	handler := filterlatency.TrackCompleted(apiHandler)
 	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
 	handler = filterlatency.TrackStarted(handler, "authorization")
 
@@ -818,6 +817,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
 		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
 	}
+	handler = genericapifilters.WithLatencyTrackers(handler)
 	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
 	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
 	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
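Note on the two hunks above: DefaultBuildHandlerChain composes filters inside-out, so a filter applied later in this function encloses the earlier ones and runs first at request time. Because WithRequestInfo is applied after WithLatencyTrackers here, it executes before it, so RequestInfo is already in the context when the latency filter's RequestInfoFrom check (earlier in this diff) runs. A toy demonstration of that ordering rule (names are illustrative):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named wraps next and logs when the request passes through it.
func named(name string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("enter", name)
		next.ServeHTTP(w, r)
	})
}

func main() {
	var handler http.Handler = http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
	handler = named("latencytrackers", handler) // applied first, runs last
	handler = named("requestinfo", handler)     // applied later, runs earlier
	handler.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
	// Prints: enter requestinfo, then enter latencytrackers.
}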
@@ -26,7 +26,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/google/go-cmp/cmp"
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -340,8 +339,16 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
 	if event.Stage != auditinternal.StageResponseComplete {
 		t.Errorf("expected event stage to be complete, got: %s", event.Stage)
 	}
-	if diff := cmp.Diff(want, event.Annotations); diff != "" {
-		t.Errorf("event has unexpected annotations (-want +got): %s", diff)
+
+	for wantK, wantV := range want {
+		gotV, ok := event.Annotations[wantK]
+		if !ok {
+			t.Errorf("expected to find annotation key %q in %#v", wantK, event.Annotations)
+			continue
+		}
+		if wantV != gotV {
+			t.Errorf("expected the annotation value to match, key: %q, want: %q got: %q", wantK, wantV, gotV)
+		}
 	}
 }
@@ -0,0 +1,107 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd3
+
+import (
+	"context"
+	"time"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
+)
+
+// NewETCDLatencyTracker returns an implementation of
+// clientv3.KV that times the calls from the specified
+// 'delegate' KV instance in order to track latency incurred.
+func NewETCDLatencyTracker(delegate clientv3.KV) clientv3.KV {
+	return &clientV3KVLatencyTracker{KV: delegate}
+}
+
+// clientV3KVLatencyTracker decorates a clientv3.KV instance and times
+// each call so we can track the latency an API request incurs in etcd
+// round trips (the time it takes to send data to etcd and get the
+// complete response back).
+//
+// If an API request involves N (N>=1) round trips to etcd, then we will sum
+// up the latency incurred in each roundtrip.
+//
+// It uses the context associated with the request in flight, so there
+// are no states shared among the requests in flight, and so there is no
+// concurrency overhead.
+// If the goroutine executing the request handler makes concurrent calls
+// to the underlying storage layer, that is protected since the latency
+// tracking function TrackStorageLatency is thread safe.
+//
+// NOTE: Compact is an asynchronous process and is not associated with
+// any request, so we will not be tracking its latency.
+type clientV3KVLatencyTracker struct {
+	clientv3.KV
+}
+
+func (c *clientV3KVLatencyTracker) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
+	startedAt := time.Now()
+	defer func() {
+		endpointsrequest.TrackStorageLatency(ctx, time.Since(startedAt))
+	}()
+
+	return c.KV.Put(ctx, key, val, opts...)
+}
+
+func (c *clientV3KVLatencyTracker) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
+	startedAt := time.Now()
+	defer func() {
+		endpointsrequest.TrackStorageLatency(ctx, time.Since(startedAt))
+	}()
+
+	return c.KV.Get(ctx, key, opts...)
+}
+
+func (c *clientV3KVLatencyTracker) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
+	startedAt := time.Now()
+	defer func() {
+		endpointsrequest.TrackStorageLatency(ctx, time.Since(startedAt))
+	}()
+
+	return c.KV.Delete(ctx, key, opts...)
+}
+
+func (c *clientV3KVLatencyTracker) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
+	startedAt := time.Now()
+	defer func() {
+		endpointsrequest.TrackStorageLatency(ctx, time.Since(startedAt))
+	}()
+
+	return c.KV.Do(ctx, op)
+}
+
+func (c *clientV3KVLatencyTracker) Txn(ctx context.Context) clientv3.Txn {
+	return &clientV3TxnTracker{ctx: ctx, Txn: c.KV.Txn(ctx)}
+}
+
+type clientV3TxnTracker struct {
+	ctx context.Context
+	clientv3.Txn
+}
+
+func (t *clientV3TxnTracker) Commit() (*clientv3.TxnResponse, error) {
+	startedAt := time.Now()
+	defer func() {
+		endpointsrequest.TrackStorageLatency(t.ctx, time.Since(startedAt))
+	}()
+
+	return t.Txn.Commit()
+}
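Note: the decorator slots in wherever a clientv3.KV is constructed; the actual wiring is the newETCD3Storage hunk below. A hedged usage sketch, assuming the package path k8s.io/apiserver/pkg/storage/etcd3 and a reachable etcd endpoint (the address is illustrative):

package main

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/storage/etcd3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Wrap the KV so every Get/Put/Delete/Do/Txn reports its duration
	// into the LatencyTrackers carried by the request context.
	client.KV = etcd3.NewETCDLatencyTracker(client.KV)

	ctx := endpointsrequest.WithLatencyTrackers(context.Background())
	_, _ = client.KV.Get(ctx, "/registry/pods")

	// Would include apiserver.latency.k8s.io/etcd for the call above.
	_ = endpointsrequest.AuditAnnotationsFromLatencyTrackers(ctx)
}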
@@ -261,6 +261,9 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
 		return nil, nil, err
 	}
 
+	// decorate the KV instance so we can track etcd latency per request.
+	client.KV = etcd3.NewETCDLatencyTracker(client.KV)
+
 	stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
 	if err != nil {
 		return nil, nil, err