remove residual code associated with DynamicKubeletConfig

This commit is contained in:
SataQiu 2023-03-07 22:57:08 +08:00
parent 1af56548af
commit 5a5ca8aa0c
10 changed files with 0 additions and 1905 deletions

View File

@ -1,59 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
)
// configMapPayload implements Payload, backed by a v1/ConfigMap config source object
type configMapPayload struct {
cm *apiv1.ConfigMap
}
var _ Payload = (*configMapPayload)(nil)
// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID
func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) {
if cm == nil {
return nil, fmt.Errorf("ConfigMap must be non-nil")
} else if cm.ObjectMeta.UID == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty UID")
} else if cm.ObjectMeta.ResourceVersion == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty ResourceVersion")
}
return &configMapPayload{cm}, nil
}
func (p *configMapPayload) UID() string {
return string(p.cm.UID)
}
func (p *configMapPayload) ResourceVersion() string {
return p.cm.ResourceVersion
}
func (p *configMapPayload) Files() map[string]string {
return p.cm.Data
}
func (p *configMapPayload) object() interface{} {
return p.cm
}

View File

@ -1,148 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestNewConfigMapPayload(t *testing.T) {
cases := []struct {
desc string
cm *apiv1.ConfigMap
err string
}{
{
desc: "nil",
cm: nil,
err: "ConfigMap must be non-nil",
},
{
desc: "missing uid",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
ResourceVersion: "rv",
},
},
err: "ConfigMap must have a non-empty UID",
},
{
desc: "missing resourceVersion",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
},
},
err: "ConfigMap must have a non-empty ResourceVersion",
},
{
desc: "populated v1/ConfigMap",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
ResourceVersion: "rv",
},
Data: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
err: "",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(c.cm)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.cm, payload.object()) {
t.Errorf("expect %s but got %s", spew.Sdump(c.cm), spew.Sdump(payload))
}
})
}
}
func TestConfigMapPayloadUID(t *testing.T) {
const expect = "uid"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect, ResourceVersion: "rv"}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
uid := payload.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
func TestConfigMapPayloadResourceVersion(t *testing.T) {
const expect = "rv"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: expect}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
resourceVersion := payload.ResourceVersion()
if expect != resourceVersion {
t.Errorf("expect %q, but got %q", expect, resourceVersion)
}
}
func TestConfigMapPayloadFiles(t *testing.T) {
cases := []struct {
desc string
data map[string]string
expect map[string]string
}{
{"nil", nil, nil},
{"empty", map[string]string{}, map[string]string{}},
{"populated",
map[string]string{
"foo": "1",
"bar": "2",
},
map[string]string{
"foo": "1",
"bar": "2",
}},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: "rv"}, Data: c.data})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
files := payload.Files()
if !reflect.DeepEqual(c.expect, files) {
t.Errorf("expected %v, but got %v", c.expect, files)
}
})
}
}

View File

