From 85ee9e570b40becd7b77668f193129510c55b551 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 17 Nov 2016 12:22:11 +0100 Subject: [PATCH 1/4] Create SecretManager interface --- pkg/kubelet/BUILD | 1 + pkg/kubelet/kubelet.go | 10 +++++++++ pkg/kubelet/kubelet_pods.go | 7 ++---- pkg/kubelet/secret_manager.go | 41 +++++++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 pkg/kubelet/secret_manager.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index ef058c2bc05..0136ee16540 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -29,6 +29,7 @@ go_library( "reason_cache.go", "runonce.go", "runtime.go", + "secret_manager.go", "util.go", "volume_host.go", ], diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 842235b5ff5..ecb7dbd3495 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -409,6 +409,12 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } containerRefManager := kubecontainer.NewRefManager() + // TODO: Create and use a more sophisticated secret mamanger. + secretManager, err := newSimpleSecretManager(kubeClient) + if err != nil { + return nil, fmt.Errorf("failed to initialize secret manager: %v", err) + } + oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder) klet := &Kubelet{ @@ -434,6 +440,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub recorder: kubeDeps.Recorder, cadvisor: kubeDeps.CAdvisorInterface, diskSpaceManager: diskSpaceManager, + secretManager: secretManager, cloud: kubeDeps.Cloud, autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider), nodeRef: nodeRef, @@ -913,6 +920,9 @@ type Kubelet struct { // Diskspace manager. diskSpaceManager diskSpaceManager + // Secret manager. + secretManager secretManager + // Cached MachineInfo returned by cadvisor. machineInfo *cadvisorapi.MachineInfo diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index da53ca6afb8..7ce3a24dffe 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -524,7 +524,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container if kl.kubeClient == nil { return result, fmt.Errorf("Couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name) } - secret, err = kl.kubeClient.Core().Secrets(pod.Namespace).Get(name, metav1.GetOptions{}) + secret, err = kl.secretManager.GetSecret(pod.Namespace, name) if err != nil { return result, err } @@ -638,14 +638,11 @@ func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error { // getPullSecretsForPod inspects the Pod and retrieves the referenced pull // secrets. -// TODO: duplicate secrets are being retrieved multiple times and there -// is no cache. Creating and using a secret manager interface will make this -// easier to address. func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) ([]v1.Secret, error) { pullSecrets := []v1.Secret{} for _, secretRef := range pod.Spec.ImagePullSecrets { - secret, err := kl.kubeClient.Core().Secrets(pod.Namespace).Get(secretRef.Name, metav1.GetOptions{}) + secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name) if err != nil { glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err) continue diff --git a/pkg/kubelet/secret_manager.go b/pkg/kubelet/secret_manager.go new file mode 100644 index 00000000000..644f6cf7010 --- /dev/null +++ b/pkg/kubelet/secret_manager.go @@ -0,0 +1,41 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "k8s.io/kubernetes/pkg/api" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" +) + +type secretManager interface { + // Get secret by secret namespace and name. + GetSecret(namespace, name string) (*api.Secret, error) +} + +// simpleSecretManager implements SecretManager interfaces with +// simple operations to apiserver. +type simpleSecretManager struct { + kubeClient clientset.Interface +} + +func newSimpleSecretManager(kubeClient clientset.Interface) (secretManager, error) { + return &simpleSecretManager{kubeClient: kubeClient}, nil +} + +func (s *simpleSecretManager) GetSecret(namespace, name string) (*api.Secret, error) { + return s.kubeClient.Core().Secrets(namespace).Get(name) +} From ffd8daf4888e09c29fb136cade47b2d97b4711d0 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 18 Nov 2016 13:14:03 +0100 Subject: [PATCH 2/4] SecretManager with caching --- pkg/kubelet/BUILD | 2 +- pkg/kubelet/kubelet.go | 6 +- pkg/kubelet/secret/BUILD | 39 ++++ pkg/kubelet/secret/secret_manager.go | 256 ++++++++++++++++++++++ pkg/kubelet/secret/secret_manager_test.go | 161 ++++++++++++++ pkg/kubelet/secret_manager.go | 41 ---- 6 files changed, 460 insertions(+), 45 deletions(-) create mode 100644 pkg/kubelet/secret/BUILD create mode 100644 pkg/kubelet/secret/secret_manager.go create mode 100644 pkg/kubelet/secret/secret_manager_test.go delete mode 100644 pkg/kubelet/secret_manager.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 0136ee16540..b9e2315fd2b 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -29,7 +29,6 @@ go_library( "reason_cache.go", "runonce.go", "runtime.go", - "secret_manager.go", "util.go", "volume_host.go", ], @@ -72,6 +71,7 @@ go_library( "//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/remote:go_default_library", "//pkg/kubelet/rkt:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/server:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ecb7dbd3495..55eee92e0d4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -73,6 +73,7 @@ import ( proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/remote" "k8s.io/kubernetes/pkg/kubelet/rkt" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/streaming" @@ -409,8 +410,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } containerRefManager := kubecontainer.NewRefManager() - // TODO: Create and use a more sophisticated secret mamanger. - secretManager, err := newSimpleSecretManager(kubeClient) + secretManager, err := secret.NewSimpleSecretManager(kubeClient) if err != nil { return nil, fmt.Errorf("failed to initialize secret manager: %v", err) } @@ -921,7 +921,7 @@ type Kubelet struct { diskSpaceManager diskSpaceManager // Secret manager. - secretManager secretManager + secretManager secret.Manager // Cached MachineInfo returned by cadvisor. machineInfo *cadvisorapi.MachineInfo diff --git a/pkg/kubelet/secret/BUILD b/pkg/kubelet/secret/BUILD new file mode 100644 index 00000000000..16aec480ad2 --- /dev/null +++ b/pkg/kubelet/secret/BUILD @@ -0,0 +1,39 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["secret_manager_test.go"], + library = "go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "fake_manager.go", + "secret_manager.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api/errors:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/apis/meta/v1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/storage/etcd:go_default_library", + "//pkg/util/sets:go_default_library", + "//pkg/util/wait:go_default_library", + "//vendor:github.com/golang/glog", + ], +) diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go new file mode 100644 index 00000000000..d6a08822041 --- /dev/null +++ b/pkg/kubelet/secret/secret_manager.go @@ -0,0 +1,256 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "sync" + "time" + + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/v1" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + storageetcd "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" + + "github.com/golang/glog" +) + +type Manager interface { + // Get secret by secret namespace and name. + GetSecret(namespace, name string) (*v1.Secret, error) + + // RegisterPod registers all secrets from a given pod. + RegisterPod(pod *v1.Pod) + + // UnregisterPod unregisters secrets from a given pod that are not + // registered still by any other registered pod. + UnregisterPod(pod *v1.Pod) +} + +// simpleSecretManager implements SecretManager interfaces with +// simple operations to apiserver. +type simpleSecretManager struct { + kubeClient clientset.Interface +} + +func NewSimpleSecretManager(kubeClient clientset.Interface) (Manager, error) { + return &simpleSecretManager{kubeClient: kubeClient}, nil +} + +func (s *simpleSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { + return s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{}) +} + +func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) { +} + +func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) { +} + +type objectKey struct { + namespace string + name string +} + +// secretStoreItems is a single item stored in secretStore. +type secretStoreItem struct { + secret *v1.Secret + err error + refCount int +} + +// secretStore is a local cache of secrets. +type secretStore struct { + kubeClient clientset.Interface + + lock sync.Mutex + items map[objectKey]*secretStoreItem +} + +func newSecretStore(kubeClient clientset.Interface) *secretStore { + return &secretStore{ + kubeClient: kubeClient, + items: make(map[objectKey]*secretStoreItem), + } +} + +func (s *secretStore) Add(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{}) + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.secret = secret + item.err = err + item.refCount++ + } else { + s.items[key] = &secretStoreItem{secret: secret, err: err, refCount: 1} + } +} + +func (s *secretStore) Delete(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.refCount-- + if item.refCount == 0 { + delete(s.items, key) + } + } +} + +func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + return item.secret, item.err + } + return nil, fmt.Errorf("secret not registered") +} + +func (s *secretStore) Refresh() { + s.lock.Lock() + keys := make([]objectKey, 0, len(s.items)) + for key := range s.items { + keys = append(keys, key) + } + s.lock.Unlock() + + type result struct { + secret *v1.Secret + err error + } + results := make([]result, 0, len(keys)) + for _, key := range keys { + secret, err := s.kubeClient.Core().Secrets(key.namespace).Get(key.name, metav1.GetOptions{}) + if err != nil { + glog.Warningf("Unable to retrieve a secret %s/%s: %v", key.namespace, key.name, err) + } + results = append(results, result{secret: secret, err: err}) + } + + s.lock.Lock() + defer s.lock.Unlock() + for i, key := range keys { + secret := results[i].secret + err := results[i].err + if err != nil && !apierrors.IsNotFound(err) { + // If we couldn't retrieve a secret and it wasn't 404 error, skip updating. + continue + } + if item, ok := s.items[key]; ok { + if secret != nil && item.secret != nil { + // If the fetched version is not newer than the current one (such races are + // possible), then skip update. + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(secret) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(item.secret) + if newVersion <= oldVersion { + continue + } + } + item.secret = secret + item.err = err + } + } +} + +// cachingSecretManager keeps a cache of all secrets necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the current versions of all its secrets +// are grabbed from apiserver and stored in local cache +// - every GetSecret call is served from local cache +// - every X seconds we are refreshing the local cache by grabbing current version +// of all registered secrets from apiserver +type cachingSecretManager struct { + secretStore *secretStore + + lock sync.Mutex + registeredPods map[objectKey]*v1.Pod +} + +func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) { + csm := &cachingSecretManager{ + secretStore: newSecretStore(kubeClient), + registeredPods: make(map[objectKey]*v1.Pod), + } + go wait.NonSlidingUntil(func() { csm.secretStore.Refresh() }, time.Minute, wait.NeverStop) + return csm, nil +} + +func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { + return c.secretStore.Get(namespace, name) +} + +// TODO: Before we will use secretManager in other places (e.g. for secret volumes) +// we should update this function to also get secrets from those places. +func getSecretNames(pod *v1.Pod) sets.String { + result := sets.NewString() + for _, reference := range pod.Spec.ImagePullSecrets { + result.Insert(reference.Name) + } + for i := range pod.Spec.Containers { + for _, envVar := range pod.Spec.Containers[i].Env { + if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil { + result.Insert(envVar.ValueFrom.SecretKeyRef.Name) + } + } + } + return result +} + +func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { + for key := range getSecretNames(pod) { + c.secretStore.Add(pod.Namespace, key) + } + var prev *v1.Pod + func() { + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + c.registeredPods[key] = pod + }() + if prev != nil { + for key := range getSecretNames(prev) { + c.secretStore.Delete(prev.Namespace, key) + } + } +} + +func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { + var prev *v1.Pod + func() { + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + delete(c.registeredPods, key) + }() + if prev != nil { + for key := range getSecretNames(prev) { + c.secretStore.Delete(prev.Namespace, key) + } + } +} diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/secret_manager_test.go new file mode 100644 index 00000000000..d4142e85031 --- /dev/null +++ b/pkg/kubelet/secret/secret_manager_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" +) + +func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) { + _, err := store.Get(ns, name) + if shouldExist && err != nil { + t.Errorf("unexpected actions: %#v", err) + } + if !shouldExist && (err == nil || !strings.Contains(err.Error(), "secret not registered")) { + t.Errorf("unexpected actions: %#v", err) + } +} + +func TestSecretStore(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newSecretStore(fakeClient) + store.Add("ns1", "name1") + store.Add("ns2", "name2") + store.Add("ns1", "name1") + store.Add("ns1", "name1") + store.Delete("ns1", "name1") + store.Delete("ns2", "name2") + store.Add("ns3", "name3") + + // We expect one Get action per Add. + actions := fakeClient.Actions() + if len(actions) != 5 { + t.Fatalf("unexpected actions: %#v", actions) + } + for _, a := range actions { + if !a.Matches("get", "secrets") { + t.Errorf("unexpected actions: %#v", a) + } + } + + checkSecret(t, store, "ns1", "name1", true) + checkSecret(t, store, "ns2", "name2", false) + checkSecret(t, store, "ns3", "name3", true) + checkSecret(t, store, "ns4", "name4", false) +} + +func TestSecretStoreRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newSecretStore(fakeClient) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + store.Refresh() + actions := fakeClient.Actions() + if len(actions) != 10 { + t.Fatalf("unexpected actions: %#v", actions) + } + for _, a := range actions { + if !a.Matches("get", "secrets") { + t.Errorf("unexpected actions: %#v", a) + } + } +} + +type secretsToAttach struct { + imagePullSecretNames []string + containerEnvSecretNames [][]string +} + +func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Spec: v1.PodSpec{}, + } + for _, name := range toAttach.imagePullSecretNames { + pod.Spec.ImagePullSecrets = append( + pod.Spec.ImagePullSecrets, v1.LocalObjectReference{Name: name}) + } + for i, names := range toAttach.containerEnvSecretNames { + container := v1.Container{ + Name: fmt.Sprintf("container-%d", i), + } + for _, name := range names { + envSource := &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource}) + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + return pod +} + +func TestCachingSecretManager(t *testing.T) { + fakeClient := &fake.Clientset{} + secretStore := newSecretStore(fakeClient) + manager := &cachingSecretManager{ + secretStore: secretStore, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some secrets. + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + // Update the pod with a different secrets. + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s3"}, {"s4"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s2)) + // Create another pod, but with same secrets in different namespace. + manager.RegisterPod(podWithSecrets("ns2", "name2", s2)) + // Create and delete a pod with some other secrets. + s3 := secretsToAttach{ + imagePullSecretNames: []string{"s5"}, + containerEnvSecretNames: [][]string{{"s6"}}, + } + manager.RegisterPod(podWithSecrets("ns3", "name", s3)) + manager.UnregisterPod(podWithSecrets("ns3", "name", s3)) + + // We should have only: s1, s3 and s4 secrets in namespaces: ns1 and ns2. + for _, ns := range []string{"ns1", "ns2", "ns3"} { + for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6"} { + shouldExist := + (secret == "s1" || secret == "s3" || secret == "s4") && (ns == "ns1" || ns == "ns2") + checkSecret(t, secretStore, ns, secret, shouldExist) + } + } +} diff --git a/pkg/kubelet/secret_manager.go b/pkg/kubelet/secret_manager.go deleted file mode 100644 index 644f6cf7010..00000000000 --- a/pkg/kubelet/secret_manager.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "k8s.io/kubernetes/pkg/api" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" -) - -type secretManager interface { - // Get secret by secret namespace and name. - GetSecret(namespace, name string) (*api.Secret, error) -} - -// simpleSecretManager implements SecretManager interfaces with -// simple operations to apiserver. -type simpleSecretManager struct { - kubeClient clientset.Interface -} - -func newSimpleSecretManager(kubeClient clientset.Interface) (secretManager, error) { - return &simpleSecretManager{kubeClient: kubeClient}, nil -} - -func (s *simpleSecretManager) GetSecret(namespace, name string) (*api.Secret, error) { - return s.kubeClient.Core().Secrets(namespace).Get(name) -} From 09e4de385c3ea5422ce4d696c7c84f7ea12bb3e9 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 13 Dec 2016 11:32:12 +0100 Subject: [PATCH 3/4] Enable nontrivial secret manager --- pkg/kubelet/BUILD | 1 + pkg/kubelet/kubelet.go | 3 +- pkg/kubelet/kubelet_test.go | 4 +- pkg/kubelet/pod/BUILD | 2 + pkg/kubelet/pod/pod_manager.go | 14 ++++++- pkg/kubelet/pod/pod_manager_test.go | 4 +- pkg/kubelet/prober/common_test.go | 2 +- pkg/kubelet/prober/worker_test.go | 2 +- pkg/kubelet/runonce_test.go | 4 +- pkg/kubelet/secret/fake_manager.go | 40 +++++++++++++++++++ pkg/kubelet/status/BUILD | 1 + pkg/kubelet/status/status_manager_test.go | 3 +- pkg/kubelet/volumemanager/BUILD | 1 + .../volumemanager/volume_manager_test.go | 5 ++- 14 files changed, 76 insertions(+), 10 deletions(-) create mode 100644 pkg/kubelet/secret/fake_manager.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index b9e2315fd2b..59e66ce4825 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -174,6 +174,7 @@ go_test( "//pkg/kubelet/pod/testing:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/prober/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/status:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 55eee92e0d4..04ee4604ceb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -505,7 +505,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub klet.livenessManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() - klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) + // podManager is also responsible for keeping secretManager contents up-to-date. + klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager) if kubeCfg.RemoteRuntimeEndpoint != "" { // kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index a7c1d6bb5c4..3c23784bb96 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -56,6 +56,7 @@ import ( podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -166,7 +167,8 @@ func newTestKubeletWithImageList( kubelet.cadvisor = mockCadvisor fakeMirrorClient := podtest.NewFakeMirrorClient() - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient) + fakeSecretManager := secret.NewFakeManager() + kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, fakeSecretManager) kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager) kubelet.containerRefManager = kubecontainer.NewRefManager() diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{}) diff --git a/pkg/kubelet/pod/BUILD b/pkg/kubelet/pod/BUILD index ea8ad683e4e..341ab74e8b2 100644 --- a/pkg/kubelet/pod/BUILD +++ b/pkg/kubelet/pod/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", @@ -38,6 +39,7 @@ go_test( "//pkg/api/v1:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index e02d09dcf5a..e2427065c93 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -22,6 +22,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/secret" + "k8s.io/kubernetes/pkg/types" ) // Manager stores and manages access to pods, maintaining the mappings @@ -112,13 +114,17 @@ type basicManager struct { // Mirror pod UID to pod UID map. translationByUID map[types.UID]types.UID + // basicManager is keeping secretManager up-to-date. + secretManager secret.Manager + // A mirror pod client to create/delete mirror pods. MirrorClient } // NewBasicPodManager returns a functional Manager. -func NewBasicPodManager(client MirrorClient) Manager { +func NewBasicPodManager(client MirrorClient, secretManager secret.Manager) Manager { pm := &basicManager{} + pm.secretManager = secretManager pm.MirrorClient = client pm.SetPods(nil) return pm @@ -153,6 +159,9 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { // lock. func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { for _, pod := range pods { + if pm.secretManager != nil { + pm.secretManager.RegisterPod(pod) + } podFullName := kubecontainer.GetPodFullName(pod) if IsMirrorPod(pod) { pm.mirrorPodByUID[pod.UID] = pod @@ -173,6 +182,9 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { func (pm *basicManager) DeletePod(pod *v1.Pod) { pm.lock.Lock() defer pm.lock.Unlock() + if pm.secretManager != nil { + pm.secretManager.UnregisterPod(pod) + } podFullName := kubecontainer.GetPodFullName(pod) if IsMirrorPod(pod) { delete(pm.mirrorPodByUID, pod.UID) diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 3d530a1f46c..f87a0836dc4 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -24,13 +24,15 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) // Stub out mirror client for testing purpose. func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() - manager := NewBasicPodManager(fakeMirrorClient).(*basicManager) + secretManager := secret.NewFakeManager() + manager := NewBasicPodManager(fakeMirrorClient, secretManager).(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index f5e69428f85..2dcc972c6ef 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -98,7 +98,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { func newTestManager() *manager { refManager := kubecontainer.NewRefManager() refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings. - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager(nil, nil) // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) m := NewManager( diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 41fe42e8961..590756a3249 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -117,7 +117,7 @@ func TestDoProbe(t *testing.T) { } // Clean up. - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil)) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil)) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 047b5a819a0..1fc0850b07a 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -39,6 +39,7 @@ import ( nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/volumemanager" @@ -59,7 +60,8 @@ func TestRunOnce(t *testing.T) { Usage: 9 * mb, Capacity: 10 * mb, }, nil) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager( + podtest.NewFakeMirrorClient(), secret.NewFakeManager()) diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{}) fakeRuntime := &containertest.FakeRuntime{} basePath, err := utiltesting.MkTmpdir("kubelet") diff --git a/pkg/kubelet/secret/fake_manager.go b/pkg/kubelet/secret/fake_manager.go new file mode 100644 index 00000000000..25f948f57eb --- /dev/null +++ b/pkg/kubelet/secret/fake_manager.go @@ -0,0 +1,40 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "k8s.io/kubernetes/pkg/api/v1" +) + +// fakeManager implements Manager interface for testing purposes. +// simple operations to apiserver. +type fakeManager struct { +} + +func NewFakeManager() Manager { + return &fakeManager{} +} + +func (s *fakeManager) GetSecret(namespace, name string) (*v1.Secret, error) { + return nil, nil +} + +func (s *fakeManager) RegisterPod(pod *v1.Pod) { +} + +func (s *fakeManager) UnregisterPod(pod *v1.Pod) { +} diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD index ef2f86fee44..b9df40b46b0 100644 --- a/pkg/kubelet/status/BUILD +++ b/pkg/kubelet/status/BUILD @@ -50,6 +50,7 @@ go_test( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:github.com/stretchr/testify/assert", "//vendor:k8s.io/apimachinery/pkg/api/errors", diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index a11290633d3..c4b5a2b7594 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -38,6 +38,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + kubesecret "k8s.io/kubernetes/pkg/kubelet/secret" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -71,7 +72,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager()) podManager.AddPod(getTestPod()) return NewManager(kubeClient, podManager).(*manager) } diff --git a/pkg/kubelet/volumemanager/BUILD b/pkg/kubelet/volumemanager/BUILD index 88f381e0a75..21232f8caa3 100644 --- a/pkg/kubelet/volumemanager/BUILD +++ b/pkg/kubelet/volumemanager/BUILD @@ -50,6 +50,7 @@ go_test( "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/testing:go_default_library", "//pkg/volume:go_default_library", diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index ce52675e5ec..d2c9a6a3fe3 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/util/mount" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/volume" @@ -52,7 +53,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager()) node, pod, pv, claim := createObjects() kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -97,7 +98,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager()) node, pod, _, claim := createObjects() From 3c0d2bb1f0259b6b826b2a12d4e76b2db1e619ed Mon Sep 17 00:00:00 2001 From: gmarek Date: Fri, 6 Jan 2017 20:47:43 +0100 Subject: [PATCH 4/4] Add SecretManager to Kubelet --- pkg/kubelet/BUILD | 1 + pkg/kubelet/pod/pod_manager.go | 3 +- pkg/kubelet/secret/BUILD | 27 ++- pkg/kubelet/secret/secret_manager.go | 184 +++++++++++---------- pkg/kubelet/secret/secret_manager_test.go | 190 +++++++++++++++++++--- 5 files changed, 289 insertions(+), 116 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 59e66ce4825..a2a9f0b60f3 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -247,6 +247,7 @@ filegroup( "//pkg/kubelet/remote:all-srcs", "//pkg/kubelet/rkt:all-srcs", "//pkg/kubelet/rktshim:all-srcs", + "//pkg/kubelet/secret:all-srcs", "//pkg/kubelet/server:all-srcs", "//pkg/kubelet/status:all-srcs", "//pkg/kubelet/sysctl:all-srcs", diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index e2427065c93..dd069b0fa3e 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -23,7 +23,6 @@ import ( "k8s.io/kubernetes/pkg/api/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/secret" - "k8s.io/kubernetes/pkg/types" ) // Manager stores and manages access to pods, maintaining the mappings @@ -160,6 +159,8 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { for _, pod := range pods { if pm.secretManager != nil { + // TODO: Consider detecting only status update and in such case do + // not register pod, as it doesn't really matter. pm.secretManager.RegisterPod(pod) } podFullName := kubecontainer.GetPodFullName(pod) diff --git a/pkg/kubelet/secret/BUILD b/pkg/kubelet/secret/BUILD index 16aec480ad2..fc2a99bc232 100644 --- a/pkg/kubelet/secret/BUILD +++ b/pkg/kubelet/secret/BUILD @@ -11,11 +11,14 @@ load( go_test( name = "go_default_test", srcs = ["secret_manager_test.go"], - library = "go_default_library", + library = ":go_default_library", tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//vendor:github.com/stretchr/testify/assert", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/client-go/pkg/util/clock", ], ) @@ -27,13 +30,25 @@ go_library( ], tags = ["automanaged"], deps = [ - "//pkg/api/errors:go_default_library", "//pkg/api/v1:go_default_library", - "//pkg/apis/meta/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/storage/etcd:go_default_library", - "//pkg/util/sets:go_default_library", - "//pkg/util/wait:go_default_library", - "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/util/sets", + "//vendor:k8s.io/client-go/pkg/util/clock", ], ) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index d6a08822041..eb9f8dec3c5 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -21,26 +21,28 @@ import ( "sync" "time" - apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/v1" - metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" storageetcd "k8s.io/kubernetes/pkg/storage/etcd" - "k8s.io/kubernetes/pkg/util/sets" - "k8s.io/kubernetes/pkg/util/wait" - "github.com/golang/glog" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/pkg/util/clock" ) type Manager interface { // Get secret by secret namespace and name. GetSecret(namespace, name string) (*v1.Secret, error) + // WARNING: Register/UnregisterPod functions should be efficient, + // i.e. should not block on network operations. + // RegisterPod registers all secrets from a given pod. RegisterPod(pod *v1.Pod) // UnregisterPod unregisters secrets from a given pod that are not - // registered still by any other registered pod. + // used by any other registered pod. UnregisterPod(pod *v1.Pod) } @@ -71,39 +73,63 @@ type objectKey struct { // secretStoreItems is a single item stored in secretStore. type secretStoreItem struct { - secret *v1.Secret - err error refCount int + secret *secretData +} + +type secretData struct { + sync.Mutex + + secret *v1.Secret + err error + lastUpdateTime time.Time } // secretStore is a local cache of secrets. type secretStore struct { kubeClient clientset.Interface + clock clock.Clock lock sync.Mutex items map[objectKey]*secretStoreItem + ttl time.Duration } -func newSecretStore(kubeClient clientset.Interface) *secretStore { +func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, ttl time.Duration) *secretStore { return &secretStore{ kubeClient: kubeClient, + clock: clock, items: make(map[objectKey]*secretStoreItem), + ttl: ttl, } } +func isSecretOlder(newSecret, oldSecret *v1.Secret) bool { + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret) + return newVersion < oldVersion +} + func (s *secretStore) Add(namespace, name string) { key := objectKey{namespace: namespace, name: name} - secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{}) + // Add is called from RegisterPod, thus it needs to be efficient. + // Thus Add() is only increasing refCount and generation of a given secret. + // Then Get() is responsible for fetching if needed. s.lock.Lock() defer s.lock.Unlock() - if item, ok := s.items[key]; ok { - item.secret = secret - item.err = err - item.refCount++ - } else { - s.items[key] = &secretStoreItem{secret: secret, err: err, refCount: 1} + item, exists := s.items[key] + if !exists { + item = &secretStoreItem{ + refCount: 0, + secret: &secretData{}, + } + s.items[key] = item } + + item.refCount++ + // This will trigger fetch on the next Get() operation. + item.secret = nil } func (s *secretStore) Delete(namespace, name string) { @@ -122,67 +148,51 @@ func (s *secretStore) Delete(namespace, name string) { func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { key := objectKey{namespace: namespace, name: name} - s.lock.Lock() - defer s.lock.Unlock() - if item, ok := s.items[key]; ok { - return item.secret, item.err - } - return nil, fmt.Errorf("secret not registered") -} - -func (s *secretStore) Refresh() { - s.lock.Lock() - keys := make([]objectKey, 0, len(s.items)) - for key := range s.items { - keys = append(keys, key) - } - s.lock.Unlock() - - type result struct { - secret *v1.Secret - err error - } - results := make([]result, 0, len(keys)) - for _, key := range keys { - secret, err := s.kubeClient.Core().Secrets(key.namespace).Get(key.name, metav1.GetOptions{}) - if err != nil { - glog.Warningf("Unable to retrieve a secret %s/%s: %v", key.namespace, key.name, err) + data := func() *secretData { + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + return nil } - results = append(results, result{secret: secret, err: err}) + if item.secret == nil { + item.secret = &secretData{} + } + return item.secret + }() + if data == nil { + return nil, fmt.Errorf("secret %q/%q not registered", namespace, name) } - s.lock.Lock() - defer s.lock.Unlock() - for i, key := range keys { - secret := results[i].secret - err := results[i].err - if err != nil && !apierrors.IsNotFound(err) { - // If we couldn't retrieve a secret and it wasn't 404 error, skip updating. - continue - } - if item, ok := s.items[key]; ok { - if secret != nil && item.secret != nil { - // If the fetched version is not newer than the current one (such races are - // possible), then skip update. - newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(secret) - oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(item.secret) - if newVersion <= oldVersion { - continue - } + // After updating data in secretStore, lock the data, fetch secret if + // needed and return data. + data.Lock() + defer data.Unlock() + if data.err != nil || !s.clock.Now().Before(data.lastUpdateTime.Add(s.ttl)) { + secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{}) + // Update state, unless we got error different than "not-found". + if err == nil || apierrors.IsNotFound(err) { + // Ignore the update to the older version of a secret. + if data.secret == nil || secret == nil || !isSecretOlder(secret, data.secret) { + data.secret = secret + data.err = err + data.lastUpdateTime = s.clock.Now() } - item.secret = secret - item.err = err + } else if data.secret == nil && data.err == nil { + // We have unitialized secretData - return current result. + return secret, err } } + return data.secret, data.err } // cachingSecretManager keeps a cache of all secrets necessary for registered pods. // It implements the following logic: -// - whenever a pod is created or updated, the current versions of all its secrets -// are grabbed from apiserver and stored in local cache -// - every GetSecret call is served from local cache -// - every X seconds we are refreshing the local cache by grabbing current version -// of all registered secrets from apiserver +// - whenever a pod is created or updated, the cached versions of all its secrets +// are invalidated +// - every GetSecret() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache type cachingSecretManager struct { secretStore *secretStore @@ -192,10 +202,9 @@ type cachingSecretManager struct { func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) { csm := &cachingSecretManager{ - secretStore: newSecretStore(kubeClient), + secretStore: newSecretStore(kubeClient, clock.RealClock{}, time.Minute), registeredPods: make(map[objectKey]*v1.Pod), } - go wait.NonSlidingUntil(func() { csm.secretStore.Refresh() }, time.Minute, wait.NeverStop) return csm, nil } @@ -221,36 +230,33 @@ func getSecretNames(pod *v1.Pod) sets.String { } func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { - for key := range getSecretNames(pod) { - c.secretStore.Add(pod.Namespace, key) + names := getSecretNames(pod) + c.lock.Lock() + defer c.lock.Unlock() + for name := range names { + c.secretStore.Add(pod.Namespace, name) } var prev *v1.Pod - func() { - key := objectKey{namespace: pod.Namespace, name: pod.Name} - c.lock.Lock() - defer c.lock.Unlock() - prev = c.registeredPods[key] - c.registeredPods[key] = pod - }() + key := objectKey{namespace: pod.Namespace, name: pod.Name} + prev = c.registeredPods[key] + c.registeredPods[key] = pod if prev != nil { - for key := range getSecretNames(prev) { - c.secretStore.Delete(prev.Namespace, key) + for name := range getSecretNames(prev) { + c.secretStore.Delete(prev.Namespace, name) } } } func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { var prev *v1.Pod - func() { - key := objectKey{namespace: pod.Namespace, name: pod.Name} - c.lock.Lock() - defer c.lock.Unlock() - prev = c.registeredPods[key] - delete(c.registeredPods, key) - }() + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + delete(c.registeredPods, key) if prev != nil { - for key := range getSecretNames(prev) { - c.secretStore.Delete(prev.Namespace, key) + for name := range getSecretNames(prev) { + c.secretStore.Delete(prev.Namespace, name) } } } diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/secret_manager_test.go index d4142e85031..ad0b2985359 100644 --- a/pkg/kubelet/secret/secret_manager_test.go +++ b/pkg/kubelet/secret/secret_manager_test.go @@ -19,10 +19,17 @@ package secret import ( "fmt" "strings" + "sync" "testing" + "time" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/pkg/util/clock" + + "github.com/stretchr/testify/assert" ) func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) { @@ -30,14 +37,14 @@ func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist if shouldExist && err != nil { t.Errorf("unexpected actions: %#v", err) } - if !shouldExist && (err == nil || !strings.Contains(err.Error(), "secret not registered")) { + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("secret %q/%q not registered", ns, name))) { t.Errorf("unexpected actions: %#v", err) } } func TestSecretStore(t *testing.T) { fakeClient := &fake.Clientset{} - store := newSecretStore(fakeClient) + store := newSecretStore(fakeClient, clock.RealClock{}, 0) store.Add("ns1", "name1") store.Add("ns2", "name2") store.Add("ns1", "name1") @@ -46,15 +53,21 @@ func TestSecretStore(t *testing.T) { store.Delete("ns2", "name2") store.Add("ns3", "name3") - // We expect one Get action per Add. + // Adds don't issue Get requests. actions := fakeClient.Actions() - if len(actions) != 5 { - t.Fatalf("unexpected actions: %#v", actions) - } + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Should issue Get request + store.Get("ns1", "name1") + // Shouldn't issue Get request, as secret is not registered + store.Get("ns2", "name2") + // Should issue Get request + store.Get("ns3", "name3") + + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + for _, a := range actions { - if !a.Matches("get", "secrets") { - t.Errorf("unexpected actions: %#v", a) - } + assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) } checkSecret(t, store, "ns1", "name1", true) @@ -63,27 +76,57 @@ func TestSecretStore(t *testing.T) { checkSecret(t, store, "ns4", "name4", false) } -func TestSecretStoreRefresh(t *testing.T) { +func TestSecretStoreGetAlwaysRefresh(t *testing.T) { fakeClient := &fake.Clientset{} - store := newSecretStore(fakeClient) + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, 0) for i := 0; i < 10; i++ { store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) } fakeClient.ClearActions() - store.Refresh() + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() actions := fakeClient.Actions() - if len(actions) != 10 { - t.Fatalf("unexpected actions: %#v", actions) - } + assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions) + for _, a := range actions { - if !a.Matches("get", "secrets") { - t.Errorf("unexpected actions: %#v", a) - } + assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) } } +func TestSecretStoreGetNeverRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + // Only first Get, should forward the Get request. + assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions) +} + type secretsToAttach struct { imagePullSecretNames []string containerEnvSecretNames [][]string @@ -91,7 +134,7 @@ type secretsToAttach struct { func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod { pod := &v1.Pod{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Namespace: ns, Name: name, }, @@ -120,9 +163,116 @@ func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod { return pod } +func TestCacheInvalidation(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + manager := &cachingSecretManager{ + secretStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some secrets. + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + // Fetch both secrets - this should triggger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s2") + actions := fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Update a pod with a new secret. + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s2)) + // All secrets should be invalidated - this should trigger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s2") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Create a new pod that is refencing the first two secrets - those should + // be invalidated. + manager.RegisterPod(podWithSecrets("ns1", "name2", s1)) + store.Get("ns1", "s1") + store.Get("ns1", "s2") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() +} + +func TestCacheRefcounts(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + manager := &cachingSecretManager{ + secretStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + manager.RegisterPod(podWithSecrets("ns1", "name2", s1)) + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s2"}, + containerEnvSecretNames: [][]string{{"s4"}, {"s5"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name2", s2)) + manager.RegisterPod(podWithSecrets("ns1", "name3", s2)) + manager.RegisterPod(podWithSecrets("ns1", "name4", s2)) + manager.UnregisterPod(podWithSecrets("ns1", "name3", s2)) + s3 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s3"}, {"s5"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name5", s3)) + manager.RegisterPod(podWithSecrets("ns1", "name6", s3)) + s4 := secretsToAttach{ + imagePullSecretNames: []string{"s3"}, + containerEnvSecretNames: [][]string{{"s6"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name7", s4)) + manager.UnregisterPod(podWithSecrets("ns1", "name7", s4)) + + // Also check the Add + Update + Remove scenario. + manager.RegisterPod(podWithSecrets("ns1", "other-name", s1)) + manager.RegisterPod(podWithSecrets("ns1", "other-name", s2)) + manager.UnregisterPod(podWithSecrets("ns1", "other-name", s2)) + + // Now we have: 1 pod with s1, 2 pods with s2 and 2 pods with s3, 0 pods with s4. + verify := func(ns, name string, count int) bool { + store.lock.Lock() + defer store.lock.Unlock() + item, ok := store.items[objectKey{ns, name}] + if !ok { + return count == 0 + } + return item.refCount == count + } + assert.True(t, verify("ns1", "s1", 3)) + assert.True(t, verify("ns1", "s2", 3)) + assert.True(t, verify("ns1", "s3", 3)) + assert.True(t, verify("ns1", "s4", 2)) + assert.True(t, verify("ns1", "s5", 4)) + assert.True(t, verify("ns1", "s6", 0)) + assert.True(t, verify("ns1", "s7", 0)) +} + func TestCachingSecretManager(t *testing.T) { fakeClient := &fake.Clientset{} - secretStore := newSecretStore(fakeClient) + secretStore := newSecretStore(fakeClient, clock.RealClock{}, 0) manager := &cachingSecretManager{ secretStore: secretStore, registeredPods: make(map[objectKey]*v1.Pod),