Merge pull request #121512 from HirazawaUi/add-decod-time-trace

Add decode time to the audit log
This commit is contained in:
Kubernetes Prow Robot 2024-01-31 12:54:17 -08:00 committed by GitHub
commit 11b9740436
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 43 additions and 9 deletions

View File

@ -152,6 +152,13 @@ type LatencyTrackers struct {
// The Write method can be invoked multiple times, so we use a // The Write method can be invoked multiple times, so we use a
// latency tracker that sums up the duration from each call. // latency tracker that sums up the duration from each call.
ResponseWriteTracker DurationTracker ResponseWriteTracker DurationTracker
// DecodeTracker is used to track latency incurred inside the function
// that takes an object returned from the underlying storage layer
// (etcd) and performs decoding of the response object.
// When called multiple times, the latency incurred inside to
// decode func each time will be summed up.
DecodeTracker DurationTracker
} }
type latencyTrackersKeyType int type latencyTrackersKeyType int
@ -177,6 +184,7 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co
TransformTracker: newSumLatencyTracker(c), TransformTracker: newSumLatencyTracker(c),
SerializationTracker: newSumLatencyTracker(c), SerializationTracker: newSumLatencyTracker(c),
ResponseWriteTracker: newSumLatencyTracker(c), ResponseWriteTracker: newSumLatencyTracker(c),
DecodeTracker: newSumLatencyTracker(c),
}) })
} }
@ -243,6 +251,17 @@ func TrackAPFQueueWaitLatency(ctx context.Context, d time.Duration) {
} }
} }
// TrackDecodeLatency is used to track latency incurred inside the function
// that takes an object returned from the underlying storage layer
// (etcd) and performs decoding of the response object.
// When called multiple times, the latency incurred inside to
// decode func each time will be summed up.
func TrackDecodeLatency(ctx context.Context, d time.Duration) {
if tracker, ok := LatencyTrackersFrom(ctx); ok {
tracker.DecodeTracker.TrackDuration(d)
}
}
// AuditAnnotationsFromLatencyTrackers will inspect each latency tracker // AuditAnnotationsFromLatencyTrackers will inspect each latency tracker
// associated with the request context and return a set of audit // associated with the request context and return a set of audit
// annotations that can be added to the API audit entry. // annotations that can be added to the API audit entry.
@ -254,6 +273,7 @@ func AuditAnnotationsFromLatencyTrackers(ctx context.Context) map[string]string
responseWriteLatencyKey = "apiserver.latency.k8s.io/response-write" responseWriteLatencyKey = "apiserver.latency.k8s.io/response-write"
mutatingWebhookLatencyKey = "apiserver.latency.k8s.io/mutating-webhook" mutatingWebhookLatencyKey = "apiserver.latency.k8s.io/mutating-webhook"
validatingWebhookLatencyKey = "apiserver.latency.k8s.io/validating-webhook" validatingWebhookLatencyKey = "apiserver.latency.k8s.io/validating-webhook"
decodeLatencyKey = "apiserver.latency.k8s.io/decode-response-object"
) )
tracker, ok := LatencyTrackersFrom(ctx) tracker, ok := LatencyTrackersFrom(ctx)
@ -280,6 +300,9 @@ func AuditAnnotationsFromLatencyTrackers(ctx context.Context) map[string]string
if latency := tracker.ValidatingWebhookTracker.GetLatency(); latency != 0 { if latency := tracker.ValidatingWebhookTracker.GetLatency(); latency != 0 {
annotations[validatingWebhookLatencyKey] = latency.String() annotations[validatingWebhookLatencyKey] = latency.String()
} }
if latency := tracker.DecodeTracker.GetLatency(); latency != 0 {
annotations[decodeLatencyKey] = latency.String()
}
return annotations return annotations
} }

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
@ -761,10 +762,17 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
default: default:
} }
if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil { obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc)
if err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key)) recordDecodeError(s.groupResourceString, string(kv.Key))
return err return err
} }
// being unable to set the version does not prevent the object from being extracted
if matched, err := opts.Predicate.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
numEvald++ numEvald++
// free kv early. Long lists can take O(seconds) to decode. // free kv early. Long lists can take O(seconds) to decode.
@ -1038,20 +1046,23 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
return nil return nil
} }
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. // decodeListItem decodes bytes value in array into object.
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) error { func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
}()
obj, _, err := codec.Decode(data, nil, newItemFunc()) obj, _, err := codec.Decode(data, nil, newItemFunc())
if err != nil { if err != nil {
return err return nil, err
} }
// being unable to set the version does not prevent the object from being extracted
if err := versioner.UpdateObject(obj, rev); err != nil { if err := versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err) klog.Errorf("failed to update object version: %v", err)
} }
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) return obj, nil
}
return nil
} }
// recordDecodeError record decode error split by object type. // recordDecodeError record decode error split by object type.