@ -1,262 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"context"
"fmt"
"k8s.io/klog/v2"
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
)
// Payload represents a local copy of a config source (payload) object
type Payload interface {
// UID returns a globally unique (space and time) identifier for the payload.
// The return value is guaranteed non-empty.
UID() string
// ResourceVersion returns a resource version for the payload.
// The return value is guaranteed non-empty.
ResourceVersion() string
// Files returns a map of filenames to file contents.
Files() map[string]string
// object returns the underlying checkpointed object.
object() interface{}
}
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
KubeletFilename() string
// APIPath returns the API path to the remote resource.
APIPath() string
// UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
UID() string
// ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
ResourceVersion() string
// Download downloads the remote config source's target object and returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails.
// Download takes an optional store as an argument. If provided, Download will check this store for the
// target object prior to contacting the API server.
// Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
Download(client clientset.Interface, store cache.Store) (Payload, string, error)
// Informer returns an informer that can be used to detect changes to the remote config source
Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
// All RemoteConfigSources are expected to be backed by a NodeConfigSource,
// though the convenience methods on the interface will target the source
// type that was detected in a call to NewRemoteConfigSource.
NodeConfigSource() *apiv1.NodeConfigSource
}
// NewRemoteConfigSource constructs a RemoteConfigSource from a v1/NodeConfigSource object
// You should only call this with a non-nil config source.
// Note that the API server validates Node.Spec.ConfigSource.
func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource, string, error) {
// NOTE: Even though the API server validates the config, we check whether all *known* fields are
// nil here, so that if a new API server allows a new config source type, old clients can send
// an error message rather than crashing due to a nil pointer dereference.
// Exactly one reference subfield of the config source must be non-nil.
// Currently ConfigMap is the only reference subfield.
if source.ConfigMap == nil {
return nil, status.AllNilSubfieldsError, fmt.Errorf("%s, NodeConfigSource was: %#v", status.AllNilSubfieldsError, source)
}
return &remoteConfigMap{source}, "", nil
}
// DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources;
// e.g. the metadata stored by checkpoint/store/fsstore.go
func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
// Decode the remote config source. We want this to be non-strict
// so we don't error out on newer API keys.
_, codecs, err := scheme.NewSchemeAndCodecs(serializer.DisableStrict)
if err != nil {
return nil, err
}
obj, err := runtime.Decode(codecs.UniversalDecoder(), data)
if err != nil {
return nil, fmt.Errorf("failed to decode, error: %v", err)
}
// for now we assume we are trying to load an kubeletconfigv1beta1.SerializedNodeConfigSource,
// this may need to be extended if e.g. a new version of the api is born
cs, ok := obj.(*kubeletconfiginternal.SerializedNodeConfigSource)
if !ok {
return nil, fmt.Errorf("failed to cast decoded remote config source to *k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig.SerializedNodeConfigSource")
}
// we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
source, _, err := NewRemoteConfigSource(&cs.Source)
if err != nil {
return nil, err
}
return source, nil
}
// EqualRemoteConfigSources is a helper for comparing remote config sources by
// comparing the underlying API objects for semantic equality.
func EqualRemoteConfigSources(a, b RemoteConfigSource) bool {
if a != nil && b != nil {
return apiequality.Semantic.DeepEqual(a.NodeConfigSource(), b.NodeConfigSource())
}
return a == b
}
// remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources
type remoteConfigMap struct {
source *apiv1.NodeConfigSource
}
var _ RemoteConfigSource = (*remoteConfigMap)(nil)
func (r *remoteConfigMap) KubeletFilename() string {
return r.source.ConfigMap.KubeletConfigKey
}
const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s"
func (r *remoteConfigMap) APIPath() string {
ref := r.source.ConfigMap
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
func (r *remoteConfigMap) ResourceVersion() string {
return r.source.ConfigMap.ResourceVersion
}
func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
var (
cm *apiv1.ConfigMap
err error
)
// check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
if store != nil {
klog.InfoS("Kubelet config controller checking in-memory store for remoteConfigMap", "apiPath", r.APIPath())
cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
if err != nil {
// just log the error, we'll attempt a direct download instead
klog.ErrorS(err, "Kubelet config controller failed to check in-memory store for remoteConfigMap", "apiPath", r.APIPath())
} else if cm != nil {
klog.InfoS("Kubelet config controller found remoteConfigMap in in-memory store", "apiPath", r.APIPath(), "configMapUID", cm.UID, "resourceVersion", cm.ResourceVersion)
} else {
klog.InfoS("Kubelet config controller did not find remoteConfigMap in in-memory store", "apiPath", r.APIPath())
}
}
// if we didn't find the ConfigMap in the in-memory store, download it from the API server
if cm == nil {
klog.InfoS("Kubelet config controller attempting to download remoteConfigMap", "apiPath", r.APIPath())
cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(context.TODO(), r.source.ConfigMap.Name, metav1.GetOptions{})
if err != nil {
return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
}
klog.InfoS("Kubelet config controller successfully downloaded remoteConfigMap", "apiPath", r.APIPath(), "configMapUID", cm.UID, "resourceVersion", cm.ResourceVersion)
} // Assert: Now we have a non-nil ConfigMap
// construct Payload from the ConfigMap
payload, err := NewConfigMapPayload(cm)
if err != nil {
// We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
// never happen on objects in the informer's store, or objects downloaded from the API server
// directly, so we report InternalError.
return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
// update internal UID and ResourceVersion based on latest ConfigMap
r.source.ConfigMap.UID = cm.UID
r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
return payload, "", nil
}
func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
// select ConfigMap by name
fieldSelector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
// add some randomness to resync period, which can help avoid controllers falling into lock-step
minResyncPeriod := 15 * time.Minute
factor := rand.Float64() + 1
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", r.source.ConfigMap.Namespace, fieldSelector)
informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
informer.AddEventHandler(handler)
return informer
}
func (r *remoteConfigMap) Encode() ([]byte, error) {
encoder, err := utilcodec.NewKubeletconfigYAMLEncoder(kubeletconfigv1beta1.SchemeGroupVersion)
if err != nil {
return nil, err
}
data, err := runtime.Encode(encoder, &kubeletconfigv1beta1.SerializedNodeConfigSource{Source: *r.source})
if err != nil {
return nil, err
}
return data, nil
}
func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
return r.source.DeepCopy()
}
func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
key := fmt.Sprintf("%s/%s", namespace, name)
obj, ok, err := store.GetByKey(key)
if err != nil || !ok {
return nil, err
}
cm, ok := obj.(*apiv1.ConfigMap)
if !ok {
err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
klog.ErrorS(err, "Kubelet config controller")
return nil, err
}
return cm, nil
}

