diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD index aadbd8037d8..101aa352d76 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "cacher.go", + "caching_object.go", "time_budget.go", "util.go", "watch_cache.go", @@ -19,6 +20,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", @@ -38,6 +41,7 @@ go_test( name = "go_default_test", srcs = [ "cacher_whitebox_test.go", + "caching_object_test.go", "time_budget_test.go", "util_test.go", "watch_cache_test.go", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 8663e8ccc0e..845b6238107 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -703,7 +703,7 @@ func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBoo } rv, err := cacher.versioner.ObjectResourceVersion(event.Object) if err != nil { - t.Errorf("failed to parse resource version from %#v", event.Object) + t.Errorf("failed to parse resource version from %#v: %v", event.Object, err) } if event.Type == watch.Bookmark { if !expectedBookmarks { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go new file mode 100644 index 00000000000..9e7a46393db --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go @@ -0,0 +1,397 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "bytes" + "fmt" + "io" + "reflect" + "runtime/debug" + "sync" + "sync/atomic" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog" +) + +var _ runtime.CacheableObject = &cachingObject{} + +// metaRuntimeInterface implements runtime.Object and +// metav1.Object interfaces. +type metaRuntimeInterface interface { + runtime.Object + metav1.Object +} + +// serializationResult captures a result of serialization. +type serializationResult struct { + // once should be used to ensure serialization is computed once. + once sync.Once + + // raw is serialized object. + raw []byte + // err is error from serialization. + err error +} + +// serializationsCache is a type for caching serialization results. +type serializationsCache map[runtime.Identifier]*serializationResult + +// cachingObject is an object that is able to cache its serializations +// so that each of those is computed exactly once. +// +// cachingObject implements the metav1.Object interface (accessors for +// all metadata fields). However, setters for all fields except from +// SelfLink (which is set lately in the path) are ignored. +type cachingObject struct { + lock sync.RWMutex + + // Object for which serializations are cached. + object metaRuntimeInterface + + // serializations is a cache containing object`s serializations. + // The value stored in atomic.Value is of type serializationsCache. + // The atomic.Value type is used to allow fast-path. + serializations atomic.Value +} + +// newCachingObject performs a deep copy of the given object and wraps it +// into a cachingObject. +// An error is returned if it's not possible to cast the object to +// metav1.Object type. +func newCachingObject(object runtime.Object) (*cachingObject, error) { + if obj, ok := object.(metaRuntimeInterface); ok { + result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)} + result.serializations.Store(make(serializationsCache)) + return result, nil + } + return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object) +} + +func (o *cachingObject) getSerializationResult(id runtime.Identifier) *serializationResult { + // Fast-path for getting from cache. + serializations := o.serializations.Load().(serializationsCache) + if result, exists := serializations[id]; exists { + return result + } + + // Slow-path (that may require insert). + o.lock.Lock() + defer o.lock.Unlock() + + serializations = o.serializations.Load().(serializationsCache) + // Check if in the meantime it wasn't inserted. + if result, exists := serializations[id]; exists { + return result + } + + // Insert an entry for . This requires copy of existing map. + newSerializations := make(serializationsCache) + for k, v := range serializations { + newSerializations[k] = v + } + result := &serializationResult{} + newSerializations[id] = result + o.serializations.Store(newSerializations) + return result +} + +// CacheEncode implements runtime.CacheableObject interface. +// It serializes the object and writes the result to given io.Writer trying +// to first use the already cached result and falls back to a given encode +// function in case of cache miss. +// It assumes that for a given identifier, the encode function always encodes +// each input object into the same output format. +func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { + result := o.getSerializationResult(id) + result.once.Do(func() { + buffer := bytes.NewBuffer(nil) + result.err = encode(o.GetObject(), buffer) + result.raw = buffer.Bytes() + }) + // Once invoked, fields of serialization will not change. + if result.err != nil { + return result.err + } + _, err := w.Write(result.raw) + return err +} + +// GetObject implements runtime.CacheableObject interface. +// It returns deep-copy of the wrapped object to return ownership of it +// to the called according to the contract of the interface. +func (o *cachingObject) GetObject() runtime.Object { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.DeepCopyObject().(metaRuntimeInterface) +} + +// GetObjectKind implements runtime.Object interface. +func (o *cachingObject) GetObjectKind() schema.ObjectKind { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetObjectKind() +} + +// DeepCopyObject implements runtime.Object interface. +func (o *cachingObject) DeepCopyObject() runtime.Object { + // DeepCopyObject on cachingObject is not expected to be called anywhere. + // However, to be on the safe-side, we implement it, though given the + // cache is only an optimization we ignore copying it. + result := &cachingObject{} + result.serializations.Store(make(serializationsCache)) + + o.lock.RLock() + defer o.lock.RUnlock() + result.object = o.object.DeepCopyObject().(metaRuntimeInterface) + return result +} + +var ( + invalidationCacheTimestampLock sync.Mutex + invalidationCacheTimestamp time.Time +) + +// shouldLogCacheInvalidation allows for logging cache-invalidation +// at most once per second (to avoid spamming logs in case of issues). +func shouldLogCacheInvalidation(now time.Time) bool { + invalidationCacheTimestampLock.Lock() + defer invalidationCacheTimestampLock.Unlock() + if invalidationCacheTimestamp.Add(time.Second).Before(now) { + invalidationCacheTimestamp = now + return true + } + return false +} + +func (o *cachingObject) invalidateCacheLocked() { + if cache, ok := o.serializations.Load().(serializationsCache); ok && len(cache) == 0 { + return + } + // We don't expect cache invalidation to happen - so we want + // to log the stacktrace to allow debugging if that will happen. + // OTOH, we don't want to spam logs with it. + // So we try to log it at most once per second. + if shouldLogCacheInvalidation(time.Now()) { + klog.Warningf("Unexpected cache invalidation for %#v\n%s", o.object, string(debug.Stack())) + } + o.serializations.Store(make(serializationsCache)) +} + +// The following functions implement metav1.Object interface: +// - getters simply delegate for the underlying object +// - setters check if operations isn't noop and if so, +// invalidate the cache and delegate for the underlying object + +func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) { + if fastPath := func() bool { + o.lock.RLock() + defer o.lock.RUnlock() + return isNoop() + }(); fastPath { + return + } + o.lock.Lock() + defer o.lock.Unlock() + if isNoop() { + return + } + o.invalidateCacheLocked() + set() +} + +func (o *cachingObject) GetNamespace() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetNamespace() +} +func (o *cachingObject) SetNamespace(namespace string) { + o.conditionalSet( + func() bool { return o.object.GetNamespace() == namespace }, + func() { o.object.SetNamespace(namespace) }, + ) +} +func (o *cachingObject) GetName() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetName() +} +func (o *cachingObject) SetName(name string) { + o.conditionalSet( + func() bool { return o.object.GetName() == name }, + func() { o.object.SetName(name) }, + ) +} +func (o *cachingObject) GetGenerateName() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetGenerateName() +} +func (o *cachingObject) SetGenerateName(name string) { + o.conditionalSet( + func() bool { return o.object.GetGenerateName() == name }, + func() { o.object.SetGenerateName(name) }, + ) +} +func (o *cachingObject) GetUID() types.UID { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetUID() +} +func (o *cachingObject) SetUID(uid types.UID) { + o.conditionalSet( + func() bool { return o.object.GetUID() == uid }, + func() { o.object.SetUID(uid) }, + ) +} +func (o *cachingObject) GetResourceVersion() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetResourceVersion() +} +func (o *cachingObject) SetResourceVersion(version string) { + o.conditionalSet( + func() bool { return o.object.GetResourceVersion() == version }, + func() { o.object.SetResourceVersion(version) }, + ) +} +func (o *cachingObject) GetGeneration() int64 { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetGeneration() +} +func (o *cachingObject) SetGeneration(generation int64) { + o.conditionalSet( + func() bool { return o.object.GetGeneration() == generation }, + func() { o.object.SetGeneration(generation) }, + ) +} +func (o *cachingObject) GetSelfLink() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetSelfLink() +} +func (o *cachingObject) SetSelfLink(selfLink string) { + o.conditionalSet( + func() bool { return o.object.GetSelfLink() == selfLink }, + func() { o.object.SetSelfLink(selfLink) }, + ) +} +func (o *cachingObject) GetCreationTimestamp() metav1.Time { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetCreationTimestamp() +} +func (o *cachingObject) SetCreationTimestamp(timestamp metav1.Time) { + o.conditionalSet( + func() bool { return o.object.GetCreationTimestamp() == timestamp }, + func() { o.object.SetCreationTimestamp(timestamp) }, + ) +} +func (o *cachingObject) GetDeletionTimestamp() *metav1.Time { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetDeletionTimestamp() +} +func (o *cachingObject) SetDeletionTimestamp(timestamp *metav1.Time) { + o.conditionalSet( + func() bool { return o.object.GetDeletionTimestamp() == timestamp }, + func() { o.object.SetDeletionTimestamp(timestamp) }, + ) +} +func (o *cachingObject) GetDeletionGracePeriodSeconds() *int64 { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetDeletionGracePeriodSeconds() +} +func (o *cachingObject) SetDeletionGracePeriodSeconds(gracePeriodSeconds *int64) { + o.conditionalSet( + func() bool { return o.object.GetDeletionGracePeriodSeconds() == gracePeriodSeconds }, + func() { o.object.SetDeletionGracePeriodSeconds(gracePeriodSeconds) }, + ) +} +func (o *cachingObject) GetLabels() map[string]string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetLabels() +} +func (o *cachingObject) SetLabels(labels map[string]string) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetLabels(), labels) }, + func() { o.object.SetLabels(labels) }, + ) +} +func (o *cachingObject) GetAnnotations() map[string]string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetAnnotations() +} +func (o *cachingObject) SetAnnotations(annotations map[string]string) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetAnnotations(), annotations) }, + func() { o.object.SetAnnotations(annotations) }, + ) +} +func (o *cachingObject) GetFinalizers() []string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetFinalizers() +} +func (o *cachingObject) SetFinalizers(finalizers []string) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetFinalizers(), finalizers) }, + func() { o.object.SetFinalizers(finalizers) }, + ) +} +func (o *cachingObject) GetOwnerReferences() []metav1.OwnerReference { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetOwnerReferences() +} +func (o *cachingObject) SetOwnerReferences(references []metav1.OwnerReference) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetOwnerReferences(), references) }, + func() { o.object.SetOwnerReferences(references) }, + ) +} +func (o *cachingObject) GetClusterName() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetClusterName() +} +func (o *cachingObject) SetClusterName(clusterName string) { + o.conditionalSet( + func() bool { return o.object.GetClusterName() == clusterName }, + func() { o.object.SetClusterName(clusterName) }, + ) +} +func (o *cachingObject) GetManagedFields() []metav1.ManagedFieldsEntry { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetManagedFields() +} +func (o *cachingObject) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetManagedFields(), managedFields) }, + func() { o.object.SetManagedFields(managedFields) }, + ) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go new file mode 100644 index 00000000000..ca8a5094636 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "bytes" + "fmt" + "io" + "sync" + "sync/atomic" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +type mockEncoder struct { + identifier runtime.Identifier + expectedResult string + expectedError error + + callsNumber int32 +} + +func newMockEncoder(id, result string, err error) *mockEncoder { + return &mockEncoder{ + identifier: runtime.Identifier(id), + expectedResult: result, + expectedError: err, + } +} + +func (e *mockEncoder) encode(_ runtime.Object, w io.Writer) error { + atomic.AddInt32(&e.callsNumber, 1) + if e.expectedError != nil { + return e.expectedError + } + _, err := w.Write([]byte(e.expectedResult)) + return err +} + +func TestCachingObject(t *testing.T) { + object, err := newCachingObject(&v1.Pod{}) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + + encoders := []*mockEncoder{ + newMockEncoder("1", "result1", nil), + newMockEncoder("2", "", fmt.Errorf("mock error")), + newMockEncoder("3", "result3", nil), + } + + for _, encoder := range encoders { + buffer := bytes.NewBuffer(nil) + err := object.CacheEncode(encoder.identifier, encoder.encode, buffer) + if a, e := err, encoder.expectedError; e != a { + t.Errorf("%s: unexpected error: %v, expected: %v", encoder.identifier, a, e) + } + if a, e := buffer.String(), encoder.expectedResult; e != a { + t.Errorf("%s: unexpected result: %s, expected: %s", encoder.identifier, a, e) + } + } + for _, encoder := range encoders { + if encoder.callsNumber != 1 { + t.Errorf("%s: unexpected number of encode() calls: %d", encoder.identifier, encoder.callsNumber) + } + } +} + +func TestSelfLink(t *testing.T) { + object, err := newCachingObject(&v1.Pod{}) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + selfLink := "selfLink" + object.SetSelfLink(selfLink) + + encodeSelfLink := func(obj runtime.Object, w io.Writer) error { + accessor, err := meta.Accessor(obj) + if err != nil { + t.Fatalf("failed to get accessor for %#v: %v", obj, err) + } + _, err = w.Write([]byte(accessor.GetSelfLink())) + return err + } + buffer := bytes.NewBuffer(nil) + if err := object.CacheEncode("", encodeSelfLink, buffer); err != nil { + t.Errorf("unexpected error: %v", err) + } + if a, e := buffer.String(), selfLink; a != e { + t.Errorf("unexpected serialization: %s, expected: %s", a, e) + } + + // GetObject should also set selfLink. + buffer.Reset() + if err := encodeSelfLink(object.GetObject(), buffer); err != nil { + t.Errorf("unexpected error: %v", err) + } + if a, e := buffer.String(), selfLink; a != e { + t.Errorf("unexpected serialization: %s, expected: %s", a, e) + } +} + +func TestCachingObjectRaces(t *testing.T) { + object, err := newCachingObject(&v1.Pod{}) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + + encoders := []*mockEncoder{} + for i := 0; i < 10; i++ { + encoder := newMockEncoder(fmt.Sprintf("%d", i), "result", nil) + encoders = append(encoders, encoder) + } + + numWorkers := 1000 + wg := &sync.WaitGroup{} + wg.Add(numWorkers) + + for i := 0; i < numWorkers; i++ { + go func() { + defer wg.Done() + object.SetSelfLink("selfLink") + buffer := bytes.NewBuffer(nil) + for _, encoder := range encoders { + buffer.Reset() + if err := object.CacheEncode(encoder.identifier, encoder.encode, buffer); err != nil { + t.Errorf("unexpected error: %v", err) + } + if callsNumber := atomic.LoadInt32(&encoder.callsNumber); callsNumber != 1 { + t.Errorf("unexpected number of serializations: %d", callsNumber) + } + } + accessor, err := meta.Accessor(object.GetObject()) + if err != nil { + t.Fatalf("failed to get accessor: %v", err) + } + if selfLink := accessor.GetSelfLink(); selfLink != "selfLink" { + t.Errorf("unexpected selfLink: %s", selfLink) + } + }() + } + wg.Wait() +}