mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Merge pull request #63857 from wojtek-t/collapse_secret_manager
Automatic merge from submit-queue (batch tested with PRs 63886, 63857, 63824). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Refactor cache based manager This is support to be no-op refactoring. It will only allow to share code between secret and configmap managers.
This commit is contained in:
commit
8f0bb37fdc
@ -83,6 +83,7 @@ go_library(
|
|||||||
"//pkg/kubelet/types:go_default_library",
|
"//pkg/kubelet/types:go_default_library",
|
||||||
"//pkg/kubelet/util:go_default_library",
|
"//pkg/kubelet/util:go_default_library",
|
||||||
"//pkg/kubelet/util/format:go_default_library",
|
"//pkg/kubelet/util/format:go_default_library",
|
||||||
|
"//pkg/kubelet/util/manager:go_default_library",
|
||||||
"//pkg/kubelet/util/queue:go_default_library",
|
"//pkg/kubelet/util/queue:go_default_library",
|
||||||
"//pkg/kubelet/util/sliceutils:go_default_library",
|
"//pkg/kubelet/util/sliceutils:go_default_library",
|
||||||
"//pkg/kubelet/volumemanager:go_default_library",
|
"//pkg/kubelet/volumemanager:go_default_library",
|
||||||
|
@ -92,6 +92,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/sysctl"
|
"k8s.io/kubernetes/pkg/kubelet/sysctl"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||||
@ -542,7 +543,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
secretManager := secret.NewCachingSecretManager(
|
secretManager := secret.NewCachingSecretManager(
|
||||||
kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
|
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
|
||||||
klet.secretManager = secretManager
|
klet.secretManager = secretManager
|
||||||
|
|
||||||
configMapManager := configmap.NewCachingConfigMapManager(
|
configMapManager := configmap.NewCachingConfigMapManager(
|
||||||
|
@ -8,37 +8,34 @@ load(
|
|||||||
|
|
||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = ["caching_secret_manager_test.go"],
|
srcs = ["secret_manager_test.go"],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
"//pkg/kubelet/util/manager:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"caching_secret_manager.go",
|
|
||||||
"fake_manager.go",
|
"fake_manager.go",
|
||||||
"secret_manager.go",
|
"secret_manager.go",
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/secret",
|
importpath = "k8s.io/kubernetes/pkg/kubelet/secret",
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api/v1/pod:go_default_library",
|
"//pkg/api/v1/pod:go_default_library",
|
||||||
"//pkg/kubelet/util:go_default_library",
|
"//pkg/kubelet/util/manager:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
|
||||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1,206 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package secret
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultTTL = time.Minute
|
|
||||||
)
|
|
||||||
|
|
||||||
type GetObjectTTLFunc func() (time.Duration, bool)
|
|
||||||
|
|
||||||
// secretStoreItems is a single item stored in secretStore.
|
|
||||||
type secretStoreItem struct {
|
|
||||||
refCount int
|
|
||||||
secret *secretData
|
|
||||||
}
|
|
||||||
|
|
||||||
type secretData struct {
|
|
||||||
sync.Mutex
|
|
||||||
|
|
||||||
secret *v1.Secret
|
|
||||||
err error
|
|
||||||
lastUpdateTime time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// secretStore is a local cache of secrets.
|
|
||||||
type secretStore struct {
|
|
||||||
kubeClient clientset.Interface
|
|
||||||
clock clock.Clock
|
|
||||||
|
|
||||||
lock sync.Mutex
|
|
||||||
items map[objectKey]*secretStoreItem
|
|
||||||
|
|
||||||
defaultTTL time.Duration
|
|
||||||
getTTL GetObjectTTLFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore {
|
|
||||||
return &secretStore{
|
|
||||||
kubeClient: kubeClient,
|
|
||||||
clock: clock,
|
|
||||||
items: make(map[objectKey]*secretStoreItem),
|
|
||||||
defaultTTL: ttl,
|
|
||||||
getTTL: getTTL,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isSecretOlder(newSecret, oldSecret *v1.Secret) bool {
|
|
||||||
if newSecret == nil || oldSecret == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret)
|
|
||||||
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret)
|
|
||||||
return newVersion < oldVersion
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *secretStore) AddReference(namespace, name string) {
|
|
||||||
key := objectKey{namespace: namespace, name: name}
|
|
||||||
|
|
||||||
// AddReference is called from RegisterPod, thus it needs to be efficient.
|
|
||||||
// Thus Add() is only increasing refCount and generation of a given secret.
|
|
||||||
// Then Get() is responsible for fetching if needed.
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
item, exists := s.items[key]
|
|
||||||
if !exists {
|
|
||||||
item = &secretStoreItem{
|
|
||||||
refCount: 0,
|
|
||||||
secret: &secretData{},
|
|
||||||
}
|
|
||||||
s.items[key] = item
|
|
||||||
}
|
|
||||||
|
|
||||||
item.refCount++
|
|
||||||
// This will trigger fetch on the next Get() operation.
|
|
||||||
item.secret = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *secretStore) DeleteReference(namespace, name string) {
|
|
||||||
key := objectKey{namespace: namespace, name: name}
|
|
||||||
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
if item, ok := s.items[key]; ok {
|
|
||||||
item.refCount--
|
|
||||||
if item.refCount == 0 {
|
|
||||||
delete(s.items, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
|
||||||
return func() (time.Duration, bool) {
|
|
||||||
node, err := getNode()
|
|
||||||
if err != nil {
|
|
||||||
return time.Duration(0), false
|
|
||||||
}
|
|
||||||
if node != nil && node.Annotations != nil {
|
|
||||||
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
|
|
||||||
if intValue, err := strconv.Atoi(value); err == nil {
|
|
||||||
return time.Duration(intValue) * time.Second, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return time.Duration(0), false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *secretStore) isSecretFresh(data *secretData) bool {
|
|
||||||
secretTTL := s.defaultTTL
|
|
||||||
if ttl, ok := s.getTTL(); ok {
|
|
||||||
secretTTL = ttl
|
|
||||||
}
|
|
||||||
return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
|
|
||||||
key := objectKey{namespace: namespace, name: name}
|
|
||||||
|
|
||||||
data := func() *secretData {
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
item, exists := s.items[key]
|
|
||||||
if !exists {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if item.secret == nil {
|
|
||||||
item.secret = &secretData{}
|
|
||||||
}
|
|
||||||
return item.secret
|
|
||||||
}()
|
|
||||||
if data == nil {
|
|
||||||
return nil, fmt.Errorf("secret %q/%q not registered", namespace, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// After updating data in secretStore, lock the data, fetch secret if
|
|
||||||
// needed and return data.
|
|
||||||
data.Lock()
|
|
||||||
defer data.Unlock()
|
|
||||||
if data.err != nil || !s.isSecretFresh(data) {
|
|
||||||
opts := metav1.GetOptions{}
|
|
||||||
if data.secret != nil && data.err == nil {
|
|
||||||
// This is just a periodic refresh of a secret we successfully fetched previously.
|
|
||||||
// In this case, server data from apiserver cache to reduce the load on both
|
|
||||||
// etcd and apiserver (the cache is eventually consistent).
|
|
||||||
util.FromApiserverCache(&opts)
|
|
||||||
}
|
|
||||||
secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
|
||||||
if err != nil && !apierrors.IsNotFound(err) && data.secret == nil && data.err == nil {
|
|
||||||
// Couldn't fetch the latest secret, but there is no cached data to return.
|
|
||||||
// Return the fetch result instead.
|
|
||||||
return secret, err
|
|
||||||
}
|
|
||||||
if (err == nil && !isSecretOlder(secret, data.secret)) || apierrors.IsNotFound(err) {
|
|
||||||
// If the fetch succeeded with a newer version of the secret, or if the
|
|
||||||
// secret could not be found in the apiserver, update the cached data to
|
|
||||||
// reflect the current status.
|
|
||||||
data.secret = secret
|
|
||||||
data.err = err
|
|
||||||
data.lastUpdateTime = s.clock.Now()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return data.secret, data.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCachingSecretManager creates a manager that keeps a cache of all secrets
|
|
||||||
// necessary for registered pods.
|
|
||||||
// It implements the following logic:
|
|
||||||
// - whenever a pod is created or updated, the cached versions of all its secrets
|
|
||||||
// are invalidated
|
|
||||||
// - every GetSecret() call tries to fetch the value from local cache; if it is
|
|
||||||
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
|
||||||
// value in cache; otherwise it is just fetched from cache
|
|
||||||
func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager {
|
|
||||||
secretStore := newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL)
|
|
||||||
return newCacheBasedSecretManager(secretStore)
|
|
||||||
}
|
|
@ -17,13 +17,17 @@ limitations under the License.
|
|||||||
package secret
|
package secret
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -42,11 +46,6 @@ type Manager interface {
|
|||||||
UnregisterPod(pod *v1.Pod)
|
UnregisterPod(pod *v1.Pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
type objectKey struct {
|
|
||||||
namespace string
|
|
||||||
name string
|
|
||||||
}
|
|
||||||
|
|
||||||
// simpleSecretManager implements SecretManager interfaces with
|
// simpleSecretManager implements SecretManager interfaces with
|
||||||
// simple operations to apiserver.
|
// simple operations to apiserver.
|
||||||
type simpleSecretManager struct {
|
type simpleSecretManager struct {
|
||||||
@ -67,42 +66,31 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) {
|
|||||||
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
|
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// store is the interface for a secrets cache that
|
// cachingSecretManager keeps a store with secrets necessary
|
||||||
// can be used by cacheBasedSecretManager.
|
|
||||||
type store interface {
|
|
||||||
// AddReference adds a reference to the secret to the store.
|
|
||||||
// Note that multiple additions to the store has to be allowed
|
|
||||||
// in the implementations and effectively treated as refcounted.
|
|
||||||
AddReference(namespace, name string)
|
|
||||||
// DeleteReference deletes reference to the secret from the store.
|
|
||||||
// Note that secret should be deleted only when there was a
|
|
||||||
// corresponding Delete call for each of Add calls (effectively
|
|
||||||
// when refcount was reduced to zero).
|
|
||||||
DeleteReference(namespace, name string)
|
|
||||||
// Get a secret from a store.
|
|
||||||
Get(namespace, name string) (*v1.Secret, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// cachingBasedSecretManager keeps a store with secrets necessary
|
|
||||||
// for registered pods. Different implementations of the store
|
// for registered pods. Different implementations of the store
|
||||||
// may result in different semantics for freshness of secrets
|
// may result in different semantics for freshness of secrets
|
||||||
// (e.g. ttl-based implementation vs watch-based implementation).
|
// (e.g. ttl-based implementation vs watch-based implementation).
|
||||||
type cacheBasedSecretManager struct {
|
type cachingSecretManager struct {
|
||||||
secretStore store
|
manager manager.Manager
|
||||||
|
|
||||||
lock sync.Mutex
|
|
||||||
registeredPods map[objectKey]*v1.Pod
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheBasedSecretManager(secretStore store) Manager {
|
func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
|
||||||
return &cacheBasedSecretManager{
|
object, err := c.manager.GetObject(namespace, name)
|
||||||
secretStore: secretStore,
|
if err != nil {
|
||||||
registeredPods: make(map[objectKey]*v1.Pod),
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if secret, ok := object.(*v1.Secret); ok {
|
||||||
|
return secret, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("unexpected object type: %v", object)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheBasedSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
|
func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
|
||||||
return c.secretStore.Get(namespace, name)
|
c.manager.RegisterPod(pod)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||||
|
c.manager.UnregisterPod(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSecretNames(pod *v1.Pod) sets.String {
|
func getSecretNames(pod *v1.Pod) sets.String {
|
||||||
@ -114,39 +102,24 @@ func getSecretNames(pod *v1.Pod) sets.String {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheBasedSecretManager) RegisterPod(pod *v1.Pod) {
|
const (
|
||||||
names := getSecretNames(pod)
|
defaultTTL = time.Minute
|
||||||
c.lock.Lock()
|
)
|
||||||
defer c.lock.Unlock()
|
|
||||||
for name := range names {
|
|
||||||
c.secretStore.AddReference(pod.Namespace, name)
|
|
||||||
}
|
|
||||||
var prev *v1.Pod
|
|
||||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
|
||||||
prev = c.registeredPods[key]
|
|
||||||
c.registeredPods[key] = pod
|
|
||||||
if prev != nil {
|
|
||||||
for name := range getSecretNames(prev) {
|
|
||||||
// On an update, the .Add() call above will have re-incremented the
|
|
||||||
// ref count of any existing secrets, so any secrets that are in both
|
|
||||||
// names and prev need to have their ref counts decremented. Any that
|
|
||||||
// are only in prev need to be completely removed. This unconditional
|
|
||||||
// call takes care of both cases.
|
|
||||||
c.secretStore.DeleteReference(prev.Namespace, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cacheBasedSecretManager) UnregisterPod(pod *v1.Pod) {
|
// NewCacheBasedManager creates a manager that keeps a cache of all secrets
|
||||||
var prev *v1.Pod
|
// necessary for registered pods.
|
||||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
// It implements the following logic:
|
||||||
c.lock.Lock()
|
// - whenever a pod is created or updated, the cached versions of all secrets
|
||||||
defer c.lock.Unlock()
|
// are invalidated
|
||||||
prev = c.registeredPods[key]
|
// - every GetObject() call tries to fetch the value from local cache; if it is
|
||||||
delete(c.registeredPods, key)
|
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||||
if prev != nil {
|
// value in cache; otherwise it is just fetched from cache
|
||||||
for name := range getSecretNames(prev) {
|
func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager {
|
||||||
c.secretStore.DeleteReference(prev.Namespace, name)
|
getSecret := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
|
||||||
|
return kubeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
||||||
}
|
}
|
||||||
|
secretStore := manager.NewObjectStore(getSecret, clock.RealClock{}, getTTL, defaultTTL)
|
||||||
|
return &cachingSecretManager{
|
||||||
|
manager: manager.NewCacheBasedManager(secretStore, getSecretNames),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
156
pkg/kubelet/secret/secret_manager_test.go
Normal file
156
pkg/kubelet/secret/secret_manager_test.go
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package secret
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
func checkObject(t *testing.T, store manager.Store, ns, name string, shouldExist bool) {
|
||||||
|
_, err := store.Get(ns, name)
|
||||||
|
if shouldExist && err != nil {
|
||||||
|
t.Errorf("unexpected actions: %#v", err)
|
||||||
|
}
|
||||||
|
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("object %q/%q not registered", ns, name))) {
|
||||||
|
t.Errorf("unexpected actions: %#v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func noObjectTTL() (time.Duration, bool) {
|
||||||
|
return time.Duration(0), false
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSecret(fakeClient clientset.Interface) manager.GetObjectFunc {
|
||||||
|
return func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
|
||||||
|
return fakeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type envSecrets struct {
|
||||||
|
envVarNames []string
|
||||||
|
envFromNames []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type secretsToAttach struct {
|
||||||
|
imagePullSecretNames []string
|
||||||
|
containerEnvSecrets []envSecrets
|
||||||
|
}
|
||||||
|
|
||||||
|
func podWithSecrets(ns, podName string, toAttach secretsToAttach) *v1.Pod {
|
||||||
|
pod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: ns,
|
||||||
|
Name: podName,
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{},
|
||||||
|
}
|
||||||
|
for _, name := range toAttach.imagePullSecretNames {
|
||||||
|
pod.Spec.ImagePullSecrets = append(
|
||||||
|
pod.Spec.ImagePullSecrets, v1.LocalObjectReference{Name: name})
|
||||||
|
}
|
||||||
|
for i, secrets := range toAttach.containerEnvSecrets {
|
||||||
|
container := v1.Container{
|
||||||
|
Name: fmt.Sprintf("container-%d", i),
|
||||||
|
}
|
||||||
|
for _, name := range secrets.envFromNames {
|
||||||
|
envFrom := v1.EnvFromSource{
|
||||||
|
SecretRef: &v1.SecretEnvSource{
|
||||||
|
LocalObjectReference: v1.LocalObjectReference{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
container.EnvFrom = append(container.EnvFrom, envFrom)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, name := range secrets.envVarNames {
|
||||||
|
envSource := &v1.EnvVarSource{
|
||||||
|
SecretKeyRef: &v1.SecretKeySelector{
|
||||||
|
LocalObjectReference: v1.LocalObjectReference{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
container.Env = append(container.Env, v1.EnvVar{ValueFrom: envSource})
|
||||||
|
}
|
||||||
|
pod.Spec.Containers = append(pod.Spec.Containers, container)
|
||||||
|
}
|
||||||
|
return pod
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheBasedSecretManager(t *testing.T) {
|
||||||
|
fakeClient := &fake.Clientset{}
|
||||||
|
store := manager.NewObjectStore(getSecret(fakeClient), clock.RealClock{}, noObjectTTL, 0)
|
||||||
|
manager := &cachingSecretManager{
|
||||||
|
manager: manager.NewCacheBasedManager(store, getSecretNames),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a pod with some secrets.
|
||||||
|
s1 := secretsToAttach{
|
||||||
|
imagePullSecretNames: []string{"s1"},
|
||||||
|
containerEnvSecrets: []envSecrets{
|
||||||
|
{envVarNames: []string{"s1"}},
|
||||||
|
{envVarNames: []string{"s2"}},
|
||||||
|
{envFromNames: []string{"s20"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
|
||||||
|
// Update the pod with a different secrets.
|
||||||
|
s2 := secretsToAttach{
|
||||||
|
imagePullSecretNames: []string{"s1"},
|
||||||
|
containerEnvSecrets: []envSecrets{
|
||||||
|
{envVarNames: []string{"s3"}},
|
||||||
|
{envVarNames: []string{"s4"}},
|
||||||
|
{envFromNames: []string{"s40"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manager.RegisterPod(podWithSecrets("ns1", "name1", s2))
|
||||||
|
// Create another pod, but with same secrets in different namespace.
|
||||||
|
manager.RegisterPod(podWithSecrets("ns2", "name2", s2))
|
||||||
|
// Create and delete a pod with some other secrets.
|
||||||
|
s3 := secretsToAttach{
|
||||||
|
imagePullSecretNames: []string{"s5"},
|
||||||
|
containerEnvSecrets: []envSecrets{
|
||||||
|
{envVarNames: []string{"s6"}},
|
||||||
|
{envFromNames: []string{"s60"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manager.RegisterPod(podWithSecrets("ns3", "name", s3))
|
||||||
|
manager.UnregisterPod(podWithSecrets("ns3", "name", s3))
|
||||||
|
|
||||||
|
// We should have only: s1, s3 and s4 secrets in namespaces: ns1 and ns2.
|
||||||
|
for _, ns := range []string{"ns1", "ns2", "ns3"} {
|
||||||
|
for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} {
|
||||||
|
shouldExist :=
|
||||||
|
(secret == "s1" || secret == "s3" || secret == "s4" || secret == "s40") && (ns == "ns1" || ns == "ns2")
|
||||||
|
checkObject(t, store, ns, secret, shouldExist)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -92,6 +92,7 @@ filegroup(
|
|||||||
"//pkg/kubelet/util/cache:all-srcs",
|
"//pkg/kubelet/util/cache:all-srcs",
|
||||||
"//pkg/kubelet/util/format:all-srcs",
|
"//pkg/kubelet/util/format:all-srcs",
|
||||||
"//pkg/kubelet/util/ioutils:all-srcs",
|
"//pkg/kubelet/util/ioutils:all-srcs",
|
||||||
|
"//pkg/kubelet/util/manager:all-srcs",
|
||||||
"//pkg/kubelet/util/queue:all-srcs",
|
"//pkg/kubelet/util/queue:all-srcs",
|
||||||
"//pkg/kubelet/util/sliceutils:all-srcs",
|
"//pkg/kubelet/util/sliceutils:all-srcs",
|
||||||
"//pkg/kubelet/util/store:all-srcs",
|
"//pkg/kubelet/util/store:all-srcs",
|
||||||
|
54
pkg/kubelet/util/manager/BUILD
Normal file
54
pkg/kubelet/util/manager/BUILD
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = [
|
||||||
|
"cache_based_manager.go",
|
||||||
|
"manager.go",
|
||||||
|
],
|
||||||
|
importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//pkg/kubelet/util:go_default_library",
|
||||||
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["cache_based_manager_test.go"],
|
||||||
|
embed = [":go_default_library"],
|
||||||
|
deps = [
|
||||||
|
"//pkg/api/v1/pod:go_default_library",
|
||||||
|
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||||
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
272
pkg/kubelet/util/manager/cache_based_manager.go
Normal file
272
pkg/kubelet/util/manager/cache_based_manager.go
Normal file
@ -0,0 +1,272 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package manager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetObjectTTLFunc defines a function to get value of TTL.
|
||||||
|
type GetObjectTTLFunc func() (time.Duration, bool)
|
||||||
|
|
||||||
|
// GetObjectFunc defines a function to get object with a given namespace and name.
|
||||||
|
type GetObjectFunc func(string, string, metav1.GetOptions) (runtime.Object, error)
|
||||||
|
|
||||||
|
type objectKey struct {
|
||||||
|
namespace string
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
// objectStoreItems is a single item stored in objectStore.
|
||||||
|
type objectStoreItem struct {
|
||||||
|
refCount int
|
||||||
|
data *objectData
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectData struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
|
object runtime.Object
|
||||||
|
err error
|
||||||
|
lastUpdateTime time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// objectStore is a local cache of objects.
|
||||||
|
type objectStore struct {
|
||||||
|
getObject GetObjectFunc
|
||||||
|
clock clock.Clock
|
||||||
|
|
||||||
|
lock sync.Mutex
|
||||||
|
items map[objectKey]*objectStoreItem
|
||||||
|
|
||||||
|
defaultTTL time.Duration
|
||||||
|
getTTL GetObjectTTLFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewObjectStore returns a new ttl-based instance of Store interface.
|
||||||
|
func NewObjectStore(getObject GetObjectFunc, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) Store {
|
||||||
|
return &objectStore{
|
||||||
|
getObject: getObject,
|
||||||
|
clock: clock,
|
||||||
|
items: make(map[objectKey]*objectStoreItem),
|
||||||
|
defaultTTL: ttl,
|
||||||
|
getTTL: getTTL,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isObjectOlder(newObject, oldObject runtime.Object) bool {
|
||||||
|
if newObject == nil || oldObject == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newObject)
|
||||||
|
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldObject)
|
||||||
|
return newVersion < oldVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *objectStore) AddReference(namespace, name string) {
|
||||||
|
key := objectKey{namespace: namespace, name: name}
|
||||||
|
|
||||||
|
// AddReference is called from RegisterPod, thus it needs to be efficient.
|
||||||
|
// Thus Add() is only increasing refCount and generation of a given object.
|
||||||
|
// Then Get() is responsible for fetching if needed.
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
item, exists := s.items[key]
|
||||||
|
if !exists {
|
||||||
|
item = &objectStoreItem{
|
||||||
|
refCount: 0,
|
||||||
|
data: &objectData{},
|
||||||
|
}
|
||||||
|
s.items[key] = item
|
||||||
|
}
|
||||||
|
|
||||||
|
item.refCount++
|
||||||
|
// This will trigger fetch on the next Get() operation.
|
||||||
|
item.data = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *objectStore) DeleteReference(namespace, name string) {
|
||||||
|
key := objectKey{namespace: namespace, name: name}
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
if item, ok := s.items[key]; ok {
|
||||||
|
item.refCount--
|
||||||
|
if item.refCount == 0 {
|
||||||
|
delete(s.items, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObjectTTLFromNodeFunc returns a function that returns TTL value
|
||||||
|
// from a given Node object.
|
||||||
|
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
||||||
|
return func() (time.Duration, bool) {
|
||||||
|
node, err := getNode()
|
||||||
|
if err != nil {
|
||||||
|
return time.Duration(0), false
|
||||||
|
}
|
||||||
|
if node != nil && node.Annotations != nil {
|
||||||
|
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
|
||||||
|
if intValue, err := strconv.Atoi(value); err == nil {
|
||||||
|
return time.Duration(intValue) * time.Second, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return time.Duration(0), false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *objectStore) isObjectFresh(data *objectData) bool {
|
||||||
|
objectTTL := s.defaultTTL
|
||||||
|
if ttl, ok := s.getTTL(); ok {
|
||||||
|
objectTTL = ttl
|
||||||
|
}
|
||||||
|
return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *objectStore) Get(namespace, name string) (runtime.Object, error) {
|
||||||
|
key := objectKey{namespace: namespace, name: name}
|
||||||
|
|
||||||
|
data := func() *objectData {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
item, exists := s.items[key]
|
||||||
|
if !exists {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if item.data == nil {
|
||||||
|
item.data = &objectData{}
|
||||||
|
}
|
||||||
|
return item.data
|
||||||
|
}()
|
||||||
|
if data == nil {
|
||||||
|
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// After updating data in objectStore, lock the data, fetch object if
|
||||||
|
// needed and return data.
|
||||||
|
data.Lock()
|
||||||
|
defer data.Unlock()
|
||||||
|
if data.err != nil || !s.isObjectFresh(data) {
|
||||||
|
opts := metav1.GetOptions{}
|
||||||
|
if data.object != nil && data.err == nil {
|
||||||
|
// This is just a periodic refresh of an object we successfully fetched previously.
|
||||||
|
// In this case, server data from apiserver cache to reduce the load on both
|
||||||
|
// etcd and apiserver (the cache is eventually consistent).
|
||||||
|
util.FromApiserverCache(&opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
object, err := s.getObject(namespace, name, opts)
|
||||||
|
if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil {
|
||||||
|
// Couldn't fetch the latest object, but there is no cached data to return.
|
||||||
|
// Return the fetch result instead.
|
||||||
|
return object, err
|
||||||
|
}
|
||||||
|
if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) {
|
||||||
|
// If the fetch succeeded with a newer version of the object, or if the
|
||||||
|
// object could not be found in the apiserver, update the cached data to
|
||||||
|
// reflect the current status.
|
||||||
|
data.object = object
|
||||||
|
data.err = err
|
||||||
|
data.lastUpdateTime = s.clock.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return data.object, data.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheBasedManager keeps a store with objects necessary
|
||||||
|
// for registered pods. Different implementations of the store
|
||||||
|
// may result in different semantics for freshness of objects
|
||||||
|
// (e.g. ttl-based implementation vs watch-based implementation).
|
||||||
|
type cacheBasedManager struct {
|
||||||
|
objectStore Store
|
||||||
|
getReferencedObjects func(*v1.Pod) sets.String
|
||||||
|
|
||||||
|
lock sync.Mutex
|
||||||
|
registeredPods map[objectKey]*v1.Pod
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
|
||||||
|
return c.objectStore.Get(namespace, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
|
||||||
|
names := c.getReferencedObjects(pod)
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
for name := range names {
|
||||||
|
c.objectStore.AddReference(pod.Namespace, name)
|
||||||
|
}
|
||||||
|
var prev *v1.Pod
|
||||||
|
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||||
|
prev = c.registeredPods[key]
|
||||||
|
c.registeredPods[key] = pod
|
||||||
|
if prev != nil {
|
||||||
|
for name := range c.getReferencedObjects(prev) {
|
||||||
|
// On an update, the .Add() call above will have re-incremented the
|
||||||
|
// ref count of any existing object, so any objects that are in both
|
||||||
|
// names and prev need to have their ref counts decremented. Any that
|
||||||
|
// are only in prev need to be completely removed. This unconditional
|
||||||
|
// call takes care of both cases.
|
||||||
|
c.objectStore.DeleteReference(prev.Namespace, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
|
||||||
|
var prev *v1.Pod
|
||||||
|
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
prev = c.registeredPods[key]
|
||||||
|
delete(c.registeredPods, key)
|
||||||
|
if prev != nil {
|
||||||
|
for name := range c.getReferencedObjects(prev) {
|
||||||
|
c.objectStore.DeleteReference(prev.Namespace, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCacheBasedManager creates a manager that keeps a cache of all objects
|
||||||
|
// necessary for registered pods.
|
||||||
|
// It implements the following logic:
|
||||||
|
// - whenever a pod is created or updated, the cached versions of all objects
|
||||||
|
// is is referencing are invalidated
|
||||||
|
// - every GetObject() call tries to fetch the value from local cache; if it is
|
||||||
|
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||||
|
// value in cache; otherwise it is just fetched from cache
|
||||||
|
func NewCacheBasedManager(objectStore Store, getReferencedObjects func(*v1.Pod) sets.String) Manager {
|
||||||
|
return &cacheBasedManager{
|
||||||
|
objectStore: objectStore,
|
||||||
|
getReferencedObjects: getReferencedObjects,
|
||||||
|
registeredPods: make(map[objectKey]*v1.Pod),
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright 2016 The Kubernetes Authors.
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package secret
|
package manager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -25,23 +25,27 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func checkSecret(t *testing.T, store *secretStore, ns, name string, shouldExist bool) {
|
func checkObject(t *testing.T, store *objectStore, ns, name string, shouldExist bool) {
|
||||||
_, err := store.Get(ns, name)
|
_, err := store.Get(ns, name)
|
||||||
if shouldExist && err != nil {
|
if shouldExist && err != nil {
|
||||||
t.Errorf("unexpected actions: %#v", err)
|
t.Errorf("unexpected actions: %#v", err)
|
||||||
}
|
}
|
||||||
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("secret %q/%q not registered", ns, name))) {
|
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("object %q/%q not registered", ns, name))) {
|
||||||
t.Errorf("unexpected actions: %#v", err)
|
t.Errorf("unexpected actions: %#v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -50,6 +54,35 @@ func noObjectTTL() (time.Duration, bool) {
|
|||||||
return time.Duration(0), false
|
return time.Duration(0), false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSecret(fakeClient clientset.Interface) GetObjectFunc {
|
||||||
|
return func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
|
||||||
|
return fakeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSecretStore(fakeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *objectStore {
|
||||||
|
return &objectStore{
|
||||||
|
getObject: getSecret(fakeClient),
|
||||||
|
clock: clock,
|
||||||
|
items: make(map[objectKey]*objectStoreItem),
|
||||||
|
defaultTTL: ttl,
|
||||||
|
getTTL: getTTL,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSecretNames(pod *v1.Pod) sets.String {
|
||||||
|
result := sets.NewString()
|
||||||
|
podutil.VisitPodSecretNames(pod, func(name string) bool {
|
||||||
|
result.Insert(name)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCacheBasedSecretManager(store Store) Manager {
|
||||||
|
return NewCacheBasedManager(store, getSecretNames)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSecretStore(t *testing.T) {
|
func TestSecretStore(t *testing.T) {
|
||||||
fakeClient := &fake.Clientset{}
|
fakeClient := &fake.Clientset{}
|
||||||
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||||
@ -78,10 +111,10 @@ func TestSecretStore(t *testing.T) {
|
|||||||
assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a)
|
assert.True(t, a.Matches("get", "secrets"), "unexpected actions: %#v", a)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkSecret(t, store, "ns1", "name1", true)
|
checkObject(t, store, "ns1", "name1", true)
|
||||||
checkSecret(t, store, "ns2", "name2", false)
|
checkObject(t, store, "ns2", "name2", false)
|
||||||
checkSecret(t, store, "ns3", "name3", true)
|
checkObject(t, store, "ns3", "name3", true)
|
||||||
checkSecret(t, store, "ns4", "name4", false)
|
checkObject(t, store, "ns4", "name4", false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSecretStoreDeletingSecret(t *testing.T) {
|
func TestSecretStoreDeletingSecret(t *testing.T) {
|
||||||
@ -481,10 +514,10 @@ func TestCacheRefcounts(t *testing.T) {
|
|||||||
assert.Equal(t, 1, refs("ns1", "s70"))
|
assert.Equal(t, 1, refs("ns1", "s70"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCachingSecretManager(t *testing.T) {
|
func TestCacheBasedSecretManager(t *testing.T) {
|
||||||
fakeClient := &fake.Clientset{}
|
fakeClient := &fake.Clientset{}
|
||||||
secretStore := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||||
manager := newCacheBasedSecretManager(secretStore)
|
manager := newCacheBasedSecretManager(store)
|
||||||
|
|
||||||
// Create a pod with some secrets.
|
// Create a pod with some secrets.
|
||||||
s1 := secretsToAttach{
|
s1 := secretsToAttach{
|
||||||
@ -524,7 +557,7 @@ func TestCachingSecretManager(t *testing.T) {
|
|||||||
for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} {
|
for _, secret := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} {
|
||||||
shouldExist :=
|
shouldExist :=
|
||||||
(secret == "s1" || secret == "s3" || secret == "s4" || secret == "s40") && (ns == "ns1" || ns == "ns2")
|
(secret == "s1" || secret == "s3" || secret == "s4" || secret == "s40") && (ns == "ns1" || ns == "ns2")
|
||||||
checkSecret(t, secretStore, ns, secret, shouldExist)
|
checkObject(t, store, ns, secret, shouldExist)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
56
pkg/kubelet/util/manager/manager.go
Normal file
56
pkg/kubelet/util/manager/manager.go
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package manager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager is the interface for registering and unregistering
|
||||||
|
// objects referenced by pods in the underlying cache and
|
||||||
|
// extracting those from that cache if needed.
|
||||||
|
type Manager interface {
|
||||||
|
// Get object by its namespace and name.
|
||||||
|
GetObject(namespace, name string) (runtime.Object, error)
|
||||||
|
|
||||||
|
// WARNING: Register/UnregisterPod functions should be efficient,
|
||||||
|
// i.e. should not block on network operations.
|
||||||
|
|
||||||
|
// RegisterPod registers all objects referenced from a given pod.
|
||||||
|
RegisterPod(pod *v1.Pod)
|
||||||
|
|
||||||
|
// UnregisterPod unregisters objects referenced from a given pod that are not
|
||||||
|
// used by any other registered pod.
|
||||||
|
UnregisterPod(pod *v1.Pod)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store is the interface for a object cache that
|
||||||
|
// can be used by cacheBasedManager.
|
||||||
|
type Store interface {
|
||||||
|
// AddReference adds a reference to the object to the store.
|
||||||
|
// Note that multiple additions to the store has to be allowed
|
||||||
|
// in the implementations and effectively treated as refcounted.
|
||||||
|
AddReference(namespace, name string)
|
||||||
|
// DeleteReference deletes reference to the object from the store.
|
||||||
|
// Note that object should be deleted only when there was a
|
||||||
|
// corresponding Delete call for each of Add calls (effectively
|
||||||
|
// when refcount was reduced to zero).
|
||||||
|
DeleteReference(namespace, name string)
|
||||||
|
// Get an object from a store.
|
||||||
|
Get(namespace, name string) (runtime.Object, error)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user