View File

@ -1,242 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestNewRemoteConfigSource(t *testing.T) {
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect RemoteConfigSource
err string
}{
{
desc: "all NodeConfigSource subfields nil",
source: &apiv1.NodeConfigSource{},
expect: nil,
err: "exactly one subfield must be non-nil",
},
{
desc: "ConfigMap: valid reference",
source: &apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
expect: &remoteConfigMap{&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}}},
err: "",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
source, _, err := NewRemoteConfigSource(c.source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.expect.NodeConfigSource(), source.NodeConfigSource()) {
t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestRemoteConfigMapUID(t *testing.T) {
const expect = "uid"
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: expect,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
uid := source.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
func TestRemoteConfigMapAPIPath(t *testing.T) {
const (
name = "name"
namespace = "namespace"
)
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: name,
Namespace: namespace,
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
expect := fmt.Sprintf(configMapAPIPathFmt, namespace, name)
path := source.APIPath()
if expect != path {
t.Errorf("expect %q, but got %q", expect, path)
}
}
func TestRemoteConfigMapDownload(t *testing.T) {
cm := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
}}
source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
KubeletConfigKey: "kubelet",
}}
expectPayload, err := NewConfigMapPayload(cm)
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
missingStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
hasStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
if err := hasStore.Add(cm); err != nil {
t.Fatalf("unexpected error constructing hasStore")
}
missingClient := fakeclient.NewSimpleClientset()
hasClient := fakeclient.NewSimpleClientset(cm)
cases := []struct {
desc string
client clientset.Interface
store cache.Store
err string
}{
{
desc: "nil store, object does not exist in API server",
client: missingClient,
err: "not found",
},
{
desc: "nil store, object exists in API server",
client: hasClient,
},
{
desc: "object exists in store and API server",
store: hasStore,
client: hasClient,
},
{
desc: "object exists in store, but does not exist in API server",
store: hasStore,
client: missingClient,
},
{
desc: "object does not exist in store, but exists in API server",
store: missingStore,
client: hasClient,
},
{
desc: "object does not exist in store or API server",
client: missingClient,
store: missingStore,
err: "not found",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// deep copy so we can always check the UID/ResourceVersion are set after Download
s, _, err := NewRemoteConfigSource(source.DeepCopy())
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
// attempt download
p, _, err := s.Download(c.client, c.store)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// downloaded object should match the expected
if !apiequality.Semantic.DeepEqual(expectPayload.object(), p.object()) {
t.Errorf("expect Checkpoint %s but got %s", spew.Sdump(expectPayload), spew.Sdump(p))
}
// source UID and ResourceVersion should be updated by Download
if p.UID() != s.UID() {
t.Errorf("expect UID to be updated by Download to match payload: %s, but got source UID: %s", p.UID(), s.UID())
}
if p.ResourceVersion() != s.ResourceVersion() {
t.Errorf("expect ResourceVersion to be updated by Download to match payload: %s, but got source ResourceVersion: %s", p.ResourceVersion(), s.ResourceVersion())
}
})
}
}
func TestEqualRemoteConfigSources(t *testing.T) {
cases := []struct {
desc string
a RemoteConfigSource
b RemoteConfigSource
expect bool
}{
{"both nil", nil, nil, true},
{"a nil", nil, &remoteConfigMap{}, false},
{"b nil", &remoteConfigMap{}, nil, false},
{"neither nil, equal", &remoteConfigMap{}, &remoteConfigMap{}, true},
{
desc: "neither nil, not equal",
a: &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{Name: "a"}}},
b: &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{KubeletConfigKey: "kubelet"}}},
expect: false,
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
if EqualRemoteConfigSources(c.a, c.b) != c.expect {
t.Errorf("expected EqualRemoteConfigSources to return %t, but got %t", c.expect, !c.expect)
}
})
}
}

