SecretManager with caching

This commit is contained in:
Wojciech Tyczynski 2016-11-18 13:14:03 +01:00
parent 85ee9e570b
commit ffd8daf488
6 changed files with 460 additions and 45 deletions

View File

@ -29,7 +29,6 @@ go_library(
"reason_cache.go", "reason_cache.go",
"runonce.go", "runonce.go",
"runtime.go", "runtime.go",
"secret_manager.go",
"util.go", "util.go",
"volume_host.go", "volume_host.go",
], ],
@ -72,6 +71,7 @@ go_library(
"//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/remote:go_default_library", "//pkg/kubelet/remote:go_default_library",
"//pkg/kubelet/rkt:go_default_library", "//pkg/kubelet/rkt:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server:go_default_library", "//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/server/stats:go_default_library",

View File

@ -73,6 +73,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote" "k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/server/streaming"
@ -409,8 +410,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
} }
containerRefManager := kubecontainer.NewRefManager() containerRefManager := kubecontainer.NewRefManager()
// TODO: Create and use a more sophisticated secret mamanger. secretManager, err := secret.NewSimpleSecretManager(kubeClient)
secretManager, err := newSimpleSecretManager(kubeClient)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize secret manager: %v", err) return nil, fmt.Errorf("failed to initialize secret manager: %v", err)
} }
@ -921,7 +921,7 @@ type Kubelet struct {
diskSpaceManager diskSpaceManager diskSpaceManager diskSpaceManager
// Secret manager. // Secret manager.
secretManager secretManager secretManager secret.Manager
// Cached MachineInfo returned by cadvisor. // Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorapi.MachineInfo machineInfo *cadvisorapi.MachineInfo

39
pkg/kubelet/secret/BUILD Normal file
View File

@ -0,0 +1,39 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["secret_manager_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"fake_manager.go",
"secret_manager.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/storage/etcd:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View File

@ -0,0 +1,256 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package secret
import (
"fmt"
"sync"
"time"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
storageetcd "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
)
type Manager interface {
// Get secret by secret namespace and name.
GetSecret(namespace, name string) (*v1.Secret, error)
// RegisterPod registers all secrets from a given pod.
RegisterPod(pod *v1.Pod)
// UnregisterPod unregisters secrets from a given pod that are not
// registered still by any other registered pod.
UnregisterPod(pod *v1.Pod)
}
// simpleSecretManager implements SecretManager interfaces with
// simple operations to apiserver.
type simpleSecretManager struct {
kubeClient clientset.Interface
}
func NewSimpleSecretManager(kubeClient clientset.Interface) (Manager, error) {
return &simpleSecretManager{kubeClient: kubeClient}, nil
}
func (s *simpleSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
return s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{})
}
func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) {
}
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
}
type objectKey struct {
namespace string
name string
}
// secretStoreItems is a single item stored in secretStore.
type secretStoreItem struct {
secret *v1.Secret
err error
refCount int
}
// secretStore is a local cache of secrets.
type secretStore struct {
kubeClient clientset.Interface
lock sync.Mutex
items map[objectKey]*secretStoreItem
}
func newSecretStore(kubeClient clientset.Interface) *secretStore {
return &secretStore{
kubeClient: kubeClient,
items: make(map[objectKey]*secretStoreItem),
}
}
func (s *secretStore) Add(namespace, name string) {
key := objectKey{namespace: namespace, name: name}
secret, err := s.kubeClient.Core().Secrets(namespace).Get(name, metav1.GetOptions{})
s.lock.Lock()
defer s.lock.Unlock()
if item, ok := s.items[key]; ok {
item.secret = secret
item.err = err
item.refCount++
} else {
s.items[key] = &secretStoreItem{secret: secret, err: err, refCount: 1}
}
}
func (s *secretStore) Delete(namespace, name string) {
key := objectKey{namespace: namespace, name: name}
s.lock.Lock()
defer s.lock.Unlock()
if item, ok := s.items[key]; ok {
item.refCount--
if item.refCount == 0 {
delete(s.items, key)
}
}
}
func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
key := objectKey{namespace: namespace, name: name}
s.lock.Lock()
defer s.lock.Unlock()
if item, ok := s.items[key]; ok {
return item.secret, item.err
}
return nil, fmt.Errorf("secret not registered")
}
func (s *secretStore) Refresh() {
s.lock.Lock()
keys := make([]objectKey, 0, len(s.items))
for key := range s.items {
keys = append(keys, key)
}
s.lock.Unlock()
type result struct {
secret *v1.Secret
err error
}
results := make([]result, 0, len(keys))
for _, key := range keys {
secret, err := s.kubeClient.Core().Secrets(key.namespace).Get(key.name, metav1.GetOptions{})
if err != nil {
glog.Warningf("Unable to retrieve a secret %s/%s: %v", key.namespace, key.name, err)
}
results = append(results, result{secret: secret, err: err})
}
s.lock.Lock()
defer s.lock.Unlock()
for i, key := range keys {
secret := results[i].secret
err := results[i].err
if err != nil && !apierrors.IsNotFound(err) {
// If we couldn't retrieve a secret and it wasn't 404 error, skip updating.
continue
}
if item, ok := s.items[key]; ok {
if secret != nil && item.secret != nil {
// If the fetched version is not newer than the current one (such races are
// possible), then skip update.
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(secret)
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(item.secret)
if newVersion <= oldVersion {
continue
}
}
item.secret = secret
item.err = err
}
}
}
// cachingSecretManager keeps a cache of all secrets necessary for registered pods.
// It implements the following logic:
// - whenever a pod is created or updated, the current versions of all its secrets
// are grabbed from apiserver and stored in local cache
// - every GetSecret call is served from local cache
// - every X seconds we are refreshing the local cache by grabbing current version
// of all registered secrets from apiserver
type cachingSecretManager struct {
secretStore *secretStore
lock sync.Mutex
registeredPods map[objectKey]*v1.Pod
}
func NewCachingSecretManager(kubeClient clientset.Interface) (Manager, error) {
csm := &cachingSecretManager{
secretStore: newSecretStore(kubeClient),
registeredPods: make(map[objectKey]*v1.Pod),
}
go wait.NonSlidingUntil(func() { csm.secretStore.Refresh() }, time.Minute, wait.NeverStop)
return csm, nil
}
func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
return c.secretStore.Get(namespace, name)
}
// TODO: Before we will use secretManager in other places (e.g. for secret volumes)
// we should update this function to also get secrets from those places.
func getSecretNames(pod *v1.Pod) sets.String {
result := sets.NewString()
for _, reference := range pod.Spec.ImagePullSecrets {
result.Insert(reference.Name)
}
for i := range pod.Spec.Containers {
for _, envVar := range pod.Spec.Containers[i].Env {
if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil {
result.Insert(envVar.ValueFrom.SecretKeyRef.Name)
}
}
}
return result
}
func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
for key := range getSecretNames(pod) {
c.secretStore.Add(pod.Namespace, key)
}
var prev *v1.Pod
func() {
key := objectKey{namespace: pod.Namespace, name: pod.Name}
c.lock.Lock()
defer c.lock.Unlock()
prev = c.registeredPods[key]
c.registeredPods[key] = pod
}()
if prev != nil {
for key := range getSecretNames(prev) {
c.secretStore.Delete(prev.Namespace, key)
}
}
}
func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
var prev *v1.Pod
func() {
key := objectKey{namespace: pod.Namespace, name: pod.Name}
c.lock.Lock()
defer c.lock.Unlock()
prev = c.registeredPods[key]
delete(c.registeredPods, key)
}()
if prev != nil {
for key := range getSecretNames(prev) {
c.secretStore.Delete(prev.Namespace, key)
}
}
}

