diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index ef058c2bc05..a2a9f0b60f3 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -71,6 +71,7 @@ go_library( "//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/remote:go_default_library", "//pkg/kubelet/rkt:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/server:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", @@ -173,6 +174,7 @@ go_test( "//pkg/kubelet/pod/testing:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/prober/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/status:go_default_library", @@ -245,6 +247,7 @@ filegroup( "//pkg/kubelet/remote:all-srcs", "//pkg/kubelet/rkt:all-srcs", "//pkg/kubelet/rktshim:all-srcs", + "//pkg/kubelet/secret:all-srcs", "//pkg/kubelet/server:all-srcs", "//pkg/kubelet/status:all-srcs", "//pkg/kubelet/sysctl:all-srcs", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 842235b5ff5..04ee4604ceb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -73,6 +73,7 @@ import ( proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/remote" "k8s.io/kubernetes/pkg/kubelet/rkt" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/streaming" @@ -409,6 +410,11 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } containerRefManager := kubecontainer.NewRefManager() + secretManager, err := secret.NewSimpleSecretManager(kubeClient) + if err != nil { + return nil, fmt.Errorf("failed to initialize secret manager: %v", err) + } + oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder) klet := &Kubelet{ @@ -434,6 +440,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub recorder: kubeDeps.Recorder, cadvisor: kubeDeps.CAdvisorInterface, diskSpaceManager: diskSpaceManager, + secretManager: secretManager, cloud: kubeDeps.Cloud, autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider), nodeRef: nodeRef, @@ -498,7 +505,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub klet.livenessManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() - klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) + // podManager is also responsible for keeping secretManager contents up-to-date. + klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager) if kubeCfg.RemoteRuntimeEndpoint != "" { // kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified @@ -913,6 +921,9 @@ type Kubelet struct { // Diskspace manager. diskSpaceManager diskSpaceManager + // Secret manager. + secretManager secret.Manager + // Cached MachineInfo returned by cadvisor. machineInfo *cadvisorapi.MachineInfo diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index da53ca6afb8..7ce3a24dffe 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -524,7 +524,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container if kl.kubeClient == nil { return result, fmt.Errorf("Couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name) } - secret, err = kl.kubeClient.Core().Secrets(pod.Namespace).Get(name, metav1.GetOptions{}) + secret, err = kl.secretManager.GetSecret(pod.Namespace, name) if err != nil { return result, err } @@ -638,14 +638,11 @@ func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error { // getPullSecretsForPod inspects the Pod and retrieves the referenced pull // secrets. -// TODO: duplicate secrets are being retrieved multiple times and there -// is no cache. Creating and using a secret manager interface will make this -// easier to address. func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) ([]v1.Secret, error) { pullSecrets := []v1.Secret{} for _, secretRef := range pod.Spec.ImagePullSecrets { - secret, err := kl.kubeClient.Core().Secrets(pod.Namespace).Get(secretRef.Name, metav1.GetOptions{}) + secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name) if err != nil { glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err) continue diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index a7c1d6bb5c4..3c23784bb96 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -56,6 +56,7 @@ import ( podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -166,7 +167,8 @@ func newTestKubeletWithImageList( kubelet.cadvisor = mockCadvisor fakeMirrorClient := podtest.NewFakeMirrorClient() - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient) + fakeSecretManager := secret.NewFakeManager() + kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, fakeSecretManager) kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager) kubelet.containerRefManager = kubecontainer.NewRefManager() diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{}) diff --git a/pkg/kubelet/pod/BUILD b/pkg/kubelet/pod/BUILD index ea8ad683e4e..341ab74e8b2 100644 --- a/pkg/kubelet/pod/BUILD +++ b/pkg/kubelet/pod/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", @@ -38,6 +39,7 @@ go_test( "//pkg/api/v1:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index e02d09dcf5a..dd069b0fa3e 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/secret" ) // Manager stores and manages access to pods, maintaining the mappings @@ -112,13 +113,17 @@ type basicManager struct { // Mirror pod UID to pod UID map. translationByUID map[types.UID]types.UID + // basicManager is keeping secretManager up-to-date. + secretManager secret.Manager + // A mirror pod client to create/delete mirror pods. MirrorClient } // NewBasicPodManager returns a functional Manager. -func NewBasicPodManager(client MirrorClient) Manager { +func NewBasicPodManager(client MirrorClient, secretManager secret.Manager) Manager { pm := &basicManager{} + pm.secretManager = secretManager pm.MirrorClient = client pm.SetPods(nil) return pm @@ -153,6 +158,11 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { // lock. func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { for _, pod := range pods { + if pm.secretManager != nil { + // TODO: Consider detecting only status update and in such case do + // not register pod, as it doesn't really matter. + pm.secretManager.RegisterPod(pod) + } podFullName := kubecontainer.GetPodFullName(pod) if IsMirrorPod(pod) { pm.mirrorPodByUID[pod.UID] = pod @@ -173,6 +183,9 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { func (pm *basicManager) DeletePod(pod *v1.Pod) { pm.lock.Lock() defer pm.lock.Unlock() + if pm.secretManager != nil { + pm.secretManager.UnregisterPod(pod) + } podFullName := kubecontainer.GetPodFullName(pod) if IsMirrorPod(pod) { delete(pm.mirrorPodByUID, pod.UID) diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 3d530a1f46c..f87a0836dc4 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -24,13 +24,15 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) // Stub out mirror client for testing purpose. func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() - manager := NewBasicPodManager(fakeMirrorClient).(*basicManager) + secretManager := secret.NewFakeManager() + manager := NewBasicPodManager(fakeMirrorClient, secretManager).(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index f5e69428f85..2dcc972c6ef 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -98,7 +98,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { func newTestManager() *manager { refManager := kubecontainer.NewRefManager() refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings. - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager(nil, nil) // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) m := NewManager( diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 41fe42e8961..590756a3249 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -117,7 +117,7 @@ func TestDoProbe(t *testing.T) { } // Clean up. - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil)) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil)) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 047b5a819a0..1fc0850b07a 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -39,6 +39,7 @@ import ( nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/volumemanager" @@ -59,7 +60,8 @@ func TestRunOnce(t *testing.T) { Usage: 9 * mb, Capacity: 10 * mb, }, nil) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager( + podtest.NewFakeMirrorClient(), secret.NewFakeManager()) diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{}) fakeRuntime := &containertest.FakeRuntime{} basePath, err := utiltesting.MkTmpdir("kubelet") diff --git a/pkg/kubelet/secret/BUILD b/pkg/kubelet/secret/BUILD new file mode 100644 index 00000000000..fc2a99bc232 --- /dev/null +++ b/pkg/kubelet/secret/BUILD @@ -0,0 +1,54 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["secret_manager_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//vendor:github.com/stretchr/testify/assert", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/client-go/pkg/util/clock", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "fake_manager.go", + "secret_manager.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/storage/etcd:go_default_library", + "//vendor:k8s.io/apimachinery/pkg/api/errors", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/util/sets", + "//vendor:k8s.io/client-go/pkg/util/clock", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/secret/fake_manager.go b/pkg/kubelet/secret/fake_manager.go new file mode 100644 index 00000000000..25f948f57eb --- /dev/null +++ b/pkg/kubelet/secret/fake_manager.go @@ -0,0 +1,40 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "k8s.io/kubernetes/pkg/api/v1" +) + +// fakeManager implements Manager interface for testing purposes. +// simple operations to apiserver. +type fakeManager struct { +} + +func NewFakeManager() Manager { + return &fakeManager{} +} + +func (s *fakeManager) GetSecret(namespace, name string) (*v1.Secret, error) { + return nil, nil +} + +func (s *fakeManager) RegisterPod(pod *v1.Pod) { +} + +func (s *fakeManager) UnregisterPod(pod *v1.Pod) { +} diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go new file mode 100644 index 00000000000..eb9f8dec3c5 --- /dev/null +++ b/pkg/kubelet/secret/secret_manager.go @@ -0,0 +1,262 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "sync" + "time" + + "k8s.io/kubernetes/pkg/api/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + storageetcd "k8s.io/kubernetes/pkg/storage/etcd" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/pkg/util/clock" +) + +type Manager interface { + // Get secret by secret namespace and name. + GetSecret(namespace, name string) (*v1.Secret, error) + + // WARNING: Register/UnregisterPod functions should be efficient, + // i.e. should not block on network operations. + + // RegisterPod registers all secrets from a given pod. + RegisterPod(pod *v1.Pod) + + // UnregisterPod unregisters secrets from a given pod that are not + // used by any other registered pod. + UnregisterPod(pod *v1.Pod) +} + +// simpleSecretManager implements SecretManager interfaces with +// simple operations to apiserver. +type simpleSecretManager struct { + kubeClient clientset.Interface +} + +func NewSimpleSecretManager(kubeClient clientset.Interface) (Manager, error) { + return &simpleSecretManager{kubeClient: kubeClient}, nil +} + +func (s *simpleSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { + return s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{}) +} + +func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) { +} + +func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) { +} + +type objectKey struct { + namespace string + name string +} + +// secretStoreItems is a single item stored in secretStore. +type secretStoreItem struct { + refCount int + secret *secretData +} + +type secretData struct { + sync.Mutex + + secret *v1.Secret + err error + lastUpdateTime time.Time +} + +// secretStore is a local cache of secrets. +type secretStore struct { + kubeClient clientset.Interface + clock clock.Clock + + lock sync.Mutex + items map[objectKey]*secretStoreItem + ttl time.Duration +} + +func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, ttl time.Duration) *secretStore { + return &secretStore{ + kubeClient: kubeClient, + clock: clock, + items: make(map[objectKey]*secretStoreItem), + ttl: ttl, + } +} + +func isSecretOlder(newSecret, oldSecret *v1.Secret) bool { + newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret) + oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret) + return newVersion < oldVersion +} + +func (s *secretStore) Add(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + // Add is called from RegisterPod, thus it needs to be efficient. + // Thus Add() is only increasing refCount and generation of a given secret. + // Then Get() is responsible for fetching if needed. + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + item = &secretStoreItem{ + refCount: 0, + secret: &secretData{}, + } + s.items[key] = item + } + + item.refCount++ + // This will trigger fetch on the next Get() operation. + item.secret = nil +} + +func (s *secretStore) Delete(namespace, name string) { + key := objectKey{namespace: namespace, name: name} + + s.lock.Lock() + defer s.lock.Unlock() + if item, ok := s.items[key]; ok { + item.refCount-- + if item.refCount == 0 { + delete(s.items, key) + } + } +} + +func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) { + key := objectKey{namespace: namespace, name: name} + + data := func() *secretData { + s.lock.Lock() + defer s.lock.Unlock() + item, exists := s.items[key] + if !exists { + return nil + } + if item.secret == nil { + item.secret = &secretData{} + } + return item.secret + }() + if data == nil { + return nil, fmt.Errorf("secret %q/%q not registered", namespace, name) + } + + // After updating data in secretStore, lock the data, fetch secret if + // needed and return data. + data.Lock() + defer data.Unlock() + if data.err != nil || !s.clock.Now().Before(data.lastUpdateTime.Add(s.ttl)) { + secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{}) + // Update state, unless we got error different than "not-found". + if err == nil || apierrors.IsNotFound(err) { + // Ignore the update to the older version of a secret. + if data.secret == nil || secret == nil || !isSecretOlder(secret, data.secret) { + data.secret = secret + data.err = err + data.lastUpdateTime = s.clock.Now() + } + } else if data.secret == nil && data.err == nil { + // We have unitialized secretData - return current result. + return secret, err + } + } + return data.secret, data.err +} + +// cachingSecretManager keeps a cache of all secrets necessary for registered pods. +// It implements the following logic: +// - whenever a pod is created or updated, the cached versions of all its secrets +// are invalidated +// - every GetSecret() call tries to fetch the value from local cache; if it is +// not there, invalidated or too old, we fetch it from apiserver and refresh the +// value in cache; otherwise it is just fetched from cache +type cachingSecretManager struct { + secretStore *secretStore + + lock sync.Mutex + registeredPods map[objectKey]*v1.Pod +} + +func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) { + csm := &cachingSecretManager{ + secretStore: newSecretStore(kubeClient, clock.RealClock{}, time.Minute), + registeredPods: make(map[objectKey]*v1.Pod), + } + return csm, nil +} + +func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { + return c.secretStore.Get(namespace, name) +} + +// TODO: Before we will use secretManager in other places (e.g. for secret volumes) +// we should update this function to also get secrets from those places. +func getSecretNames(pod *v1.Pod) sets.String { + result := sets.NewString() + for _, reference := range pod.Spec.ImagePullSecrets { + result.Insert(reference.Name) + } + for i := range pod.Spec.Containers { + for _, envVar := range pod.Spec.Containers[i].Env { + if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil { + result.Insert(envVar.ValueFrom.SecretKeyRef.Name) + } + } + } + return result +} + +func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { + names := getSecretNames(pod) + c.lock.Lock() + defer c.lock.Unlock() + for name := range names { + c.secretStore.Add(pod.Namespace, name) + } + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + prev = c.registeredPods[key] + c.registeredPods[key] = pod + if prev != nil { + for name := range getSecretNames(prev) { + c.secretStore.Delete(prev.Namespace, name) + } + } +} + +func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { + var prev *v1.Pod + key := objectKey{namespace: pod.Namespace, name: pod.Name} + c.lock.Lock() + defer c.lock.Unlock() + prev = c.registeredPods[key] + delete(c.registeredPods, key) + if prev != nil { + for name := range getSecretNames(prev) { + c.secretStore.Delete(prev.Namespace, name) + } + } +} diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/secret_manager_test.go new file mode 100644 index 00000000000..ad0b2985359 --- /dev/null +++ b/pkg/kubelet/secret/secret_manager_test.go @@ -0,0 +1,311 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "strings" + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/pkg/util/clock" + + "github.com/stretchr/testify/assert" +) + +func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) { + _, err := store.Get(ns, name) + if shouldExist && err != nil { + t.Errorf("unexpected actions: %#v", err) + } + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("secret %q/%q not registered", ns, name))) { + t.Errorf("unexpected actions: %#v", err) + } +} + +func TestSecretStore(t *testing.T) { + fakeClient := &fake.Clientset{} + store := newSecretStore(fakeClient, clock.RealClock{}, 0) + store.Add("ns1", "name1") + store.Add("ns2", "name2") + store.Add("ns1", "name1") + store.Add("ns1", "name1") + store.Delete("ns1", "name1") + store.Delete("ns2", "name2") + store.Add("ns3", "name3") + + // Adds don't issue Get requests. + actions := fakeClient.Actions() + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Should issue Get request + store.Get("ns1", "name1") + // Shouldn't issue Get request, as secret is not registered + store.Get("ns2", "name2") + // Should issue Get request + store.Get("ns3", "name3") + + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + + for _, a := range actions { + assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) + } + + checkSecret(t, store, "ns1", "name1", true) + checkSecret(t, store, "ns2", "name2", false) + checkSecret(t, store, "ns3", "name3", true) + checkSecret(t, store, "ns4", "name4", false) +} + +func TestSecretStoreGetAlwaysRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, 0) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions) + + for _, a := range actions { + assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) + } +} + +func TestSecretStoreGetNeverRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + // Only first Get, should forward the Get request. + assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions) +} + +type secretsToAttach struct { + imagePullSecretNames []string + containerEnvSecretNames [][]string +} + +func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Spec: v1.PodSpec{}, + } + for _, name := range toAttach.imagePullSecretNames { + pod.Spec.ImagePullSecrets = append( + pod.Spec.ImagePullSecrets, v1.LocalObjectReference{Name: name}) + } + for i, names := range toAttach.containerEnvSecretNames { + container := v1.Container{ + Name: fmt.Sprintf("container-%d", i), + } + for _, name := range names { + envSource := &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: name, + }, + }, + } + container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource}) + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + return pod +} + +func TestCacheInvalidation(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + manager := &cachingSecretManager{ + secretStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some secrets. + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + // Fetch both secrets - this should triggger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s2") + actions := fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Update a pod with a new secret. + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s2)) + // All secrets should be invalidated - this should trigger get operations. + store.Get("ns1", "s1") + store.Get("ns1", "s2") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() + + // Create a new pod that is refencing the first two secrets - those should + // be invalidated. + manager.RegisterPod(podWithSecrets("ns1", "name2", s1)) + store.Get("ns1", "s1") + store.Get("ns1", "s2") + store.Get("ns1", "s3") + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + fakeClient.ClearActions() +} + +func TestCacheRefcounts(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + manager := &cachingSecretManager{ + secretStore: store, + registeredPods: make(map[objectKey]*v1.Pod), + } + + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + manager.RegisterPod(podWithSecrets("ns1", "name2", s1)) + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s2"}, + containerEnvSecretNames: [][]string{{"s4"}, {"s5"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name2", s2)) + manager.RegisterPod(podWithSecrets("ns1", "name3", s2)) + manager.RegisterPod(podWithSecrets("ns1", "name4", s2)) + manager.UnregisterPod(podWithSecrets("ns1", "name3", s2)) + s3 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s3"}, {"s5"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name5", s3)) + manager.RegisterPod(podWithSecrets("ns1", "name6", s3)) + s4 := secretsToAttach{ + imagePullSecretNames: []string{"s3"}, + containerEnvSecretNames: [][]string{{"s6"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name7", s4)) + manager.UnregisterPod(podWithSecrets("ns1", "name7", s4)) + + // Also check the Add + Update + Remove scenario. + manager.RegisterPod(podWithSecrets("ns1", "other-name", s1)) + manager.RegisterPod(podWithSecrets("ns1", "other-name", s2)) + manager.UnregisterPod(podWithSecrets("ns1", "other-name", s2)) + + // Now we have: 1 pod with s1, 2 pods with s2 and 2 pods with s3, 0 pods with s4. + verify := func(ns, name string, count int) bool { + store.lock.Lock() + defer store.lock.Unlock() + item, ok := store.items[objectKey{ns, name}] + if !ok { + return count == 0 + } + return item.refCount == count + } + assert.True(t, verify("ns1", "s1", 3)) + assert.True(t, verify("ns1", "s2", 3)) + assert.True(t, verify("ns1", "s3", 3)) + assert.True(t, verify("ns1", "s4", 2)) + assert.True(t, verify("ns1", "s5", 4)) + assert.True(t, verify("ns1", "s6", 0)) + assert.True(t, verify("ns1", "s7", 0)) +} + +func TestCachingSecretManager(t *testing.T) { + fakeClient := &fake.Clientset{} + secretStore := newSecretStore(fakeClient, clock.RealClock{}, 0) + manager := &cachingSecretManager{ + secretStore: secretStore, + registeredPods: make(map[objectKey]*v1.Pod), + } + + // Create a pod with some secrets. + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s1"}, {"s2"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s1)) + // Update the pod with a different secrets. + s2 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecretNames: [][]string{{"s3"}, {"s4"}}, + } + manager.RegisterPod(podWithSecrets("ns1", "name1", s2)) + // Create another pod, but with same secrets in different namespace. + manager.RegisterPod(podWithSecrets("ns2", "name2", s2)) + // Create and delete a pod with some other secrets. + s3 := secretsToAttach{ + imagePullSecretNames: []string{"s5"}, + containerEnvSecretNames: [][]string{{"s6"}}, + } + manager.RegisterPod(podWithSecrets("ns3", "name", s3)) + manager.UnregisterPod(podWithSecrets("ns3", "name", s3)) + + // We should have only: s1, s3 and s4 secrets in namespaces: ns1 and ns2. + for _, ns := range []string{"ns1", "ns2", "ns3"} { + for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6"} { + shouldExist := + (secret == "s1" || secret == "s3" || secret == "s4") && (ns == "ns1" || ns == "ns2") + checkSecret(t, secretStore, ns, secret, shouldExist) + } + } +} diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD index ef2f86fee44..b9df40b46b0 100644 --- a/pkg/kubelet/status/BUILD +++ b/pkg/kubelet/status/BUILD @@ -50,6 +50,7 @@ go_test( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:github.com/stretchr/testify/assert", "//vendor:k8s.io/apimachinery/pkg/api/errors", diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index a11290633d3..c4b5a2b7594 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -38,6 +38,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + kubesecret "k8s.io/kubernetes/pkg/kubelet/secret" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -71,7 +72,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager()) podManager.AddPod(getTestPod()) return NewManager(kubeClient, podManager).(*manager) } diff --git a/pkg/kubelet/volumemanager/BUILD b/pkg/kubelet/volumemanager/BUILD index 88f381e0a75..21232f8caa3 100644 --- a/pkg/kubelet/volumemanager/BUILD +++ b/pkg/kubelet/volumemanager/BUILD @@ -50,6 +50,7 @@ go_test( "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", + "//pkg/kubelet/secret:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/testing:go_default_library", "//pkg/volume:go_default_library", diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index ce52675e5ec..d2c9a6a3fe3 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/util/mount" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/volume" @@ -52,7 +53,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager()) node, pod, pv, claim := createObjects() kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -97,7 +98,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager()) node, pod, _, claim := createObjects()