View File

@ -1,75 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"time"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
// so far only implements Assigned(), LastKnownGood(), SetAssigned(), and SetLastKnownGood()
type fakeStore struct {
assigned checkpoint.RemoteConfigSource
lastKnownGood checkpoint.RemoteConfigSource
}
var _ Store = (*fakeStore)(nil)
func (s *fakeStore) Initialize() error {
return fmt.Errorf("Initialize method not supported")
}
func (s *fakeStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
return false, fmt.Errorf("Exists method not supported")
}
func (s *fakeStore) Save(c checkpoint.Payload) error {
return fmt.Errorf("Save method not supported")
}
func (s *fakeStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
return nil, fmt.Errorf("Load method not supported")
}
func (s *fakeStore) AssignedModified() (time.Time, error) {
return time.Time{}, fmt.Errorf("AssignedModified method not supported")
}
func (s *fakeStore) Assigned() (checkpoint.RemoteConfigSource, error) {
return s.assigned, nil
}
func (s *fakeStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return s.lastKnownGood, nil
}
func (s *fakeStore) SetAssigned(source checkpoint.RemoteConfigSource) error {
s.assigned = source
return nil
}
func (s *fakeStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
s.lastKnownGood = source
return nil
}
func (s *fakeStore) Reset() (bool, error) {
return false, fmt.Errorf("Reset method not supported")
}

View File

