Merge pull request #36984 from wojtek-t/secret_manager

Automatic merge from submit-queue

Create SecretManager interface and switch to caching secrets in kubelet

Ref #19188

Obviously we would need to extend the interface to solve #19188 but this is good first step anyway.
This commit is contained in:
Kubernetes Submit Queue 2017-01-19 17:02:08 -08:00 committed by GitHub
commit f83802b317
18 changed files with 718 additions and 15 deletions

View File

@ -71,6 +71,7 @@ go_library(
"//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/remote:go_default_library",
"//pkg/kubelet/rkt:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
@ -173,6 +174,7 @@ go_test(
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/prober/testing:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/status:go_default_library",
@ -245,6 +247,7 @@ filegroup(
"//pkg/kubelet/remote:all-srcs",
"//pkg/kubelet/rkt:all-srcs",
"//pkg/kubelet/rktshim:all-srcs",
"//pkg/kubelet/secret:all-srcs",
"//pkg/kubelet/server:all-srcs",
"//pkg/kubelet/status:all-srcs",
"//pkg/kubelet/sysctl:all-srcs",

View File

@ -73,6 +73,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
@ -409,6 +410,11 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
}
containerRefManager := kubecontainer.NewRefManager()
secretManager, err := secret.NewSimpleSecretManager(kubeClient)
if err != nil {
return nil, fmt.Errorf("failed to initialize secret manager: %v", err)
}
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
klet := &Kubelet{
@ -434,6 +440,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
diskSpaceManager: diskSpaceManager,
secretManager: secretManager,
cloud: kubeDeps.Cloud,
autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider),
nodeRef: nodeRef,
@ -498,7 +505,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
// podManager is also responsible for keeping secretManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager)
if kubeCfg.RemoteRuntimeEndpoint != "" {
// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
@ -913,6 +921,9 @@ type Kubelet struct {
// Diskspace manager.
diskSpaceManager diskSpaceManager
// Secret manager.
secretManager secret.Manager
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorapi.MachineInfo

View File

@ -524,7 +524,7 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *v1.Pod, container *v1.Container
if kl.kubeClient == nil {
return result, fmt.Errorf("Couldn't get secret %v/%v, no kubeClient defined", pod.Namespace, name)
}
secret, err = kl.kubeClient.Core().Secrets(pod.Namespace).Get(name, metav1.GetOptions{})
secret, err = kl.secretManager.GetSecret(pod.Namespace, name)
if err != nil {
return result, err
}
@ -638,14 +638,11 @@ func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull
// secrets.
// TODO: duplicate secrets are being retrieved multiple times and there
// is no cache. Creating and using a secret manager interface will make this
// easier to address.
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) ([]v1.Secret, error) {
pullSecrets := []v1.Secret{}
for _, secretRef := range pod.Spec.ImagePullSecrets {
secret, err := kl.kubeClient.Core().Secrets(pod.Namespace).Get(secretRef.Name, metav1.GetOptions{})
secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
if err != nil {
glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
continue

View File

@ -56,6 +56,7 @@ import (
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -166,7 +167,8 @@ func newTestKubeletWithImageList(
kubelet.cadvisor = mockCadvisor
fakeMirrorClient := podtest.NewFakeMirrorClient()
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
fakeSecretManager := secret.NewFakeManager()
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, fakeSecretManager)
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager)
kubelet.containerRefManager = kubecontainer.NewRefManager()
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
@ -38,6 +39,7 @@ go_test(
"//pkg/api/v1:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/secret"
)
// Manager stores and manages access to pods, maintaining the mappings
@ -112,13 +113,17 @@ type basicManager struct {
// Mirror pod UID to pod UID map.
translationByUID map[types.UID]types.UID
// basicManager is keeping secretManager up-to-date.
secretManager secret.Manager
// A mirror pod client to create/delete mirror pods.
MirrorClient
}
// NewBasicPodManager returns a functional Manager.
func NewBasicPodManager(client MirrorClient) Manager {
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager) Manager {
pm := &basicManager{}
pm.secretManager = secretManager
pm.MirrorClient = client
pm.SetPods(nil)
return pm
@ -153,6 +158,11 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
// lock.
func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
for _, pod := range pods {
if pm.secretManager != nil {
// TODO: Consider detecting only status update and in such case do
// not register pod, as it doesn't really matter.
pm.secretManager.RegisterPod(pod)
}
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
@ -173,6 +183,9 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
func (pm *basicManager) DeletePod(pod *v1.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
if pm.secretManager != nil {
pm.secretManager.UnregisterPod(pod)
}
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID)

View File

@ -24,13 +24,15 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/secret"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
// Stub out mirror client for testing purpose.
func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
fakeMirrorClient := podtest.NewFakeMirrorClient()
manager := NewBasicPodManager(fakeMirrorClient).(*basicManager)
secretManager := secret.NewFakeManager()
manager := NewBasicPodManager(fakeMirrorClient, secretManager).(*basicManager)
return manager, fakeMirrorClient
}

View File

@ -98,7 +98,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
func newTestManager() *manager {
refManager := kubecontainer.NewRefManager()
refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings.
podManager := kubepod.NewBasicPodManager(nil)
podManager := kubepod.NewBasicPodManager(nil, nil)
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
m := NewManager(

View File

@ -117,7 +117,7 @@ func TestDoProbe(t *testing.T) {
}
// Clean up.
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil))
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil))
resultsManager(m, probeType).Remove(testContainerID)
}
}

View File

@ -39,6 +39,7 @@ import (
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
@ -59,7 +60,8 @@ func TestRunOnce(t *testing.T) {
Usage: 9 * mb,
Capacity: 10 * mb,
}, nil)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), secret.NewFakeManager())
diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
fakeRuntime := &containertest.FakeRuntime{}
basePath, err := utiltesting.MkTmpdir("kubelet")

54
pkg/kubelet/secret/BUILD Normal file
View File

@ -0,0 +1,54 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["secret_manager_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/pkg/util/clock",
],
)
go_library(
name = "go_default_library",
srcs = [
"fake_manager.go",
"secret_manager.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/storage/etcd:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/client-go/pkg/util/clock",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,40 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package secret
import (
"k8s.io/kubernetes/pkg/api/v1"
)
// fakeManager implements Manager interface for testing purposes.
// simple operations to apiserver.
type fakeManager struct {
}
func NewFakeManager() Manager {
return &fakeManager{}
}
func (s *fakeManager) GetSecret(namespace, name string) (*v1.Secret, error) {
return nil, nil
}
func (s *fakeManager) RegisterPod(pod *v1.Pod) {
}
func (s *fakeManager) UnregisterPod(pod *v1.Pod) {
}

View File

@ -0,0 +1,262 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package secret
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
storageetcd "k8s.io/kubernetes/pkg/storage/etcd"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/pkg/util/clock"
)
type Manager interface {
// Get secret by secret namespace and name.
GetSecret(namespace, name string) (*v1.Secret, error)
// WARNING: Register/UnregisterPod functions should be efficient,
// i.e. should not block on network operations.
// RegisterPod registers all secrets from a given pod.
RegisterPod(pod *v1.Pod)
// UnregisterPod unregisters secrets from a given pod that are not
// used by any other registered pod.
UnregisterPod(pod *v1.Pod)
}
// simpleSecretManager implements SecretManager interfaces with
// simple operations to apiserver.
type simpleSecretManager struct {
kubeClient clientset.Interface
}
func NewSimpleSecretManager(kubeClient clientset.Interface) (Manager, error) {
return &simpleSecretManager{kubeClient: kubeClient}, nil
}
func (s *simpleSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
return s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{})
}
func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) {
}
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
}
type objectKey struct {
namespace string
name string
}
// secretStoreItems is a single item stored in secretStore.
type secretStoreItem struct {
refCount int
secret *secretData
}
type secretData struct {
sync.Mutex
secret *v1.Secret
err error
lastUpdateTime time.Time
}
// secretStore is a local cache of secrets.
type secretStore struct {
kubeClient clientset.Interface
clock clock.Clock
lock sync.Mutex
items map[objectKey]*secretStoreItem
ttl time.Duration
}
func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, ttl time.Duration) *secretStore {
return &secretStore{
kubeClient: kubeClient,
clock: clock,
items: make(map[objectKey]*secretStoreItem),
ttl: ttl,
}
}
func isSecretOlder(newSecret, oldSecret *v1.Secret) bool {
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret)
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret)
return newVersion < oldVersion
}
func (s *secretStore) Add(namespace, name string) {
key := objectKey{namespace: namespace, name: name}
// Add is called from RegisterPod, thus it needs to be efficient.
// Thus Add() is only increasing refCount and generation of a given secret.
// Then Get() is responsible for fetching if needed.
s.lock.Lock()
defer s.lock.Unlock()
item, exists := s.items[key]
if !exists {
item = &secretStoreItem{
refCount: 0,
secret: &secretData{},
}
s.items[key] = item
}
item.refCount++
// This will trigger fetch on the next Get() operation.
item.secret = nil
}
func (s *secretStore) Delete(namespace, name string) {
key := objectKey{namespace: namespace, name: name}
s.lock.Lock()
defer s.lock.Unlock()
if item, ok := s.items[key]; ok {
item.refCount--
if item.refCount == 0 {
delete(s.items, key)
}
}
}
func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
key := objectKey{namespace: namespace, name: name}
data := func() *secretData {
s.lock.Lock()
defer s.lock.Unlock()
item, exists := s.items[key]
if !exists {
return nil
}
if item.secret == nil {
item.secret = &secretData{}
}
return item.secret
}()
if data == nil {
return nil, fmt.Errorf("secret %q/%q not registered", namespace, name)
}
// After updating data in secretStore, lock the data, fetch secret if
// needed and return data.
data.Lock()
defer data.Unlock()
if data.err != nil || !s.clock.Now().Before(data.lastUpdateTime.Add(s.ttl)) {
secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{})
// Update state, unless we got error different than "not-found".
if err == nil || apierrors.IsNotFound(err) {
// Ignore the update to the older version of a secret.
if data.secret == nil || secret == nil || !isSecretOlder(secret, data.secret) {
data.secret = secret
data.err = err
data.lastUpdateTime = s.clock.Now()
}
} else if data.secret == nil && data.err == nil {
// We have unitialized secretData - return current result.
return secret, err
}
}
return data.secret, data.err
}
// cachingSecretManager keeps a cache of all secrets necessary for registered pods.
// It implements the following logic:
// - whenever a pod is created or updated, the cached versions of all its secrets
// are invalidated
// - every GetSecret() call tries to fetch the value from local cache; if it is
// not there, invalidated or too old, we fetch it from apiserver and refresh the
// value in cache; otherwise it is just fetched from cache
type cachingSecretManager struct {
secretStore *secretStore
lock sync.Mutex
registeredPods map[objectKey]*v1.Pod
}
func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) {
csm := &cachingSecretManager{
secretStore: newSecretStore(kubeClient, clock.RealClock{}, time.Minute),
registeredPods: make(map[objectKey]*v1.Pod),
}
return csm, nil
}
func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
return c.secretStore.Get(namespace, name)
}
// TODO: Before we will use secretManager in other places (e.g. for secret volumes)
// we should update this function to also get secrets from those places.
func getSecretNames(pod *v1.Pod) sets.String {
result := sets.NewString()
for _, reference := range pod.Spec.ImagePullSecrets {
result.Insert(reference.Name)
}
for i := range pod.Spec.Containers {
for _, envVar := range pod.Spec.Containers[i].Env {
if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil {
result.Insert(envVar.ValueFrom.SecretKeyRef.Name)
}
}
}
return result
}
func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
names := getSecretNames(pod)
c.lock.Lock()
defer c.lock.Unlock()
for name := range names {
c.secretStore.Add(pod.Namespace, name)
}
var prev *v1.Pod
key := objectKey{namespace: pod.Namespace, name: pod.Name}
prev = c.registeredPods[key]
c.registeredPods[key] = pod
if prev != nil {
for name := range getSecretNames(prev) {
c.secretStore.Delete(prev.Namespace, name)
}
}
}
func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
var prev *v1.Pod
key := objectKey{namespace: pod.Namespace, name: pod.Name}
c.lock.Lock()
defer c.lock.Unlock()
prev = c.registeredPods[key]
delete(c.registeredPods, key)
if prev != nil {
for name := range getSecretNames(prev) {
c.secretStore.Delete(prev.Namespace, name)
}
}
}

