diff --git a/pkg/kubelet/secret/BUILD b/pkg/kubelet/secret/BUILD index 1fdb5850c27..84632b98b47 100644 --- a/pkg/kubelet/secret/BUILD +++ b/pkg/kubelet/secret/BUILD @@ -8,7 +8,7 @@ load( go_test( name = "go_default_test", - srcs = ["secret_manager_test.go"], + srcs = ["caching_secret_manager_test.go"], embed = [":go_default_library"], deps = [ "//vendor/github.com/stretchr/testify/assert:go_default_library", @@ -25,6 +25,7 @@ go_test( go_library( name = "go_default_library", srcs = [ + "caching_secret_manager.go", "fake_manager.go", "secret_manager.go", ], diff --git a/pkg/kubelet/secret/caching_secret_manager.go b/pkg/kubelet/secret/caching_secret_manager.go new file mode 100644 index 00000000000..45756ad2ff0 --- /dev/null +++ b/pkg/kubelet/secret/caching_secret_manager.go @@ -0,0 +1,206 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "strconv" + "sync" + "time" + + "k8s.io/api/core/v1" + storageetcd "k8s.io/apiserver/pkg/storage/etcd" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/kubelet/util" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" +) + +const ( + defaultTTL = time.Minute +) + +type GetObjectTTLFunc func() (time.Duration, bool) + +// secretStoreItems is a single item stored in secretStore. +type secretStoreItem struct { + refCount int + secret *secretData +} + +type secretData struct { + sync.Mutex + + secret *v1.Secret + err error + lastUpdateTime time.Time +} + +// secretStore is a local cache of secrets. +type secretStore struct { + kubeClient clientset.Interface + clock clock.Clock + + lock sync.Mutex + items map[objectKey]*secretStoreItem + + defaultTTL time.Duration + getTTL GetObjectTTLFunc +} + +func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore { + return &secretStore{ + kubeClient: kubeClient, + clock: clock, + items: make(map[objectKey]*secretStoreItem), + defaultTTL: ttl, + getTTL: getTTL, + } +} + +func isSecretOlder(newSecret, oldSecret *v1.Secret) bool { + if newSecret == nil || oldSecret == nil { + return false + } + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret) + return newVersion < oldVersion +} + +func (s *secretStore) AddReference(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + // AddReference is called from RegisterPod, thus it needs to be efficient. + // Thus Add() is only increasing refCount and generation of a given secret. + // Then Get() is responsible for fetching if needed. + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + item = &secretStoreItem{ + refCount: 0, + secret: &secretData{}, + } + s.items[key] = item + } + + item.refCount++ + // This will trigger fetch on the next Get() operation. + item.secret = nil +} + +func (s *secretStore) DeleteReference(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.refCount-- + if item.refCount == 0 { + delete(s.items, key) + } + } +} + +func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { + return func() (time.Duration, bool) { + node, err := getNode() + if err != nil { + return time.Duration(0), false + } + if node != nil && node.Annotations != nil { + if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { + if intValue, err := strconv.Atoi(value); err == nil { + return time.Duration(intValue) * time.Second, true + } + } + } + return time.Duration(0), false + } +} + +func (s *secretStore) isSecretFresh(data *secretData) bool { + secretTTL := s.defaultTTL + if ttl, ok := s.getTTL(); ok { + secretTTL = ttl + } + return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL)) +} + +func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { + key := objectKey{namespace: namespace, name: name} + + data := func() *secretData { + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + return nil + } + if item.secret == nil { + item.secret = &secretData{} + } + return item.secret + }() + if data == nil { + return nil, fmt.Errorf("secret %q/%q not registered", namespace, name) + } + + // After updating data in secretStore, lock the data, fetch secret if + // needed and return data. + data.Lock() + defer data.Unlock() + if data.err != nil || !s.isSecretFresh(data) { + opts := metav1.GetOptions{} + if data.secret != nil && data.err == nil { + // This is just a periodic refresh of a secret we successfully fetched previously. + // In this case, server data from apiserver cache to reduce the load on both + // etcd and apiserver (the cache is eventually consistent). + util.FromApiserverCache(&opts) + } + secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(name, opts) + if err != nil && !apierrors.IsNotFound(err) && data.secret == nil && data.err == nil { + // Couldn't fetch the latest secret, but there is no cached data to return. + // Return the fetch result instead. + return secret, err + } + if (err == nil && !isSecretOlder(secret, data.secret)) || apierrors.IsNotFound(err) { + // If the fetch succeeded with a newer version of the secret, or if the + // secret could not be found in the apiserver, update the cached data to + // reflect the current status. + data.secret = secret + data.err = err + data.lastUpdateTime = s.clock.Now() + } + } + return data.secret, data.err +} + +// NewCachingSecretManager creates a manager that keeps a cache of all secrets +// necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the cached versions of all its secrets +// are invalidated +// - every GetSecret() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache +func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager { + secretStore := newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL) + return newCacheBasedSecretManager(secretStore) +} diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/caching_secret_manager_test.go similarity index 95% rename from pkg/kubelet/secret/secret_manager_test.go rename to pkg/kubelet/secret/caching_secret_manager_test.go index 36380e514cc..6d53043e29c 100644 --- a/pkg/kubelet/secret/secret_manager_test.go +++ b/pkg/kubelet/secret/caching_secret_manager_test.go @@ -53,13 +53,13 @@ func noObjectTTL() (time.Duration, bool) { func TestSecretStore(t *testing.T) { fakeClient := &fake.Clientset{} store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) - store.Add("ns1", "name1") - store.Add("ns2", "name2") - store.Add("ns1", "name1") - store.Add("ns1", "name1") - store.Delete("ns1", "name1") - store.Delete("ns2", "name2") - store.Add("ns3", "name3") + store.AddReference("ns1", "name1") + store.AddReference("ns2", "name2") + store.AddReference("ns1", "name1") + store.AddReference("ns1", "name1") + store.DeleteReference("ns1", "name1") + store.DeleteReference("ns2", "name2") + store.AddReference("ns3", "name3") // Adds don't issue Get requests. actions := fakeClient.Actions() @@ -87,7 +87,7 @@ func TestSecretStore(t *testing.T) { func TestSecretStoreDeletingSecret(t *testing.T) { fakeClient := &fake.Clientset{} store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) - store.Add("ns", "name") + store.AddReference("ns", "name") result := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}} fakeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { @@ -119,7 +119,7 @@ func TestSecretStoreGetAlwaysRefresh(t *testing.T) { store := newSecretStore(fakeClient, fakeClock, noObjectTTL, 0) for i := 0; i < 10; i++ { - store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) } fakeClient.ClearActions() @@ -146,7 +146,7 @@ func TestSecretStoreGetNeverRefresh(t *testing.T) { store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) for i := 0; i < 10; i++ { - store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) } fakeClient.ClearActions() @@ -175,7 +175,7 @@ func TestCustomTTL(t *testing.T) { fakeClock := clock.NewFakeClock(time.Time{}) store := newSecretStore(fakeClient, fakeClock, customTTL, time.Minute) - store.Add("ns", "name") + store.AddReference("ns", "name") store.Get("ns", "name") fakeClient.ClearActions() @@ -345,10 +345,7 @@ func TestCacheInvalidation(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := clock.NewFakeClock(time.Now()) store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) - manager := &cachingSecretManager{ - secretStore: store, - registeredPods: make(map[objectKey]*v1.Pod), - } + manager := newCacheBasedSecretManager(store) // Create a pod with some secrets. s1 := secretsToAttach{ @@ -403,10 +400,7 @@ func TestCacheRefcounts(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := clock.NewFakeClock(time.Now()) store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) - manager := &cachingSecretManager{ - secretStore: store, - registeredPods: make(map[objectKey]*v1.Pod), - } + manager := newCacheBasedSecretManager(store) s1 := secretsToAttach{ imagePullSecretNames: []string{"s1"}, @@ -490,10 +484,7 @@ func TestCacheRefcounts(t *testing.T) { func TestCachingSecretManager(t *testing.T) { fakeClient := &fake.Clientset{} secretStore := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) - manager := &cachingSecretManager{ - secretStore: secretStore, - registeredPods: make(map[objectKey]*v1.Pod), - } + manager := newCacheBasedSecretManager(secretStore) // Create a pod with some secrets. s1 := secretsToAttach{ diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index 3de230a9032..01def3296ea 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -17,27 +17,16 @@ limitations under the License. package secret import ( - "fmt" - "strconv" "sync" - "time" "k8s.io/api/core/v1" - storageetcd "k8s.io/apiserver/pkg/storage/etcd" clientset "k8s.io/client-go/kubernetes" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/kubelet/util" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" ) -const ( - defaultTTL = time.Minute -) - type Manager interface { // Get secret by secret namespace and name. GetSecret(namespace, name string) (*v1.Secret, error) @@ -53,6 +42,11 @@ type Manager interface { UnregisterPod(pod *v1.Pod) } +type objectKey struct { + namespace string + name string +} + // simpleSecretManager implements SecretManager interfaces with // simple operations to apiserver. type simpleSecretManager struct { @@ -73,190 +67,41 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) { func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) { } -type GetObjectTTLFunc func() (time.Duration, bool) - -type objectKey struct { - namespace string - name string +// store is the interface for a secrets cache that +// can be used by cacheBasedSecretManager. +type store interface { + // AddReference adds a reference to the secret to the store. + // Note that multiple additions to the store has to be allowed + // in the implementations and effectively treated as refcounted. + AddReference(namespace, name string) + // DeleteReference deletes reference to the secret from the store. + // Note that secret should be deleted only when there was a + // corresponding Delete call for each of Add calls (effectively + // when refcount was reduced to zero). + DeleteReference(namespace, name string) + // Get a secret from a store. + Get(namespace, name string) (*v1.Secret, error) } -// secretStoreItems is a single item stored in secretStore. -type secretStoreItem struct { - refCount int - secret *secretData -} - -type secretData struct { - sync.Mutex - - secret *v1.Secret - err error - lastUpdateTime time.Time -} - -// secretStore is a local cache of secrets. -type secretStore struct { - kubeClient clientset.Interface - clock clock.Clock - - lock sync.Mutex - items map[objectKey]*secretStoreItem - - defaultTTL time.Duration - getTTL GetObjectTTLFunc -} - -func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore { - return &secretStore{ - kubeClient: kubeClient, - clock: clock, - items: make(map[objectKey]*secretStoreItem), - defaultTTL: ttl, - getTTL: getTTL, - } -} - -func isSecretOlder(newSecret, oldSecret *v1.Secret) bool { - if newSecret == nil || oldSecret == nil { - return false - } - newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret) - oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret) - return newVersion < oldVersion -} - -func (s *secretStore) Add(namespace, name string) { - key := objectKey{namespace: namespace, name: name} - - // Add is called from RegisterPod, thus it needs to be efficient. - // Thus Add() is only increasing refCount and generation of a given secret. - // Then Get() is responsible for fetching if needed. - s.lock.Lock() - defer s.lock.Unlock() - item, exists := s.items[key] - if !exists { - item = &secretStoreItem{ - refCount: 0, - secret: &secretData{}, - } - s.items[key] = item - } - - item.refCount++ - // This will trigger fetch on the next Get() operation. - item.secret = nil -} - -func (s *secretStore) Delete(namespace, name string) { - key := objectKey{namespace: namespace, name: name} - - s.lock.Lock() - defer s.lock.Unlock() - if item, ok := s.items[key]; ok { - item.refCount-- - if item.refCount == 0 { - delete(s.items, key) - } - } -} - -func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { - return func() (time.Duration, bool) { - node, err := getNode() - if err != nil { - return time.Duration(0), false - } - if node != nil && node.Annotations != nil { - if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { - if intValue, err := strconv.Atoi(value); err == nil { - return time.Duration(intValue) * time.Second, true - } - } - } - return time.Duration(0), false - } -} - -func (s *secretStore) isSecretFresh(data *secretData) bool { - secretTTL := s.defaultTTL - if ttl, ok := s.getTTL(); ok { - secretTTL = ttl - } - return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL)) -} - -func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { - key := objectKey{namespace: namespace, name: name} - - data := func() *secretData { - s.lock.Lock() - defer s.lock.Unlock() - item, exists := s.items[key] - if !exists { - return nil - } - if item.secret == nil { - item.secret = &secretData{} - } - return item.secret - }() - if data == nil { - return nil, fmt.Errorf("secret %q/%q not registered", namespace, name) - } - - // After updating data in secretStore, lock the data, fetch secret if - // needed and return data. - data.Lock() - defer data.Unlock() - if data.err != nil || !s.isSecretFresh(data) { - opts := metav1.GetOptions{} - if data.secret != nil && data.err == nil { - // This is just a periodic refresh of a secret we successfully fetched previously. - // In this case, server data from apiserver cache to reduce the load on both - // etcd and apiserver (the cache is eventually consistent). - util.FromApiserverCache(&opts) - } - secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(name, opts) - if err != nil && !apierrors.IsNotFound(err) && data.secret == nil && data.err == nil { - // Couldn't fetch the latest secret, but there is no cached data to return. - // Return the fetch result instead. - return secret, err - } - if (err == nil && !isSecretOlder(secret, data.secret)) || apierrors.IsNotFound(err) { - // If the fetch succeeded with a newer version of the secret, or if the - // secret could not be found in the apiserver, update the cached data to - // reflect the current status. - data.secret = secret - data.err = err - data.lastUpdateTime = s.clock.Now() - } - } - return data.secret, data.err -} - -// cachingSecretManager keeps a cache of all secrets necessary for registered pods. -// It implements the following logic: -// - whenever a pod is created or updated, the cached versions of all its secrets -// are invalidated -// - every GetSecret() call tries to fetch the value from local cache; if it is -// not there, invalidated or too old, we fetch it from apiserver and refresh the -// value in cache; otherwise it is just fetched from cache -type cachingSecretManager struct { - secretStore *secretStore +// cachingBasedSecretManager keeps a store with secrets necessary +// for registered pods. Different implementations of the store +// may result in different semantics for freshness of secrets +// (e.g. ttl-based implementation vs watch-based implementation). +type cacheBasedSecretManager struct { + secretStore store lock sync.Mutex registeredPods map[objectKey]*v1.Pod } -func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager { - csm := &cachingSecretManager{ - secretStore: newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL), +func newCacheBasedSecretManager(secretStore store) Manager { + return &cacheBasedSecretManager{ + secretStore: secretStore, registeredPods: make(map[objectKey]*v1.Pod), } - return csm } -func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { +func (c *cacheBasedSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { return c.secretStore.Get(namespace, name) } @@ -269,12 +114,12 @@ func getSecretNames(pod *v1.Pod) sets.String { return result } -func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { +func (c *cacheBasedSecretManager) RegisterPod(pod *v1.Pod) { names := getSecretNames(pod) c.lock.Lock() defer c.lock.Unlock() for name := range names { - c.secretStore.Add(pod.Namespace, name) + c.secretStore.AddReference(pod.Namespace, name) } var prev *v1.Pod key := objectKey{namespace: pod.Namespace, name: pod.Name} @@ -287,12 +132,12 @@ func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { // names and prev need to have their ref counts decremented. Any that // are only in prev need to be completely removed. This unconditional // call takes care of both cases. - c.secretStore.Delete(prev.Namespace, name) + c.secretStore.DeleteReference(prev.Namespace, name) } } } -func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { +func (c *cacheBasedSecretManager) UnregisterPod(pod *v1.Pod) { var prev *v1.Pod key := objectKey{namespace: pod.Namespace, name: pod.Name} c.lock.Lock() @@ -301,7 +146,7 @@ func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { delete(c.registeredPods, key) if prev != nil { for name := range getSecretNames(prev) { - c.secretStore.Delete(prev.Namespace, name) + c.secretStore.DeleteReference(prev.Namespace, name) } } }