@ -1,194 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"k8s.io/klog/v2"
"path/filepath"
"time"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
metaDir = "meta"
assignedFile = "assigned"
lastKnownGoodFile = "last-known-good"
checkpointsDir = "checkpoints"
)
// fsStore is for tracking checkpoints in the local filesystem, implements Store
type fsStore struct {
// fs is the filesystem to use for storage operations; can be mocked for testing
fs utilfs.Filesystem
// dir is the absolute path to the storage directory for fsStore
dir string
}
var _ Store = (*fsStore)(nil)
// NewFsStore returns a Store that saves its data in dir
func NewFsStore(fs utilfs.Filesystem, dir string) Store {
return &fsStore{
fs: fs,
dir: dir,
}
}
func (s *fsStore) Initialize() error {
klog.InfoS("Kubelet config controller initializing config checkpoints directory", "path", s.dir)
// ensure top-level dir for store
if err := utilfiles.EnsureDir(s.fs, s.dir); err != nil {
return err
}
// ensure metadata directory and reference files (tracks assigned and lkg configs)
if err := utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, metaDir)); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, s.metaPath(assignedFile)); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, s.metaPath(lastKnownGoodFile)); err != nil {
return err
}
// ensure checkpoints directory (saves unpacked payloads in subdirectories named after payload UID)
return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir))
}
func (s *fsStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
const errfmt = "failed to determine whether checkpoint exists for source %s, UID: %s, ResourceVersion: %s exists, error: %v"
if len(source.UID()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty UID is ambiguous")
}
if len(source.ResourceVersion()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty ResourceVersion is ambiguous")
}
// we check whether the directory was created for the resource
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(source.UID(), source.ResourceVersion()))
if err != nil {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), err)
}
return ok, nil
}
func (s *fsStore) Save(payload checkpoint.Payload) error {
// Note: Payload interface guarantees UID() and ResourceVersion() to be non-empty
path := s.checkpointPath(payload.UID(), payload.ResourceVersion())
// ensure the parent dir (checkpoints/uid) exists, since ReplaceDir requires the parent of the replace
// to exist, and we checkpoint as checkpoints/uid/resourceVersion/files-from-configmap
if err := utilfiles.EnsureDir(s.fs, filepath.Dir(path)); err != nil {
return err
}
// save the checkpoint's files in the appropriate checkpoint dir
return utilfiles.ReplaceDir(s.fs, path, payload.Files())
}
func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
sourceFmt := fmt.Sprintf("%s, UID: %s, ResourceVersion: %s", source.APIPath(), source.UID(), source.ResourceVersion())
// check if a checkpoint exists for the source
if ok, err := s.Exists(source); err != nil {
return nil, err
} else if !ok {
return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt)
}
// load the kubelet config file
klog.InfoS("Kubelet config controller loading Kubelet configuration checkpoint for source", "apiPath", source.APIPath(), "sourceUID", source.UID(), "resourceVersion", source.ResourceVersion())
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID(), source.ResourceVersion()), source.KubeletFilename()))
if err != nil {
return nil, err
}
kc, err := loader.Load()
if err != nil {
return nil, err
}
return kc, nil
}
func (s *fsStore) AssignedModified() (time.Time, error) {
path := s.metaPath(assignedFile)
info, err := s.fs.Stat(path)
if err != nil {
return time.Time{}, fmt.Errorf("failed to stat %q while checking modification time, error: %v", path, err)
}
return info.ModTime(), nil
}
func (s *fsStore) Assigned() (checkpoint.RemoteConfigSource, error) {
return readRemoteConfigSource(s.fs, s.metaPath(assignedFile))
}
func (s *fsStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return readRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile))
}
func (s *fsStore) SetAssigned(source checkpoint.RemoteConfigSource) error {
return writeRemoteConfigSource(s.fs, s.metaPath(assignedFile), source)
}
func (s *fsStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
return writeRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile), source)
}
func (s *fsStore) Reset() (bool, error) {
return reset(s)
}
func (s *fsStore) checkpointPath(uid, resourceVersion string) string {
return filepath.Join(s.dir, checkpointsDir, uid, resourceVersion)
}
func (s *fsStore) metaPath(name string) string {
return filepath.Join(s.dir, metaDir, name)
}
func readRemoteConfigSource(fs utilfs.Filesystem, path string) (checkpoint.RemoteConfigSource, error) {
data, err := fs.ReadFile(path)
if err != nil {
return nil, err
} else if len(data) == 0 {
return nil, nil
}
return checkpoint.DecodeRemoteConfigSource(data)
}
func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoint.RemoteConfigSource) error {
// if nil, reset the file
if source == nil {
return utilfiles.ReplaceFile(fs, path, []byte{})
}
// check that UID and ResourceVersion are non-empty,
// error to save reference if the checkpoint can't be fully resolved
if source.UID() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty UID is ambiguous")
}
if source.ResourceVersion() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty ResourceVersion is ambiguous")
}
// encode the source and save it to the file
data, err := source.Encode()
if err != nil {
return err
}
return utilfiles.ReplaceFile(fs, path, data)
}

View File

