diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 6454c48d8c8..b4d6d166fa9 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -83,6 +83,7 @@ go_library( "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util:go_default_library", "//pkg/kubelet/util/format:go_default_library", + "//pkg/kubelet/util/manager:go_default_library", "//pkg/kubelet/util/queue:go_default_library", "//pkg/kubelet/util/sliceutils:go_default_library", "//pkg/kubelet/volumemanager:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2bae0115356..fb0f5321726 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -92,6 +92,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/sysctl" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/kubelet/util/manager" "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/kubelet/volumemanager" @@ -542,7 +543,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } secretManager := secret.NewCachingSecretManager( - kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode)) + kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode)) klet.secretManager = secretManager configMapManager := configmap.NewCachingConfigMapManager( diff --git a/pkg/kubelet/secret/BUILD b/pkg/kubelet/secret/BUILD index 84632b98b47..4b39334d0bb 100644 --- a/pkg/kubelet/secret/BUILD +++ b/pkg/kubelet/secret/BUILD @@ -8,37 +8,34 @@ load( go_test( name = "go_default_test", - srcs = ["caching_secret_manager_test.go"], + srcs = ["secret_manager_test.go"], embed = [":go_default_library"], deps = [ - "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//pkg/kubelet/util/manager:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", ], ) go_library( name = "go_default_library", srcs = [ - "caching_secret_manager.go", "fake_manager.go", "secret_manager.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/secret", deps = [ "//pkg/api/v1/pod:go_default_library", - "//pkg/kubelet/util:go_default_library", + "//pkg/kubelet/util/manager:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/kubelet/secret/caching_secret_manager.go b/pkg/kubelet/secret/caching_secret_manager.go deleted file mode 100644 index 45756ad2ff0..00000000000 --- a/pkg/kubelet/secret/caching_secret_manager.go +++ /dev/null @@ -1,206 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package secret - -import ( - "fmt" - "strconv" - "sync" - "time" - - "k8s.io/api/core/v1" - storageetcd "k8s.io/apiserver/pkg/storage/etcd" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/kubelet/util" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/clock" -) - -const ( - defaultTTL = time.Minute -) - -type GetObjectTTLFunc func() (time.Duration, bool) - -// secretStoreItems is a single item stored in secretStore. -type secretStoreItem struct { - refCount int - secret *secretData -} - -type secretData struct { - sync.Mutex - - secret *v1.Secret - err error - lastUpdateTime time.Time -} - -// secretStore is a local cache of secrets. -type secretStore struct { - kubeClient clientset.Interface - clock clock.Clock - - lock sync.Mutex - items map[objectKey]*secretStoreItem - - defaultTTL time.Duration - getTTL GetObjectTTLFunc -} - -func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore { - return &secretStore{ - kubeClient: kubeClient, - clock: clock, - items: make(map[objectKey]*secretStoreItem), - defaultTTL: ttl, - getTTL: getTTL, - } -} - -func isSecretOlder(newSecret, oldSecret *v1.Secret) bool { - if newSecret == nil || oldSecret == nil { - return false - } - newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret) - oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret) - return newVersion < oldVersion -} - -func (s *secretStore) AddReference(namespace, name string) { - key := objectKey{namespace: namespace, name: name} - - // AddReference is called from RegisterPod, thus it needs to be efficient. - // Thus Add() is only increasing refCount and generation of a given secret. - // Then Get() is responsible for fetching if needed. - s.lock.Lock() - defer s.lock.Unlock() - item, exists := s.items[key] - if !exists { - item = &secretStoreItem{ - refCount: 0, - secret: &secretData{}, - } - s.items[key] = item - } - - item.refCount++ - // This will trigger fetch on the next Get() operation. - item.secret = nil -} - -func (s *secretStore) DeleteReference(namespace, name string) { - key := objectKey{namespace: namespace, name: name} - - s.lock.Lock() - defer s.lock.Unlock() - if item, ok := s.items[key]; ok { - item.refCount-- - if item.refCount == 0 { - delete(s.items, key) - } - } -} - -func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { - return func() (time.Duration, bool) { - node, err := getNode() - if err != nil { - return time.Duration(0), false - } - if node != nil && node.Annotations != nil { - if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { - if intValue, err := strconv.Atoi(value); err == nil { - return time.Duration(intValue) * time.Second, true - } - } - } - return time.Duration(0), false - } -} - -func (s *secretStore) isSecretFresh(data *secretData) bool { - secretTTL := s.defaultTTL - if ttl, ok := s.getTTL(); ok { - secretTTL = ttl - } - return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL)) -} - -func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { - key := objectKey{namespace: namespace, name: name} - - data := func() *secretData { - s.lock.Lock() - defer s.lock.Unlock() - item, exists := s.items[key] - if !exists { - return nil - } - if item.secret == nil { - item.secret = &secretData{} - } - return item.secret - }() - if data == nil { - return nil, fmt.Errorf("secret %q/%q not registered", namespace, name) - } - - // After updating data in secretStore, lock the data, fetch secret if - // needed and return data. - data.Lock() - defer data.Unlock() - if data.err != nil || !s.isSecretFresh(data) { - opts := metav1.GetOptions{} - if data.secret != nil && data.err == nil { - // This is just a periodic refresh of a secret we successfully fetched previously. - // In this case, server data from apiserver cache to reduce the load on both - // etcd and apiserver (the cache is eventually consistent). - util.FromApiserverCache(&opts) - } - secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(name, opts) - if err != nil && !apierrors.IsNotFound(err) && data.secret == nil && data.err == nil { - // Couldn't fetch the latest secret, but there is no cached data to return. - // Return the fetch result instead. - return secret, err - } - if (err == nil && !isSecretOlder(secret, data.secret)) || apierrors.IsNotFound(err) { - // If the fetch succeeded with a newer version of the secret, or if the - // secret could not be found in the apiserver, update the cached data to - // reflect the current status. - data.secret = secret - data.err = err - data.lastUpdateTime = s.clock.Now() - } - } - return data.secret, data.err -} - -// NewCachingSecretManager creates a manager that keeps a cache of all secrets -// necessary for registered pods. -// It implements the following logic: -// - whenever a pod is created or updated, the cached versions of all its secrets -// are invalidated -// - every GetSecret() call tries to fetch the value from local cache; if it is -// not there, invalidated or too old, we fetch it from apiserver and refresh the -// value in cache; otherwise it is just fetched from cache -func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager { - secretStore := newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL) - return newCacheBasedSecretManager(secretStore) -} diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index 01def3296ea..d59260447fd 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -17,13 +17,17 @@ limitations under the License. package secret import ( - "sync" + "fmt" + "time" "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/kubelet/util/manager" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" ) @@ -42,11 +46,6 @@ type Manager interface { UnregisterPod(pod *v1.Pod) } -type objectKey struct { - namespace string - name string -} - // simpleSecretManager implements SecretManager interfaces with // simple operations to apiserver. type simpleSecretManager struct { @@ -67,42 +66,31 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) { func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) { } -// store is the interface for a secrets cache that -// can be used by cacheBasedSecretManager. -type store interface { - // AddReference adds a reference to the secret to the store. - // Note that multiple additions to the store has to be allowed - // in the implementations and effectively treated as refcounted. - AddReference(namespace, name string) - // DeleteReference deletes reference to the secret from the store. - // Note that secret should be deleted only when there was a - // corresponding Delete call for each of Add calls (effectively - // when refcount was reduced to zero). - DeleteReference(namespace, name string) - // Get a secret from a store. - Get(namespace, name string) (*v1.Secret, error) -} - -// cachingBasedSecretManager keeps a store with secrets necessary +// cachingSecretManager keeps a store with secrets necessary // for registered pods. Different implementations of the store // may result in different semantics for freshness of secrets // (e.g. ttl-based implementation vs watch-based implementation). -type cacheBasedSecretManager struct { - secretStore store - - lock sync.Mutex - registeredPods map[objectKey]*v1.Pod +type cachingSecretManager struct { + manager manager.Manager } -func newCacheBasedSecretManager(secretStore store) Manager { - return &cacheBasedSecretManager{ - secretStore: secretStore, - registeredPods: make(map[objectKey]*v1.Pod), +func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { + object, err := c.manager.GetObject(namespace, name) + if err != nil { + return nil, err } + if secret, ok := object.(*v1.Secret); ok { + return secret, nil + } + return nil, fmt.Errorf("unexpected object type: %v", object) } -func (c *cacheBasedSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { - return c.secretStore.Get(namespace, name) +func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { + c.manager.RegisterPod(pod) +} + +func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { + c.manager.UnregisterPod(pod) } func getSecretNames(pod *v1.Pod) sets.String { @@ -114,39 +102,24 @@ func getSecretNames(pod *v1.Pod) sets.String { return result } -func (c *cacheBasedSecretManager) RegisterPod(pod *v1.Pod) { - names := getSecretNames(pod) - c.lock.Lock() - defer c.lock.Unlock() - for name := range names { - c.secretStore.AddReference(pod.Namespace, name) - } - var prev *v1.Pod - key := objectKey{namespace: pod.Namespace, name: pod.Name} - prev = c.registeredPods[key] - c.registeredPods[key] = pod - if prev != nil { - for name := range getSecretNames(prev) { - // On an update, the .Add() call above will have re-incremented the - // ref count of any existing secrets, so any secrets that are in both - // names and prev need to have their ref counts decremented. Any that - // are only in prev need to be completely removed. This unconditional - // call takes care of both cases. - c.secretStore.DeleteReference(prev.Namespace, name) - } - } -} +const ( + defaultTTL = time.Minute +) -func (c *cacheBasedSecretManager) UnregisterPod(pod *v1.Pod) { - var prev *v1.Pod - key := objectKey{namespace: pod.Namespace, name: pod.Name} - c.lock.Lock() - defer c.lock.Unlock() - prev = c.registeredPods[key] - delete(c.registeredPods, key) - if prev != nil { - for name := range getSecretNames(prev) { - c.secretStore.DeleteReference(prev.Namespace, name) - } +// NewCacheBasedManager creates a manager that keeps a cache of all secrets +// necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the cached versions of all secrets +// are invalidated +// - every GetObject() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache +func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager { + getSecret := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { + return kubeClient.CoreV1().Secrets(namespace).Get(name, opts) + } + secretStore := manager.NewObjectStore(getSecret, clock.RealClock{}, getTTL, defaultTTL) + return &cachingSecretManager{ + manager: manager.NewCacheBasedManager(secretStore, getSecretNames), } } diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/secret_manager_test.go new file mode 100644 index 00000000000..4295246935f --- /dev/null +++ b/pkg/kubelet/secret/secret_manager_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "strings" + "testing" + "time" + + "k8s.io/api/core/v1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/kubelet/util/manager" +) + +func checkObject(t *testing.T, store manager.Store, ns, name string, shouldExist bool) { + _, err := store.Get(ns, name) + if shouldExist && err != nil { + t.Errorf("unexpected actions: %#v", err) + } + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("object %q/%q not registered", ns, name))) { + t.Errorf("unexpected actions: %#v", err) + } +} + +func noObjectTTL() (time.Duration, bool) { + return time.Duration(0), false +} + +func getSecret(fakeClient clientset.Interface) manager.GetObjectFunc { + return func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { + return fakeClient.CoreV1().Secrets(namespace).Get(name, opts) + } +} + +type envSecrets struct { + envVarNames []string + envFromNames []string +} + +type secretsToAttach struct { + imagePullSecretNames []string + containerEnvSecrets []envSecrets +} + +func podWithSecrets(ns, podName string, toAttach secretsToAttach) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: podName, + }, + Spec: v1.PodSpec{}, + } + for _, name := range toAttach.imagePullSecretNames { + pod.Spec.ImagePullSecrets = append( + pod.Spec.ImagePullSecrets, v1.LocalObjectReference{Name: name}) + } + for i, secrets := range toAttach.containerEnvSecrets { + container := v1.Container{ + Name: fmt.Sprintf("container-%d", i), + } + for _, name := range secrets.envFromNames { + envFrom := v1.EnvFromSource{ + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.EnvFrom = append(container.EnvFrom, envFrom) + } + + for _, name := range secrets.envVarNames { + envSource := &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource}) + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + return pod +} + +func TestCacheBasedSecretManager(t *testing.T) { + fakeClient := &fake.Clientset{} + store := manager.NewObjectStore(getSecret(fakeClient), clock.RealClock{}, noObjectTTL, 0) + manager := &cachingSecretManager{ + manager: manager.NewCacheBasedManager(store, getSecretNames), + } + + // Create a pod with some secrets. + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecrets: []envSecrets{ + {envVarNames: []string{"s1"}}, + {envVarNames: []string{"s2"}}, + {envFromNames: []string{"s20"}}, + }, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + // Update the pod with a different secrets. + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecrets: []envSecrets{ + {envVarNames: []string{"s3"}}, + {envVarNames: []string{"s4"}}, + {envFromNames: []string{"s40"}}, + }, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s2)) + // Create another pod, but with same secrets in different namespace. + manager.RegisterPod(podWithSecrets("ns2", "name2", s2)) + // Create and delete a pod with some other secrets. + s3 := secretsToAttach{ + imagePullSecretNames: []string{"s5"}, + containerEnvSecrets: []envSecrets{ + {envVarNames: []string{"s6"}}, + {envFromNames: []string{"s60"}}, + }, + } + manager.RegisterPod(podWithSecrets("ns3", "name", s3)) + manager.UnregisterPod(podWithSecrets("ns3", "name", s3)) + + // We should have only: s1, s3 and s4 secrets in namespaces: ns1 and ns2. + for _, ns := range []string{"ns1", "ns2", "ns3"} { + for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} { + shouldExist := + (secret == "s1" || secret == "s3" || secret == "s4" || secret == "s40") && (ns == "ns1" || ns == "ns2") + checkObject(t, store, ns, secret, shouldExist) + } + } +} diff --git a/pkg/kubelet/util/BUILD b/pkg/kubelet/util/BUILD index e76e870bb34..df02ebdcd25 100644 --- a/pkg/kubelet/util/BUILD +++ b/pkg/kubelet/util/BUILD @@ -92,6 +92,7 @@ filegroup( "//pkg/kubelet/util/cache:all-srcs", "//pkg/kubelet/util/format:all-srcs", "//pkg/kubelet/util/ioutils:all-srcs", + "//pkg/kubelet/util/manager:all-srcs", "//pkg/kubelet/util/queue:all-srcs", "//pkg/kubelet/util/sliceutils:all-srcs", "//pkg/kubelet/util/store:all-srcs", diff --git a/pkg/kubelet/util/manager/BUILD b/pkg/kubelet/util/manager/BUILD new file mode 100644 index 00000000000..60005033ded --- /dev/null +++ b/pkg/kubelet/util/manager/BUILD @@ -0,0 +1,54 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "cache_based_manager.go", + "manager.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/util:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["cache_based_manager_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/api/v1/pod:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/util/manager/cache_based_manager.go b/pkg/kubelet/util/manager/cache_based_manager.go new file mode 100644 index 00000000000..f8c1100f47b --- /dev/null +++ b/pkg/kubelet/util/manager/cache_based_manager.go @@ -0,0 +1,272 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "fmt" + "strconv" + "sync" + "time" + + "k8s.io/api/core/v1" + storageetcd "k8s.io/apiserver/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/kubelet/util" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" +) + +// GetObjectTTLFunc defines a function to get value of TTL. +type GetObjectTTLFunc func() (time.Duration, bool) + +// GetObjectFunc defines a function to get object with a given namespace and name. +type GetObjectFunc func(string, string, metav1.GetOptions) (runtime.Object, error) + +type objectKey struct { + namespace string + name string +} + +// objectStoreItems is a single item stored in objectStore. +type objectStoreItem struct { + refCount int + data *objectData +} + +type objectData struct { + sync.Mutex + + object runtime.Object + err error + lastUpdateTime time.Time +} + +// objectStore is a local cache of objects. +type objectStore struct { + getObject GetObjectFunc + clock clock.Clock + + lock sync.Mutex + items map[objectKey]*objectStoreItem + + defaultTTL time.Duration + getTTL GetObjectTTLFunc +} + +// NewObjectStore returns a new ttl-based instance of Store interface. +func NewObjectStore(getObject GetObjectFunc, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) Store { + return &objectStore{ + getObject: getObject, + clock: clock, + items: make(map[objectKey]*objectStoreItem), + defaultTTL: ttl, + getTTL: getTTL, + } +} + +func isObjectOlder(newObject, oldObject runtime.Object) bool { + if newObject == nil || oldObject == nil { + return false + } + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newObject) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldObject) + return newVersion < oldVersion +} + +func (s *objectStore) AddReference(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + // AddReference is called from RegisterPod, thus it needs to be efficient. + // Thus Add() is only increasing refCount and generation of a given object. + // Then Get() is responsible for fetching if needed. + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + item = &objectStoreItem{ + refCount: 0, + data: &objectData{}, + } + s.items[key] = item + } + + item.refCount++ + // This will trigger fetch on the next Get() operation. + item.data = nil +} + +func (s *objectStore) DeleteReference(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.refCount-- + if item.refCount == 0 { + delete(s.items, key) + } + } +} + +// GetObjectTTLFromNodeFunc returns a function that returns TTL value +// from a given Node object. +func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { + return func() (time.Duration, bool) { + node, err := getNode() + if err != nil { + return time.Duration(0), false + } + if node != nil && node.Annotations != nil { + if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { + if intValue, err := strconv.Atoi(value); err == nil { + return time.Duration(intValue) * time.Second, true + } + } + } + return time.Duration(0), false + } +} + +func (s *objectStore) isObjectFresh(data *objectData) bool { + objectTTL := s.defaultTTL + if ttl, ok := s.getTTL(); ok { + objectTTL = ttl + } + return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL)) +} + +func (s *objectStore) Get(namespace, name string) (runtime.Object, error) { + key := objectKey{namespace: namespace, name: name} + + data := func() *objectData { + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + return nil + } + if item.data == nil { + item.data = &objectData{} + } + return item.data + }() + if data == nil { + return nil, fmt.Errorf("object %q/%q not registered", namespace, name) + } + + // After updating data in objectStore, lock the data, fetch object if + // needed and return data. + data.Lock() + defer data.Unlock() + if data.err != nil || !s.isObjectFresh(data) { + opts := metav1.GetOptions{} + if data.object != nil && data.err == nil { + // This is just a periodic refresh of an object we successfully fetched previously. + // In this case, server data from apiserver cache to reduce the load on both + // etcd and apiserver (the cache is eventually consistent). + util.FromApiserverCache(&opts) + } + + object, err := s.getObject(namespace, name, opts) + if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil { + // Couldn't fetch the latest object, but there is no cached data to return. + // Return the fetch result instead. + return object, err + } + if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) { + // If the fetch succeeded with a newer version of the object, or if the + // object could not be found in the apiserver, update the cached data to + // reflect the current status. + data.object = object + data.err = err + data.lastUpdateTime = s.clock.Now() + } + } + return data.object, data.err +} + +// cacheBasedManager keeps a store with objects necessary +// for registered pods. Different implementations of the store +// may result in different semantics for freshness of objects +// (e.g. ttl-based implementation vs watch-based implementation). +type cacheBasedManager struct { + objectStore Store + getReferencedObjects func(*v1.Pod) sets.String + + lock sync.Mutex + registeredPods map[objectKey]*v1.Pod +} + +func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) { + return c.objectStore.Get(namespace, name) +} + +func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) { + names := c.getReferencedObjects(pod) + c.lock.Lock() + defer c.lock.Unlock() + for name := range names { + c.objectStore.AddReference(pod.Namespace, name) + } + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + prev = c.registeredPods[key] + c.registeredPods[key] = pod + if prev != nil { + for name := range c.getReferencedObjects(prev) { + // On an update, the .Add() call above will have re-incremented the + // ref count of any existing object, so any objects that are in both + // names and prev need to have their ref counts decremented. Any that + // are only in prev need to be completely removed. This unconditional + // call takes care of both cases. + c.objectStore.DeleteReference(prev.Namespace, name) + } + } +} + +func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) { + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + delete(c.registeredPods, key) + if prev != nil { + for name := range c.getReferencedObjects(prev) { + c.objectStore.DeleteReference(prev.Namespace, name) + } + } +} + +// NewCacheBasedManager creates a manager that keeps a cache of all objects +// necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the cached versions of all objects +// is is referencing are invalidated +// - every GetObject() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache +func NewCacheBasedManager(objectStore Store, getReferencedObjects func(*v1.Pod) sets.String) Manager { + return &cacheBasedManager{ + objectStore: objectStore, + getReferencedObjects: getReferencedObjects, + registeredPods: make(map[objectKey]*v1.Pod), + } +} diff --git a/pkg/kubelet/secret/caching_secret_manager_test.go b/pkg/kubelet/util/manager/cache_based_manager_test.go similarity index 90% rename from pkg/kubelet/secret/caching_secret_manager_test.go rename to pkg/kubelet/util/manager/cache_based_manager_test.go index 6d53043e29c..c21584df50a 100644 --- a/pkg/kubelet/secret/caching_secret_manager_test.go +++ b/pkg/kubelet/util/manager/cache_based_manager_test.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package secret +package manager import ( "fmt" @@ -25,23 +25,27 @@ import ( "time" "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes/fake" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" + + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "github.com/stretchr/testify/assert" ) -func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) { +func checkObject(t *testing.T, store *objectStore, ns, name string, shouldExist bool) { _, err := store.Get(ns, name) if shouldExist && err != nil { t.Errorf("unexpected actions: %#v", err) } - if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("secret %q/%q not registered", ns, name))) { + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("object %q/%q not registered", ns, name))) { t.Errorf("unexpected actions: %#v", err) } } @@ -50,6 +54,35 @@ func noObjectTTL() (time.Duration, bool) { return time.Duration(0), false } +func getSecret(fakeClient clientset.Interface) GetObjectFunc { + return func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { + return fakeClient.CoreV1().Secrets(namespace).Get(name, opts) + } +} + +func newSecretStore(fakeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *objectStore { + return &objectStore{ + getObject: getSecret(fakeClient), + clock: clock, + items: make(map[objectKey]*objectStoreItem), + defaultTTL: ttl, + getTTL: getTTL, + } +} + +func getSecretNames(pod *v1.Pod) sets.String { + result := sets.NewString() + podutil.VisitPodSecretNames(pod, func(name string) bool { + result.Insert(name) + return true + }) + return result +} + +func newCacheBasedSecretManager(store Store) Manager { + return NewCacheBasedManager(store, getSecretNames) +} + func TestSecretStore(t *testing.T) { fakeClient := &fake.Clientset{} store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) @@ -78,10 +111,10 @@ func TestSecretStore(t *testing.T) { assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) } - checkSecret(t, store, "ns1", "name1", true) - checkSecret(t, store, "ns2", "name2", false) - checkSecret(t, store, "ns3", "name3", true) - checkSecret(t, store, "ns4", "name4", false) + checkObject(t, store, "ns1", "name1", true) + checkObject(t, store, "ns2", "name2", false) + checkObject(t, store, "ns3", "name3", true) + checkObject(t, store, "ns4", "name4", false) } func TestSecretStoreDeletingSecret(t *testing.T) { @@ -481,10 +514,10 @@ func TestCacheRefcounts(t *testing.T) { assert.Equal(t, 1, refs("ns1", "s70")) } -func TestCachingSecretManager(t *testing.T) { +func TestCacheBasedSecretManager(t *testing.T) { fakeClient := &fake.Clientset{} - secretStore := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) - manager := newCacheBasedSecretManager(secretStore) + store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) + manager := newCacheBasedSecretManager(store) // Create a pod with some secrets. s1 := secretsToAttach{ @@ -524,7 +557,7 @@ func TestCachingSecretManager(t *testing.T) { for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} { shouldExist := (secret == "s1" || secret == "s3" || secret == "s4" || secret == "s40") && (ns == "ns1" || ns == "ns2") - checkSecret(t, secretStore, ns, secret, shouldExist) + checkObject(t, store, ns, secret, shouldExist) } } } diff --git a/pkg/kubelet/util/manager/manager.go b/pkg/kubelet/util/manager/manager.go new file mode 100644 index 00000000000..4d4b958d0a6 --- /dev/null +++ b/pkg/kubelet/util/manager/manager.go @@ -0,0 +1,56 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// Manager is the interface for registering and unregistering +// objects referenced by pods in the underlying cache and +// extracting those from that cache if needed. +type Manager interface { + // Get object by its namespace and name. + GetObject(namespace, name string) (runtime.Object, error) + + // WARNING: Register/UnregisterPod functions should be efficient, + // i.e. should not block on network operations. + + // RegisterPod registers all objects referenced from a given pod. + RegisterPod(pod *v1.Pod) + + // UnregisterPod unregisters objects referenced from a given pod that are not + // used by any other registered pod. + UnregisterPod(pod *v1.Pod) +} + +// Store is the interface for a object cache that +// can be used by cacheBasedManager. +type Store interface { + // AddReference adds a reference to the object to the store. + // Note that multiple additions to the store has to be allowed + // in the implementations and effectively treated as refcounted. + AddReference(namespace, name string) + // DeleteReference deletes reference to the object from the store. + // Note that object should be deleted only when there was a + // corresponding Delete call for each of Add calls (effectively + // when refcount was reduced to zero). + DeleteReference(namespace, name string) + // Get an object from a store. + Get(namespace, name string) (runtime.Object, error) +}