View File

@ -0,0 +1,311 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package secret
import (
"fmt"
"strings"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/util/clock"
"github.com/stretchr/testify/assert"
)
func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) {
_, err := store.Get(ns, name)
if shouldExist && err != nil {
t.Errorf("unexpected actions: %#v", err)
}
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("secret %q/%q not registered", ns, name))) {
t.Errorf("unexpected actions: %#v", err)
}
}
func TestSecretStore(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient, clock.RealClock{}, 0)
store.Add("ns1", "name1")
store.Add("ns2", "name2")
store.Add("ns1", "name1")
store.Add("ns1", "name1")
store.Delete("ns1", "name1")
store.Delete("ns2", "name2")
store.Add("ns3", "name3")
// Adds don't issue Get requests.
actions := fakeClient.Actions()
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
// Should issue Get request
store.Get("ns1", "name1")
// Shouldn't issue Get request, as secret is not registered
store.Get("ns2", "name2")
// Should issue Get request
store.Get("ns3", "name3")
actions = fakeClient.Actions()
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
for _, a := range actions {
assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a)
}
checkSecret(t, store, "ns1", "name1", true)
checkSecret(t, store, "ns2", "name2", false)
checkSecret(t, store, "ns3", "name3", true)
checkSecret(t, store, "ns4", "name4", false)
}
func TestSecretStoreGetAlwaysRefresh(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, 0)
for i := 0; i < 10; i++ {
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
}
fakeClient.ClearActions()
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10))
wg.Done()
}(i)
}
wg.Wait()
actions := fakeClient.Actions()
assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions)
for _, a := range actions {
assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a)
}
}
func TestSecretStoreGetNeverRefresh(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, time.Minute)
for i := 0; i < 10; i++ {
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
}
fakeClient.ClearActions()
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10))
wg.Done()
}(i)
}
wg.Wait()
actions := fakeClient.Actions()
// Only first Get, should forward the Get request.
assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions)
}
type secretsToAttach struct {
imagePullSecretNames []string
containerEnvSecretNames [][]string
}
func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Spec: v1.PodSpec{},
}
for _, name := range toAttach.imagePullSecretNames {
pod.Spec.ImagePullSecrets = append(
pod.Spec.ImagePullSecrets, v1.LocalObjectReference{Name: name})
}
for i, names := range toAttach.containerEnvSecretNames {
container := v1.Container{
Name: fmt.Sprintf("container-%d", i),
}
for _, name := range names {
envSource := &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: name,
},
},
}
container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource})
}
pod.Spec.Containers = append(pod.Spec.Containers, container)
}
return pod
}
func TestCacheInvalidation(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, time.Minute)
manager := &cachingSecretManager{
secretStore: store,
registeredPods: make(map[objectKey]*v1.Pod),
}
// Create a pod with some secrets.
s1 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s1"}, {"s2"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
// Fetch both secrets - this should triggger get operations.
store.Get("ns1", "s1")
store.Get("ns1", "s2")
actions := fakeClient.Actions()
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
fakeClient.ClearActions()
// Update a pod with a new secret.
s2 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s2))
// All secrets should be invalidated - this should trigger get operations.
store.Get("ns1", "s1")
store.Get("ns1", "s2")
store.Get("ns1", "s3")
actions = fakeClient.Actions()
assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions)
fakeClient.ClearActions()
// Create a new pod that is refencing the first two secrets - those should
// be invalidated.
manager.RegisterPod(podWithSecrets("ns1", "name2", s1))
store.Get("ns1", "s1")
store.Get("ns1", "s2")
store.Get("ns1", "s3")
actions = fakeClient.Actions()
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
fakeClient.ClearActions()
}
func TestCacheRefcounts(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, time.Minute)
manager := &cachingSecretManager{
secretStore: store,
registeredPods: make(map[objectKey]*v1.Pod),
}
s1 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s1"}, {"s2"}, {"s3"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
manager.RegisterPod(podWithSecrets("ns1", "name2", s1))
s2 := secretsToAttach{
imagePullSecretNames: []string{"s2"},
containerEnvSecretNames: [][]string{{"s4"}, {"s5"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name2", s2))
manager.RegisterPod(podWithSecrets("ns1", "name3", s2))
manager.RegisterPod(podWithSecrets("ns1", "name4", s2))
manager.UnregisterPod(podWithSecrets("ns1", "name3", s2))
s3 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s3"}, {"s5"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name5", s3))
manager.RegisterPod(podWithSecrets("ns1", "name6", s3))
s4 := secretsToAttach{
imagePullSecretNames: []string{"s3"},
containerEnvSecretNames: [][]string{{"s6"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name7", s4))
manager.UnregisterPod(podWithSecrets("ns1", "name7", s4))
// Also check the Add + Update + Remove scenario.
manager.RegisterPod(podWithSecrets("ns1", "other-name", s1))
manager.RegisterPod(podWithSecrets("ns1", "other-name", s2))
manager.UnregisterPod(podWithSecrets("ns1", "other-name", s2))
// Now we have: 1 pod with s1, 2 pods with s2 and 2 pods with s3, 0 pods with s4.
verify := func(ns, name string, count int) bool {
store.lock.Lock()
defer store.lock.Unlock()
item, ok := store.items[objectKey{ns, name}]
if !ok {
return count == 0
}
return item.refCount == count
}
assert.True(t, verify("ns1", "s1", 3))
assert.True(t, verify("ns1", "s2", 3))
assert.True(t, verify("ns1", "s3", 3))
assert.True(t, verify("ns1", "s4", 2))
assert.True(t, verify("ns1", "s5", 4))
assert.True(t, verify("ns1", "s6", 0))
assert.True(t, verify("ns1", "s7", 0))
}
func TestCachingSecretManager(t *testing.T) {
fakeClient := &fake.Clientset{}
secretStore := newSecretStore(fakeClient, clock.RealClock{}, 0)
manager := &cachingSecretManager{
secretStore: secretStore,
registeredPods: make(map[objectKey]*v1.Pod),
}
// Create a pod with some secrets.
s1 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s1"}, {"s2"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
// Update the pod with a different secrets.
s2 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s3"}, {"s4"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s2))
// Create another pod, but with same secrets in different namespace.
manager.RegisterPod(podWithSecrets("ns2", "name2", s2))
// Create and delete a pod with some other secrets.
s3 := secretsToAttach{
imagePullSecretNames: []string{"s5"},
containerEnvSecretNames: [][]string{{"s6"}},
}
manager.RegisterPod(podWithSecrets("ns3", "name", s3))
manager.UnregisterPod(podWithSecrets("ns3", "name", s3))
// We should have only: s1, s3 and s4 secrets in namespaces: ns1 and ns2.
for _, ns := range []string{"ns1", "ns2", "ns3"} {
for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6"} {
shouldExist :=
(secret == "s1" || secret == "s3" || secret == "s4") && (ns == "ns1" || ns == "ns2")
checkSecret(t, secretStore, ns, secret, shouldExist)
}
}
}

View File

@ -50,6 +50,7 @@ go_test(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/api/errors",

View File

@ -38,6 +38,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
kubesecret "k8s.io/kubernetes/pkg/kubelet/secret"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
@ -71,7 +72,7 @@ func (m *manager) testSyncBatch() {
}
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager())
podManager.AddPod(getTestPod())
return NewManager(kubeClient, podManager).(*manager)
}

View File

@ -50,6 +50,7 @@ go_test(
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/testing:go_default_library",
"//pkg/volume:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/pod"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/util/mount"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
@ -52,7 +53,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager())
node, pod, pv, claim := createObjects()
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@ -97,7 +98,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager())
node, pod, _, claim := createObjects()