@ -1,727 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubelet/config/v1beta1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
func newInitializedFakeFsStore() (*fsStore, error) {
// Test with the default filesystem, the fake filesystem has an issue caused by afero: https://github.com/spf13/afero/issues/141
// The default filesystem also behaves more like production, so we should probably not mock the filesystem for unit tests.
fs := &utilfs.DefaultFs{}
tmpDir, err := fs.TempDir("", "fsstore-test-")
if err != nil {
return nil, err
}
store := NewFsStore(fs, tmpDir)
if err := store.Initialize(); err != nil {
return nil, err
}
return store.(*fsStore), nil
}
func cleanupFakeFsStore(store *fsStore) {
_ = store.fs.RemoveAll(store.dir)
}
func TestFsStoreInitialize(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("fsStore.Initialize() failed with error: %v", err)
}
defer cleanupFakeFsStore(store)
// check that store.dir exists
if _, err := store.fs.Stat(store.dir); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.dir, err)
}
// check that meta dir exists
if _, err := store.fs.Stat(store.metaPath("")); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(""), err)
}
// check that checkpoints dir exists
if _, err := store.fs.Stat(filepath.Join(store.dir, checkpointsDir)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", filepath.Join(store.dir, checkpointsDir), err)
}
// check that assignedFile exists
if _, err := store.fs.Stat(store.metaPath(assignedFile)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(assignedFile), err)
}
// check that lastKnownGoodFile exists
if _, err := store.fs.Stat(store.metaPath(lastKnownGoodFile)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(lastKnownGoodFile), err)
}
}
func TestFsStoreExists(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
// checkpoint a payload
const (
uid = "uid"
resourceVersion = "1"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid, ResourceVersion: resourceVersion}})
if err != nil {
t.Fatalf("could not construct Payload, error: %v", err)
}
if err := store.Save(p); err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
uid types.UID
resourceVersion string
expect bool
err string
}{
{"exists", uid, resourceVersion, true, ""},
{"does not exist", "bogus-uid", "bogus-resourceVersion", false, ""},
{"ambiguous UID", "", "bogus-resourceVersion", false, "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", false, "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ok, err := store.Exists(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if c.expect != ok {
t.Errorf("expect %t but got %t", c.expect, ok)
}
})
}
}
func TestFsStoreSave(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
nameTooLong := func() string {
s := ""
for i := 0; i < 256; i++ {
s += "a"
}
return s
}()
const (
uid = "uid"
resourceVersion = "1"
)
cases := []struct {
desc string
uid types.UID
resourceVersion string
files map[string]string
err string
}{
{"valid payload", uid, resourceVersion, map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", uid, resourceVersion, map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", uid, resourceVersion, map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", uid, resourceVersion, map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", uid, resourceVersion, map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", uid, resourceVersion, map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", uid, resourceVersion, map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// construct the payload
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: c.uid, ResourceVersion: c.resourceVersion},
Data: c.files,
})
// if no error, save the payload, otherwise skip straight to error handler
if err == nil {
err = store.Save(p)
}
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// read the saved checkpoint
m, err := mapFromCheckpoint(store, p.UID(), p.ResourceVersion())
if err != nil {
t.Fatalf("error loading checkpoint to map: %v", err)
}
// compare our expectation to what got saved
expect := p.Files()
if !reflect.DeepEqual(expect, m) {
t.Errorf("expect %v, but got %v", expect, m)
}
})
}
}
func TestFsStoreLoad(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
// encode a kubelet configuration that has all defaults set
expect, err := newKubeletConfiguration()
if err != nil {
t.Fatalf("error constructing KubeletConfiguration: %v", err)
}
data, err := utilcodec.EncodeKubeletConfig(expect, v1beta1.SchemeGroupVersion)
if err != nil {
t.Fatalf("error encoding KubeletConfiguration: %v", err)
}
// construct a payload that contains the kubeletconfig
const (
uid = "uid"
resourceVersion = "1"
kubeletKey = "kubelet"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), ResourceVersion: resourceVersion},
Data: map[string]string{
kubeletKey: string(data),
},
})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
// save the payload
err = store.Save(p)
if err != nil {
t.Fatalf("error saving payload: %v", err)
}
cases := []struct {
desc string
uid types.UID
resourceVersion string
err string
skipOnWindows bool
}{
{"checkpoint exists", uid, resourceVersion, "", true},
{"checkpoint does not exist", "bogus-uid", "bogus-resourceVersion", "no checkpoint for source", false},
{"ambiguous UID", "", "bogus-resourceVersion", "empty UID is ambiguous", false},
{"ambiguous ResourceVersion", "bogus-uid", "", "empty ResourceVersion is ambiguous", false},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
if c.skipOnWindows && runtime.GOOS == "windows" {
t.Skip("Skipping test that fails on Windows")
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: kubeletKey,
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
loaded, err := store.Load(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !reflect.DeepEqual(expect, loaded) {
t.Errorf("expect %#v, but got %#v", expect, loaded)
}
})
}
}
func TestFsStoreAssignedModified(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
// create an empty assigned file, this is good enough for testing
saveTestSourceFile(t, store, assignedFile, nil)
// round the current time to the nearest second because some file systems do not support sub-second precision.
now := time.Now().Round(time.Second)
// set the timestamps to the current time, so we can compare to result of store.AssignedModified
err = store.fs.Chtimes(store.metaPath(assignedFile), now, now)
if err != nil {
t.Fatalf("could not change timestamps, error: %v", err)
}
modTime, err := store.AssignedModified()
if err != nil {
t.Fatalf("unable to determine modification time of assigned config source, error: %v", err)
}
if !now.Equal(modTime) {
t.Errorf("expect %q but got %q", now.Format(time.RFC3339), modTime.Format(time.RFC3339))
}
}
func TestFsStoreAssigned(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
expect checkpoint.RemoteConfigSource
err string
}{
{"default source", nil, ""},
{"non-default source", source, ""},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// save the last known good source
saveTestSourceFile(t, store, assignedFile, c.expect)
// load last-known-good and compare to expected result
source, err := store.Assigned()
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreLastKnownGood(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
expect checkpoint.RemoteConfigSource
err string
}{
{"default source", nil, ""},
{"non-default source", source, ""},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// save the last known good source
saveTestSourceFile(t, store, lastKnownGoodFile, c.expect)
// load last-known-good and compare to expected result
source, err := store.LastKnownGood()
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreSetAssigned(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
source = s
}
// save the assigned source
err = store.SetAssigned(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, assignedFile)
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
func TestFsStoreSetLastKnownGood(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
source = s
}
// save the assigned source
err = store.SetLastKnownGood(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, lastKnownGoodFile)
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
func TestFsStoreReset(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
defer cleanupFakeFsStore(store)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "other-name",
Namespace: "namespace",
UID: "other-uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
assigned checkpoint.RemoteConfigSource
lastKnownGood checkpoint.RemoteConfigSource
updated bool
}{
{"nil -> nil", nil, nil, false},
{"source -> nil", source, nil, true},
{"nil -> source", nil, source, false},
{"source -> source", source, source, true},
{"source -> otherSource", source, otherSource, true},
{"otherSource -> source", otherSource, source, true},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// manually save the sources to their respective files
saveTestSourceFile(t, store, assignedFile, c.assigned)
saveTestSourceFile(t, store, lastKnownGoodFile, c.lastKnownGood)
// reset
updated, err := store.Reset()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// make sure the files were emptied
if size := testSourceFileSize(t, store, assignedFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, assignedFile, size)
}
if size := testSourceFileSize(t, store, lastKnownGoodFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, lastKnownGoodFile, size)
}
// make sure Assigned() and LastKnownGood() both return nil
assigned, err := store.Assigned()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
lastKnownGood, err := store.LastKnownGood()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if assigned != nil || lastKnownGood != nil {
t.Errorf("case %q, expect nil for assigned and last-known-good checkpoints, but still have %q and %q, respectively",
c.desc, assigned, lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", c.desc, c.updated, updated)
}
})
}
}
func mapFromCheckpoint(store *fsStore, uid, resourceVersion string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid, resourceVersion))
if err != nil {
return nil, err
}
m := map[string]string{}
for _, f := range files {
// expect no subdirs, only regular files
if !f.Type().IsRegular() {
return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid)
}
// read the file contents and build the map
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid, resourceVersion), f.Name()))
if err != nil {
return nil, err
}
m[f.Name()] = string(data)
}
return m, nil
}
func readTestSourceFile(t *testing.T, store *fsStore, relPath string) []byte {
data, err := store.fs.ReadFile(store.metaPath(relPath))
if err != nil {
t.Fatalf("unable to read test source file, error: %v", err)
}
return data
}
func saveTestSourceFile(t *testing.T, store *fsStore, relPath string, source checkpoint.RemoteConfigSource) {
if source != nil {
data, err := source.Encode()
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
err = utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), data)
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
} else {
err := utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), []byte{})
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
}
}
func testSourceFileSize(t *testing.T, store *fsStore, relPath string) int64 {
info, err := store.fs.Stat(store.metaPath(relPath))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return info.Size()
}
// newKubeletConfiguration will create a new KubeletConfiguration with default values set
func newKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
s, _, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
versioned := &v1beta1.KubeletConfiguration{}
s.Default(versioned)
config := &kubeletconfig.KubeletConfiguration{}
if err := s.Convert(versioned, config, nil); err != nil {
return nil, err
}
return config, nil
}