View File

@ -0,0 +1,161 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package secret
import (
"fmt"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
)
func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) {
_, err := store.Get(ns, name)
if shouldExist && err != nil {
t.Errorf("unexpected actions: %#v", err)
}
if !shouldExist && (err == nil || !strings.Contains(err.Error(), "secret not registered")) {
t.Errorf("unexpected actions: %#v", err)
}
}
func TestSecretStore(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient)
store.Add("ns1", "name1")
store.Add("ns2", "name2")
store.Add("ns1", "name1")
store.Add("ns1", "name1")
store.Delete("ns1", "name1")
store.Delete("ns2", "name2")
store.Add("ns3", "name3")
// We expect one Get action per Add.
actions := fakeClient.Actions()
if len(actions) != 5 {
t.Fatalf("unexpected actions: %#v", actions)
}
for _, a := range actions {
if !a.Matches("get", "secrets") {
t.Errorf("unexpected actions: %#v", a)
}
}
checkSecret(t, store, "ns1", "name1", true)
checkSecret(t, store, "ns2", "name2", false)
checkSecret(t, store, "ns3", "name3", true)
checkSecret(t, store, "ns4", "name4", false)
}
func TestSecretStoreRefresh(t *testing.T) {
fakeClient := &fake.Clientset{}
store := newSecretStore(fakeClient)
for i := 0; i < 10; i++ {
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
}
fakeClient.ClearActions()
store.Refresh()
actions := fakeClient.Actions()
if len(actions) != 10 {
t.Fatalf("unexpected actions: %#v", actions)
}
for _, a := range actions {
if !a.Matches("get", "secrets") {
t.Errorf("unexpected actions: %#v", a)
}
}
}
type secretsToAttach struct {
imagePullSecretNames []string
containerEnvSecretNames [][]string
}
func podWithSecrets(ns, name string, toAttach secretsToAttach) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: v1.ObjectMeta{
Namespace: ns,
Name: name,
},
Spec: v1.PodSpec{},
}
for _, name := range toAttach.imagePullSecretNames {
pod.Spec.ImagePullSecrets = append(
pod.Spec.ImagePullSecrets, v1.LocalObjectReference{Name: name})
}
for i, names := range toAttach.containerEnvSecretNames {
container := v1.Container{
Name: fmt.Sprintf("container-%d", i),
}
for _, name := range names {
envSource := &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: name,
},
},
}
container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource})
}
pod.Spec.Containers = append(pod.Spec.Containers, container)
}
return pod
}
func TestCachingSecretManager(t *testing.T) {
fakeClient := &fake.Clientset{}
secretStore := newSecretStore(fakeClient)
manager := &cachingSecretManager{
secretStore: secretStore,
registeredPods: make(map[objectKey]*v1.Pod),
}
// Create a pod with some secrets.
s1 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s1"}, {"s2"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
// Update the pod with a different secrets.
s2 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecretNames: [][]string{{"s3"}, {"s4"}},
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s2))
// Create another pod, but with same secrets in different namespace.
manager.RegisterPod(podWithSecrets("ns2", "name2", s2))
// Create and delete a pod with some other secrets.
s3 := secretsToAttach{
imagePullSecretNames: []string{"s5"},
containerEnvSecretNames: [][]string{{"s6"}},
}
manager.RegisterPod(podWithSecrets("ns3", "name", s3))
manager.UnregisterPod(podWithSecrets("ns3", "name", s3))
// We should have only: s1, s3 and s4 secrets in namespaces: ns1 and ns2.
for _, ns := range []string{"ns1", "ns2", "ns3"} {
for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6"} {
shouldExist :=
(secret == "s1" || secret == "s3" || secret == "s4") && (ns == "ns1" || ns == "ns2")
checkSecret(t, secretStore, ns, secret, shouldExist)
}
}
}

View File

@ -1,41 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)
type secretManager interface {
// Get secret by secret namespace and name.
GetSecret(namespace, name string) (*api.Secret, error)
}
// simpleSecretManager implements SecretManager interfaces with
// simple operations to apiserver.
type simpleSecretManager struct {
kubeClient clientset.Interface
}
func newSimpleSecretManager(kubeClient clientset.Interface) (secretManager, error) {
return &simpleSecretManager{kubeClient: kubeClient}, nil
}
func (s *simpleSecretManager) GetSecret(namespace, name string) (*api.Secret, error) {
return s.kubeClient.Core().Secrets(namespace).Get(name)
}