diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD
index 59e66ce4825..a2a9f0b60f3 100644
--- a/pkg/kubelet/BUILD
+++ b/pkg/kubelet/BUILD
@@ -247,6 +247,7 @@ filegroup(
         "//pkg/kubelet/remote:all-srcs",
         "//pkg/kubelet/rkt:all-srcs",
         "//pkg/kubelet/rktshim:all-srcs",
+        "//pkg/kubelet/secret:all-srcs",
         "//pkg/kubelet/server:all-srcs",
         "//pkg/kubelet/status:all-srcs",
         "//pkg/kubelet/sysctl:all-srcs",
diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go
index e2427065c93..dd069b0fa3e 100644
--- a/pkg/kubelet/pod/pod_manager.go
+++ b/pkg/kubelet/pod/pod_manager.go
@@ -23,7 +23,6 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/secret"
-	"k8s.io/kubernetes/pkg/types"
 )
 
 // Manager stores and manages access to pods, maintaining the mappings
@@ -160,6 +159,8 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
 func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
 	for _, pod := range pods {
 		if pm.secretManager != nil {
+			// TODO: Consider detecting status-only updates and not re-registering
+			// the pod in that case, as it doesn't really matter.
 			pm.secretManager.RegisterPod(pod)
 		}
 		podFullName := kubecontainer.GetPodFullName(pod)
diff --git a/pkg/kubelet/secret/BUILD b/pkg/kubelet/secret/BUILD
index 16aec480ad2..fc2a99bc232 100644
--- a/pkg/kubelet/secret/BUILD
+++ b/pkg/kubelet/secret/BUILD
@@ -11,11 +11,14 @@ load(
 go_test(
     name = "go_default_test",
     srcs = ["secret_manager_test.go"],
-    library = "go_default_library",
+    library = ":go_default_library",
     tags = ["automanaged"],
     deps = [
         "//pkg/api/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset/fake:go_default_library",
+        "//vendor:github.com/stretchr/testify/assert",
+        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
+        "//vendor:k8s.io/client-go/pkg/util/clock",
     ],
 )
 
@@ -27,13 +30,25 @@ go_library(
     ],
     tags = ["automanaged"],
     deps = [
-        "//pkg/api/errors:go_default_library",
         "//pkg/api/v1:go_default_library",
-        "//pkg/apis/meta/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
         "//pkg/storage/etcd:go_default_library",
-        "//pkg/util/sets:go_default_library",
-        "//pkg/util/wait:go_default_library",
-        "//vendor:github.com/golang/glog",
+        "//vendor:k8s.io/apimachinery/pkg/api/errors",
+        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
+        "//vendor:k8s.io/apimachinery/pkg/util/sets",
+        "//vendor:k8s.io/client-go/pkg/util/clock",
     ],
 )
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go
index d6a08822041..eb9f8dec3c5 100644
--- a/pkg/kubelet/secret/secret_manager.go
+++ b/pkg/kubelet/secret/secret_manager.go
@@ -21,26 +21,28 @@ import (
 	"sync"
 	"time"
 
-	apierrors "k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/v1"
-	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	storageetcd "k8s.io/kubernetes/pkg/storage/etcd"
-	"k8s.io/kubernetes/pkg/util/sets"
-	"k8s.io/kubernetes/pkg/util/wait"
 
-	"github.com/golang/glog"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/pkg/util/clock"
 )
 
 type Manager interface {
 	// Get secret by secret namespace and name.
 	GetSecret(namespace, name string) (*v1.Secret, error)
 
+	// WARNING: Register/UnregisterPod functions should be efficient,
+	// i.e. should not block on network operations.
+
 	// RegisterPod registers all secrets from a given pod.
 	RegisterPod(pod *v1.Pod)
 
 	// UnregisterPod unregisters secrets from a given pod that are not
-	// registered still by any other registered pod.
+	// used by any other registered pod.
 	UnregisterPod(pod *v1.Pod)
 }
 
@@ -71,39 +73,63 @@ type objectKey struct {
 
 // secretStoreItems is a single item stored in secretStore.
 type secretStoreItem struct {
-	secret *v1.Secret
-	err    error
 	refCount int
+	secret   *secretData
+}
+
+type secretData struct {
+	sync.Mutex
+
+	secret         *v1.Secret
+	err            error
+	lastUpdateTime time.Time
 }
 
 // secretStore is a local cache of secrets.
 type secretStore struct {
 	kubeClient clientset.Interface
+	clock      clock.Clock
 
 	lock  sync.Mutex
 	items map[objectKey]*secretStoreItem
+	ttl   time.Duration
 }
 
-func newSecretStore(kubeClient clientset.Interface) *secretStore {
+func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, ttl time.Duration) *secretStore {
 	return &secretStore{
 		kubeClient: kubeClient,
+		clock:      clock,
 		items:      make(map[objectKey]*secretStoreItem),
+		ttl:        ttl,
 	}
}
 
+func isSecretOlder(newSecret, oldSecret *v1.Secret) bool {
+	newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret)
+	oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret)
+	return newVersion < oldVersion
+}
+
 func (s *secretStore) Add(namespace, name string) {
 	key := objectKey{namespace: namespace, name: name}
 
-	secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{})
+	// Add is called from RegisterPod, so it needs to be efficient.
+	// Thus, Add() only increments the refCount and invalidates the cached
+	// secret; Get() is then responsible for fetching it on demand.
 	s.lock.Lock()
 	defer s.lock.Unlock()
-	if item, ok := s.items[key]; ok {
-		item.secret = secret
-		item.err = err
-		item.refCount++
-	} else {
-		s.items[key] = &secretStoreItem{secret: secret, err: err, refCount: 1}
+	item, exists := s.items[key]
+	if !exists {
+		item = &secretStoreItem{
+			refCount: 0,
+			secret:   &secretData{},
+		}
+		s.items[key] = item
 	}
+
+	item.refCount++
+	// This will trigger a fetch on the next Get() operation.
+	item.secret = nil
 }
 
 func (s *secretStore) Delete(namespace, name string) {
@@ -122,67 +148,51 @@ func (s *secretStore) Delete(namespace, name string) {
 func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
 	key := objectKey{namespace: namespace, name: name}
 
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	if item, ok := s.items[key]; ok {
-		return item.secret, item.err
-	}
-	return nil, fmt.Errorf("secret not registered")
-}
-
-func (s *secretStore) Refresh() {
-	s.lock.Lock()
-	keys := make([]objectKey, 0, len(s.items))
-	for key := range s.items {
-		keys = append(keys, key)
-	}
-	s.lock.Unlock()
-
-	type result struct {
-		secret *v1.Secret
-		err    error
-	}
-	results := make([]result, 0, len(keys))
-	for _, key := range keys {
-		secret, err := s.kubeClient.Core().Secrets(key.namespace).Get(key.name, metav1.GetOptions{})
-		if err != nil {
-			glog.Warningf("Unable to retrieve a secret %s/%s: %v", key.namespace, key.name, err)
+	data := func() *secretData {
+		s.lock.Lock()
+		defer s.lock.Unlock()
+		item, exists := s.items[key]
+		if !exists {
+			return nil
 		}
-		results = append(results, result{secret: secret, err: err})
+		if item.secret == nil {
+			item.secret = &secretData{}
+		}
+		return item.secret
+	}()
+	if data == nil {
+		return nil, fmt.Errorf("secret %q/%q not registered", namespace, name)
 	}
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	for i, key := range keys {
-		secret := results[i].secret
-		err := results[i].err
-		if err != nil && !apierrors.IsNotFound(err) {
-			// If we couldn't retrieve a secret and it wasn't 404 error, skip updating.
-			continue
-		}
-		if item, ok := s.items[key]; ok {
-			if secret != nil && item.secret != nil {
-				// If the fetched version is not newer than the current one (such races are
-				// possible), then skip update.
-				newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(secret)
-				oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(item.secret)
-				if newVersion <= oldVersion {
-					continue
-				}
+	// With the store-level lock released, lock the data itself, fetch the
+	// secret if needed, and return the result.
+	data.Lock()
+	defer data.Unlock()
+	if data.err != nil || !s.clock.Now().Before(data.lastUpdateTime.Add(s.ttl)) {
+		secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{})
+		// Update the state, unless we got an error other than "not found".
+		if err == nil || apierrors.IsNotFound(err) {
+			// Ignore the update to the older version of a secret.
+			if data.secret == nil || secret == nil || !isSecretOlder(secret, data.secret) {
+				data.secret = secret
+				data.err = err
+				data.lastUpdateTime = s.clock.Now()
 			}
-			item.secret = secret
-			item.err = err
+		} else if data.secret == nil && data.err == nil {
+			// We have uninitialized secretData - return the current result.
+			return secret, err
 		}
 	}
+	return data.secret, data.err
 }
 
 // cachingSecretManager keeps a cache of all secrets necessary for registered pods.
 // It implements the following logic:
-// - whenever a pod is created or updated, the current versions of all its secrets
-//   are grabbed from apiserver and stored in local cache
-// - every GetSecret call is served from local cache
-// - every X seconds we are refreshing the local cache by grabbing current version
-//   of all registered secrets from apiserver
+// - whenever a pod is created or updated, the cached versions of all its secrets
+//   are invalidated
+// - every GetSecret() call tries to fetch the value from local cache; if it is
+//   not there, invalidated or too old, we fetch it from apiserver and refresh the
+//   value in cache; otherwise it is just fetched from cache
 type cachingSecretManager struct {
 	secretStore *secretStore
 
@@ -192,10 +202,9 @@ type cachingSecretManager struct {
 
 func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) {
 	csm := &cachingSecretManager{
-		secretStore:    newSecretStore(kubeClient),
+		secretStore:    newSecretStore(kubeClient, clock.RealClock{}, time.Minute),
 		registeredPods: make(map[objectKey]*v1.Pod),
 	}
-	go wait.NonSlidingUntil(func() { csm.secretStore.Refresh() }, time.Minute, wait.NeverStop)
 	return csm, nil
 }
 
@@ -221,36 +230,33 @@ func getSecretNames(pod *v1.Pod) sets.String {
 }
 
 func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
-	for key := range getSecretNames(pod) {
-		c.secretStore.Add(pod.Namespace, key)
+	names := getSecretNames(pod)
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	for name := range names {
+		c.secretStore.Add(pod.Namespace, name)
 	}
 	var prev *v1.Pod
-	func() {
-		key := objectKey{namespace: pod.Namespace, name: pod.Name}
-		c.lock.Lock()
-		defer c.lock.Unlock()
-		prev = c.registeredPods[key]
-		c.registeredPods[key] = pod
-	}()
+	key := objectKey{namespace: pod.Namespace, name: pod.Name}
+	prev = c.registeredPods[key]
+	c.registeredPods[key] = pod
 	if prev != nil {
-		for key := range getSecretNames(prev) {
-			c.secretStore.Delete(prev.Namespace, key)
+		for name := range getSecretNames(prev) {
+			c.secretStore.Delete(prev.Namespace, name)
 		}
 	}
 }
 
 func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
 	var prev *v1.Pod
-	func() {
-		key := objectKey{namespace: pod.Namespace, name: pod.Name}
-		c.lock.Lock()
-		defer c.lock.Unlock()
-		prev = c.registeredPods[key]
-		delete(c.registeredPods, key)
-	}()
+	key := objectKey{namespace: pod.Namespace, name: pod.Name}
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	prev = c.registeredPods[key]
+	delete(c.registeredPods, key)
 	if prev != nil {
-		for key := range getSecretNames(prev) {
-			c.secretStore.Delete(prev.Namespace, key)
+		for name := range getSecretNames(prev) {
+			c.secretStore.Delete(prev.Namespace, name)
 		}
 	}
 }
diff --git a/pkg/kubelet/secret/secret_manager_test.go b/pkg/kubelet/secret/secret_manager_test.go
index d4142e85031..ad0b2985359 100644
--- a/pkg/kubelet/secret/secret_manager_test.go
+++ b/pkg/kubelet/secret/secret_manager_test.go
@@ -19,10 +19,17 @@ package secret
 
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"testing"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/pkg/util/clock"
+
+	"github.com/stretchr/testify/assert"
 )
 
 func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) {
@@ -30,14 +37,14 @@ func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist
 	if shouldExist && err != nil {
 		t.Errorf("unexpected actions: %#v", err)
 	}
-	if !shouldExist && (err == nil || !strings.Contains(err.Error(), "secret not registered")) {
"secret not registered")) { + if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("secret %q/%q not registered", ns, name))) { t.Errorf("unexpected actions: %#v", err) } } func TestSecretStore(t *testing.T) { fakeClient := &fake.Clientset{} - store := newSecretStore(fakeClient) + store := newSecretStore(fakeClient, clock.RealClock{}, 0) store.Add("ns1", "name1") store.Add("ns2", "name2") store.Add("ns1", "name1") @@ -46,15 +53,21 @@ func TestSecretStore(t *testing.T) { store.Delete("ns2", "name2") store.Add("ns3", "name3") - // We expect one Get action per Add. + // Adds don't issue Get requests. actions := fakeClient.Actions() - if len(actions) != 5 { - t.Fatalf("unexpected actions: %#v", actions) - } + assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) + // Should issue Get request + store.Get("ns1", "name1") + // Shouldn't issue Get request, as secret is not registered + store.Get("ns2", "name2") + // Should issue Get request + store.Get("ns3", "name3") + + actions = fakeClient.Actions() + assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) + for _, a := range actions { - if !a.Matches("get", "secrets") { - t.Errorf("unexpected actions: %#v", a) - } + assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) } checkSecret(t, store, "ns1", "name1", true) @@ -63,27 +76,57 @@ func TestSecretStore(t *testing.T) { checkSecret(t, store, "ns4", "name4", false) } -func TestSecretStoreRefresh(t *testing.T) { +func TestSecretStoreGetAlwaysRefresh(t *testing.T) { fakeClient := &fake.Clientset{} - store := newSecretStore(fakeClient) + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, 0) for i := 0; i < 10; i++ { store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) } fakeClient.ClearActions() - store.Refresh() + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() actions := fakeClient.Actions() - if len(actions) != 10 { - t.Fatalf("unexpected actions: %#v", actions) - } + assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions) + for _, a := range actions { - if !a.Matches("get", "secrets") { - t.Errorf("unexpected actions: %#v", a) - } + assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a) } } +func TestSecretStoreGetNeverRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, time.Minute) + + for i := 0; i < 10; i++ { + store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) + } + fakeClient.ClearActions() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) + wg.Done() + }(i) + } + wg.Wait() + actions := fakeClient.Actions() + // Only first Get, should forward the Get request. 
+	assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions)
+}
+
 type secretsToAttach struct {
 	imagePullSecretNames    []string
 	containerEnvSecretNames [][]string
@@ -91,7 +134,7 @@ type secretsToAttach struct {
 
 func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod {
 	pod := &v1.Pod{
-		ObjectMeta: v1.ObjectMeta{
+		ObjectMeta: metav1.ObjectMeta{
 			Namespace: ns,
 			Name:      name,
 		},
@@ -120,9 +163,116 @@ func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod {
 	return pod
 }
 
+func TestCacheInvalidation(t *testing.T) {
+	fakeClient := &fake.Clientset{}
+	fakeClock := clock.NewFakeClock(time.Now())
+	store := newSecretStore(fakeClient, fakeClock, time.Minute)
+	manager := &cachingSecretManager{
+		secretStore:    store,
+		registeredPods: make(map[objectKey]*v1.Pod),
+	}
+
+	// Create a pod with some secrets.
+	s1 := secretsToAttach{
+		imagePullSecretNames:    []string{"s1"},
+		containerEnvSecretNames: [][]string{{"s1"}, {"s2"}},
+	}
+	manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
+	// Fetch both secrets - this should trigger get operations.
+	store.Get("ns1", "s1")
+	store.Get("ns1", "s2")
+	actions := fakeClient.Actions()
+	assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
+	fakeClient.ClearActions()
+
+	// Update a pod with a new secret.
+	s2 := secretsToAttach{
+		imagePullSecretNames:    []string{"s1"},
+		containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}},
+	}
+	manager.RegisterPod(podWithSecrets("ns1", "name1", s2))
+	// All secrets should be invalidated - this should trigger get operations.
+	store.Get("ns1", "s1")
+	store.Get("ns1", "s2")
+	store.Get("ns1", "s3")
+	actions = fakeClient.Actions()
+	assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions)
+	fakeClient.ClearActions()
+
+	// Create a new pod that is referencing the first two secrets - those should
+	// be invalidated.
+	manager.RegisterPod(podWithSecrets("ns1", "name2", s1))
+	store.Get("ns1", "s1")
+	store.Get("ns1", "s2")
+	store.Get("ns1", "s3")
+	actions = fakeClient.Actions()
+	assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
+	fakeClient.ClearActions()
+}
+
+func TestCacheRefcounts(t *testing.T) {
+	fakeClient := &fake.Clientset{}
+	fakeClock := clock.NewFakeClock(time.Now())
+	store := newSecretStore(fakeClient, fakeClock, time.Minute)
+	manager := &cachingSecretManager{
+		secretStore:    store,
+		registeredPods: make(map[objectKey]*v1.Pod),
+	}
+
+	s1 := secretsToAttach{
+		imagePullSecretNames:    []string{"s1"},
+		containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}},
+	}
+	manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
+	manager.RegisterPod(podWithSecrets("ns1", "name2", s1))
+	s2 := secretsToAttach{
+		imagePullSecretNames:    []string{"s2"},
+		containerEnvSecretNames: [][]string{{"s4"}, {"s5"}},
+	}
+	manager.RegisterPod(podWithSecrets("ns1", "name2", s2))
+	manager.RegisterPod(podWithSecrets("ns1", "name3", s2))
+	manager.RegisterPod(podWithSecrets("ns1", "name4", s2))
+	manager.UnregisterPod(podWithSecrets("ns1", "name3", s2))
+	s3 := secretsToAttach{
+		imagePullSecretNames:    []string{"s1"},
+		containerEnvSecretNames: [][]string{{"s3"}, {"s5"}},
+	}
+	manager.RegisterPod(podWithSecrets("ns1", "name5", s3))
+	manager.RegisterPod(podWithSecrets("ns1", "name6", s3))
+	s4 := secretsToAttach{
+		imagePullSecretNames:    []string{"s3"},
+		containerEnvSecretNames: [][]string{{"s6"}},
+	}
+	manager.RegisterPod(podWithSecrets("ns1", "name7", s4))
+	manager.UnregisterPod(podWithSecrets("ns1", "name7", s4))
+
+	// Also check the Add + Update + Remove scenario.
+	manager.RegisterPod(podWithSecrets("ns1", "other-name", s1))
+	manager.RegisterPod(podWithSecrets("ns1", "other-name", s2))
+	manager.UnregisterPod(podWithSecrets("ns1", "other-name", s2))
+
+	// Now we have: 1 pod with spec s1, 2 pods with spec s2, 2 pods with spec s3, and 0 pods with spec s4.
+	verify := func(ns, name string, count int) bool {
+		store.lock.Lock()
+		defer store.lock.Unlock()
+		item, ok := store.items[objectKey{ns, name}]
+		if !ok {
+			return count == 0
+		}
+		return item.refCount == count
+	}
+	assert.True(t, verify("ns1", "s1", 3))
+	assert.True(t, verify("ns1", "s2", 3))
+	assert.True(t, verify("ns1", "s3", 3))
+	assert.True(t, verify("ns1", "s4", 2))
+	assert.True(t, verify("ns1", "s5", 4))
+	assert.True(t, verify("ns1", "s6", 0))
+	assert.True(t, verify("ns1", "s7", 0))
+}
+
 func TestCachingSecretManager(t *testing.T) {
 	fakeClient := &fake.Clientset{}
-	secretStore := newSecretStore(fakeClient)
+	secretStore := newSecretStore(fakeClient, clock.RealClock{}, 0)
 	manager := &cachingSecretManager{
 		secretStore:    secretStore,
 		registeredPods: make(map[objectKey]*v1.Pod),
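
---

For reviewers: the core of this change is a read-through TTL cache. RegisterPod/UnregisterPod only adjust refcounts and invalidate entries, and the apiserver fetch happens lazily inside Get(). Below is a minimal, self-contained sketch of that pattern in plain Go; the names (ttlCache, entry, fetch) are illustrative only, and unlike the secretStore above it holds a single store-wide lock across the fetch rather than a per-entry lock, so concurrent Gets serialize.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry plays the role of secretData: the cached value, the error from the
// last fetch, and the time of that fetch.
type entry struct {
	value      string
	err        error
	lastUpdate time.Time
}

// ttlCache serves Get from the map while an entry is younger than ttl, and
// refetches through fetch when the entry is missing, stale, or errored.
type ttlCache struct {
	mu    sync.Mutex
	ttl   time.Duration
	items map[string]*entry
	fetch func(key string) (string, error)
}

func (c *ttlCache) Get(key string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.items[key]; ok && e.err == nil && time.Since(e.lastUpdate) < c.ttl {
		return e.value, nil // fresh enough - serve from cache
	}
	// Missing, stale, or errored - go back to the source and refresh.
	v, err := c.fetch(key)
	c.items[key] = &entry{value: v, err: err, lastUpdate: time.Now()}
	return v, err
}

func main() {
	fetches := 0
	c := &ttlCache{
		ttl:   time.Minute,
		items: make(map[string]*entry),
		fetch: func(key string) (string, error) {
			fetches++
			return "value-of-" + key, nil
		},
	}
	c.Get("s1")
	c.Get("s1") // within the TTL - served from cache
	fmt.Println("fetches:", fetches) // prints "fetches: 1"
}

With a one-minute TTL and a clock that doesn't advance, repeated Gets for the same key issue exactly one remote request - the same behavior TestSecretStoreGetNeverRefresh asserts for secretStore.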