View File

@ -1,70 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"time"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
// Store saves checkpoints and information about which is the assigned and last-known-good checkpoint to a storage layer
type Store interface {
// Initialize sets up the storage layer
Initialize() error
// Exists returns true if the object referenced by `source` has been checkpointed.
// The source must be unambiguous - e.g. if referencing an API object it must specify both uid and resourceVersion.
Exists(source checkpoint.RemoteConfigSource) (bool, error)
// Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration.
// The following payload types are supported:
// - k8s.io/api/core/v1.ConfigMap
Save(c checkpoint.Payload) error
// Load loads the KubeletConfiguration from the checkpoint referenced by `source`.
Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error)
// AssignedModified returns the last time that the assigned checkpoint was set
AssignedModified() (time.Time, error)
// Assigned returns the source that points to the checkpoint currently assigned to the Kubelet, or nil if no assigned checkpoint is set
Assigned() (checkpoint.RemoteConfigSource, error)
// LastKnownGood returns the source that points to the last-known-good checkpoint, or nil if no last-known-good checkpoint is set
LastKnownGood() (checkpoint.RemoteConfigSource, error)
// SetAssigned saves the source that points to the assigned checkpoint, set to nil to unset
SetAssigned(source checkpoint.RemoteConfigSource) error
// SetLastKnownGood saves the source that points to the last-known-good checkpoint, set to nil to unset
SetLastKnownGood(source checkpoint.RemoteConfigSource) error
// Reset unsets the assigned and last-known-good checkpoints and returns whether the assigned checkpoint was unset as a result of the reset
Reset() (bool, error)
}
// reset is a helper for implementing Reset, which can be implemented in terms of Store methods
func reset(s Store) (bool, error) {
assigned, err := s.Assigned()
if err != nil {
return false, err
}
if err := s.SetLastKnownGood(nil); err != nil {
return false, fmt.Errorf("failed to reset last-known-good UID in checkpoint store, error: %v", err)
}
if err := s.SetAssigned(nil); err != nil {
return false, fmt.Errorf("failed to reset assigned UID in checkpoint store, error: %v", err)
}
return assigned != nil, nil
}

View File

@ -1,71 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
func TestReset(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "other-name",
Namespace: "namespace",
UID: "other-uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
s *fakeStore
updated bool
}{
{&fakeStore{assigned: nil, lastKnownGood: nil}, false},
{&fakeStore{assigned: source, lastKnownGood: nil}, true},
{&fakeStore{assigned: nil, lastKnownGood: source}, false},
{&fakeStore{assigned: source, lastKnownGood: source}, true},
{&fakeStore{assigned: source, lastKnownGood: otherSource}, true},
{&fakeStore{assigned: otherSource, lastKnownGood: source}, true},
}
for _, c := range cases {
updated, err := reset(c.s)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if c.s.assigned != nil || c.s.lastKnownGood != nil {
t.Errorf("case %q, expect nil for assigned and last-known-good checkpoints, but still have %q and %q, respectively",
spew.Sdump(c.s), c.s.assigned, c.s.lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", spew.Sdump(c.s), c.updated, updated)
}
}
}

View File

@ -1,57 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
apiv1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
)
const (
// LoadError indicates that the Kubelet failed to load the config checkpoint
LoadError = "failed to load config, see Kubelet log for details"
// ValidateError indicates that the Kubelet failed to validate the config checkpoint
ValidateError = "failed to validate config, see Kubelet log for details"
// AllNilSubfieldsError is used when no subfields are set
// This could happen in the case that an old client tries to read an object from a newer API server with a set subfield it does not know about
AllNilSubfieldsError = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"
// DownloadError is used when the download fails, e.g. due to network issues
DownloadError = "failed to download config, see Kubelet log for details"
// InternalError indicates that some internal error happened while trying to sync config, e.g. filesystem issues
InternalError = "internal failure, see Kubelet log for details"
// SyncErrorFmt is used when the system couldn't sync the config, due to a malformed Node.Spec.ConfigSource, a download failure, etc.
SyncErrorFmt = "failed to sync: %s"
)
// NodeConfigStatus represents Node.Status.Config
type NodeConfigStatus interface {
// SetActive sets the active source in the status
SetActive(source *apiv1.NodeConfigSource)
// SetAssigned sets the assigned source in the status
SetAssigned(source *apiv1.NodeConfigSource)
// SetLastKnownGood sets the last-known-good source in the status
SetLastKnownGood(source *apiv1.NodeConfigSource)
// SetError sets the error associated with the status
SetError(err string)
// SetErrorOverride sets an error that overrides the base error set by SetError.
// If the override is set to the empty string, the base error is reported in
// the status, otherwise the override is reported.
SetErrorOverride(err string)
// Sync patches the current status into the Node identified by `nodeName` if an update is pending
Sync(client clientset.Interface, nodeName string)
}