Merge pull request #59692 from mtaufen/dkcfg-unpack-configmaps

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

unpack dynamic kubelet config payloads to files

This PR unpacks the downloaded ConfigMap to a set of files on the node.

This enables other config files to ride alongside the
KubeletConfiguration, and the KubeletConfiguration to refer to these
cohabitants with relative paths.

This PR also stops storing dynamic config metadata (e.g. current,
last-known-good config records) in the same directory as config
checkpoints. Instead, it splits the storage into `meta` and
`checkpoints` dirs.

The current store dir structure is as follows:
```
- dir named by --dynamic-config-dir (root for managing dynamic config)
| - meta (dir for metadata, e.g. which config source is currently assigned, last-known-good)
  | - current (a serialized v1 NodeConfigSource object, indicating the assigned config)
  | - last-known-good (a serialized v1 NodeConfigSource object, indicating the last-known-good config)
| - checkpoints (dir for config checkpoints)
  | - uid1 (dir for unpacked config, identified by uid1)
    | - file1
    | - file2
    | - ...
  | - uid2
  | - ...
```

There are some likely changes to the above structure before dynamic config goes beta, such as renaming "current" to "assigned" for clarity, and extending the checkpoint identifier to include a resource version, as part of resolving #61643.

```release-note
NONE
```

/cc @luxas @smarterclayton
This commit is contained in:
Kubernetes Submit Queue 2018-04-24 12:01:37 -07:00 committed by GitHub
commit 44b57338d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1105 additions and 1022 deletions

View File

@ -9,23 +9,16 @@ load(
go_test(
name = "go_default_test",
srcs = [
"checkpoint_test.go",
"configmap_test.go",
"download_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)
@ -33,15 +26,12 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"configmap.go",
"download.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
@ -49,7 +39,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)

View File

@ -1,72 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
)
// Checkpoint represents a local copy of a config source (payload) object
type Checkpoint interface {
// UID returns the UID of the config source object behind the Checkpoint
UID() string
// Parse extracts the KubeletConfiguration from the checkpoint, applies defaults, and converts to the internal type
Parse() (*kubeletconfig.KubeletConfiguration, error)
// Encode returns a []byte representation of the config source object behind the Checkpoint
Encode() ([]byte, error)
// object returns the underlying checkpointed object. If you want to compare sources for equality, use EqualCheckpoints,
// which compares the underlying checkpointed objects for semantic API equality.
object() interface{}
}
// DecodeCheckpoint is a helper for using the apimachinery to decode serialized checkpoints
func DecodeCheckpoint(data []byte) (Checkpoint, error) {
// decode the checkpoint
obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data)
if err != nil {
return nil, fmt.Errorf("failed to decode, error: %v", err)
}
// TODO(mtaufen): for now we assume we are trying to load a ConfigMap checkpoint, may need to extend this if we allow other checkpoint types
// convert it to the external ConfigMap type, so we're consistently working with the external type outside of the on-disk representation
cm := &apiv1.ConfigMap{}
err = legacyscheme.Scheme.Convert(obj, cm, nil)
if err != nil {
return nil, fmt.Errorf("failed to convert decoded object into a v1 ConfigMap, error: %v", err)
}
return NewConfigMapCheckpoint(cm)
}
// EqualCheckpoints compares two Checkpoints for equality, if their underlying objects are equal, so are the Checkpoints
func EqualCheckpoints(a, b Checkpoint) bool {
if a != nil && b != nil {
return apiequality.Semantic.DeepEqual(a.object(), b.object())
}
if a == nil && b == nil {
return true
}
return false
}

View File

@ -1,89 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestDecodeCheckpoint(t *testing.T) {
// generate correct Checkpoint for v1/ConfigMap test case
cm, err := NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID("uid")}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// generate unsupported object encoding for unsupported type test case
unsupported := newUnsupportedEncoded(t)
// test cases
cases := []struct {
desc string
data []byte
expect Checkpoint // expect a deeply-equal Checkpoint to be returned from Decode
err string // expect error to contain this substring
}{
// v1/ConfigMap
{"v1/ConfigMap", []byte(`{"apiVersion": "v1","kind": "ConfigMap","metadata": {"uid": "uid"}}`), cm, ""},
// malformed
{"malformed", []byte("malformed"), nil, "failed to decode"},
// no UID
{"no UID", []byte(`{"apiVersion": "v1","kind": "ConfigMap"}`), nil, "ConfigMap must have a UID"},
// well-formed, but unsupported type
{"well-formed, but unsupported encoded type", unsupported, nil, "failed to convert"},
}
for _, c := range cases {
cpt, err := DecodeCheckpoint(c.data)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// Unfortunately reflect.DeepEqual treats nil data structures as != empty data structures, so
// we have to settle for semantic equality of the underlying checkpointed API objects.
// If additional fields are added to the object that implements the Checkpoint interface,
// they should be added to a named sub-object to facilitate a DeepEquals comparison
// of the extra fields.
// decoded checkpoint should match expected checkpoint
if !apiequality.Semantic.DeepEqual(cpt.object(), c.expect.object()) {
t.Errorf("case %q, expect checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(cpt))
}
}
}
// newUnsupportedEncoded returns an encoding of an object that does not have a Checkpoint implementation
func newUnsupportedEncoded(t *testing.T) []byte {
encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName)
if err != nil {
t.Fatalf("could not create an encoder, error: %v", err)
}
unsupported := &apiv1.Node{}
data, err := runtime.Encode(encoder, unsupported)
if err != nil {
t.Fatalf("could not encode object, error: %v", err)
}
return data
}

View File

@ -20,75 +20,36 @@ import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
)
const configMapConfigKey = "kubelet"
// configMapCheckpoint implements Checkpoint, backed by a v1/ConfigMap config source object
type configMapCheckpoint struct {
kubeletCodecs *serializer.CodecFactory // codecs for the KubeletConfiguration
configMap *apiv1.ConfigMap
// configMapPayload implements Payload, backed by a v1/ConfigMap config source object
type configMapPayload struct {
cm *apiv1.ConfigMap
}
// NewConfigMapCheckpoint returns a Checkpoint backed by `cm`. `cm` must be non-nil
// and have a non-empty ObjectMeta.UID, or an error will be returned.
func NewConfigMapCheckpoint(cm *apiv1.ConfigMap) (Checkpoint, error) {
var _ Payload = (*configMapPayload)(nil)
// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID
func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) {
if cm == nil {
return nil, fmt.Errorf("ConfigMap must be non-nil to be treated as a Checkpoint")
return nil, fmt.Errorf("ConfigMap must be non-nil to be a Payload")
} else if len(cm.ObjectMeta.UID) == 0 {
return nil, fmt.Errorf("ConfigMap must have a UID to be treated as a Checkpoint")
return nil, fmt.Errorf("ConfigMap must have a UID to be a Payload")
}
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
return &configMapCheckpoint{kubeletCodecs, cm}, nil
return &configMapPayload{cm}, nil
}
// UID returns the UID of a configMapCheckpoint
func (c *configMapCheckpoint) UID() string {
return string(c.configMap.UID)
func (p *configMapPayload) UID() string {
return string(p.cm.UID)
}
// Parse extracts the KubeletConfiguration from v1/ConfigMap checkpoints, applies defaults, and converts to the internal type
func (c *configMapCheckpoint) Parse() (*kubeletconfig.KubeletConfiguration, error) {
const emptyCfgErr = "config was empty, but some parameters are required"
if len(c.configMap.Data) == 0 {
return nil, fmt.Errorf(emptyCfgErr)
}
config, ok := c.configMap.Data[configMapConfigKey]
if !ok {
return nil, fmt.Errorf("key %q not found in ConfigMap", configMapConfigKey)
} else if len(config) == 0 {
return nil, fmt.Errorf(emptyCfgErr)
}
return utilcodec.DecodeKubeletConfiguration(c.kubeletCodecs, []byte(config))
func (p *configMapPayload) Files() map[string]string {
return p.cm.Data
}
// Encode encodes a configMapCheckpoint
func (c *configMapCheckpoint) Encode() ([]byte, error) {
cm := c.configMap
encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName)
if err != nil {
return nil, err
}
data, err := runtime.Encode(encoder, cm)
if err != nil {
return nil, err
}
return data, nil
}
func (c *configMapCheckpoint) object() interface{} {
return c.configMap
func (p *configMapPayload) object() interface{} {
return p.cm
}

View File

@ -17,7 +17,7 @@ limitations under the License.
package checkpoint
import (
"fmt"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
@ -25,14 +25,10 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestNewConfigMapCheckpoint(t *testing.T) {
func TestNewConfigMapPayload(t *testing.T) {
cases := []struct {
desc string
cm *apiv1.ConfigMap
@ -44,7 +40,7 @@ func TestNewConfigMapCheckpoint(t *testing.T) {
&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: types.UID("uid"),
UID: "uid",
},
Data: map[string]string{
"key1": "value1",
@ -54,184 +50,60 @@ func TestNewConfigMapCheckpoint(t *testing.T) {
}
for _, c := range cases {
cpt, err := NewConfigMapCheckpoint(c.cm)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(cpt.object(), c.cm) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.cm), spew.Sdump(cpt))
}
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(c.cm)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.cm, payload.object()) {
t.Errorf("expect %s but got %s", spew.Sdump(c.cm), spew.Sdump(payload))
}
})
}
}
func TestConfigMapCheckpointUID(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
func TestConfigMapPayloadUID(t *testing.T) {
const expect = "uid"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatalf("error constructing payload: %v", err)
}
uid := payload.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
cases := []string{"", "uid", "376dfb73-56db-11e7-a01e-42010a800002"}
for _, uidIn := range cases {
cpt := &configMapCheckpoint{
kubeletCodecs,
&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uidIn)},
func TestConfigMapPayloadFiles(t *testing.T) {
cases := []struct {
desc string
data map[string]string
expect map[string]string
}{
{"nil", nil, nil},
{"empty", map[string]string{}, map[string]string{}},
{"populated",
map[string]string{
"foo": "1",
"bar": "2",
},
}
// UID method should return the correct value of the UID
uidOut := cpt.UID()
if uidIn != uidOut {
t.Errorf("expect UID() to return %q, but got %q", uidIn, uidOut)
}
}
}
func TestConfigMapCheckpointParse(t *testing.T) {
kubeletScheme, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// get the built-in default configuration
external := &kubeletconfigv1beta1.KubeletConfiguration{}
kubeletScheme.Default(external)
defaultConfig := &kubeletconfig.KubeletConfiguration{}
err = kubeletScheme.Convert(external, defaultConfig, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
cm *apiv1.ConfigMap
expect *kubeletconfig.KubeletConfiguration
err string
}{
{"empty data", &apiv1.ConfigMap{}, nil, "config was empty"},
// missing kubelet key
{"missing kubelet key", &apiv1.ConfigMap{Data: map[string]string{
"bogus": "stuff"}}, nil, fmt.Sprintf("key %q not found", configMapConfigKey)},
// invalid format
{"invalid yaml", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": "*"}}, nil, "failed to decode"},
{"invalid json", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": "{*"}}, nil, "failed to decode"},
// invalid object
{"missing kind", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, nil, "failed to decode"},
{"missing version", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"KubeletConfiguration"}`}}, nil, "failed to decode"},
{"unregistered kind", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"BogusKind","apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, nil, "failed to decode"},
{"unregistered version", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"KubeletConfiguration","apiVersion":"bogusversion"}`}}, nil, "failed to decode"},
// empty object with correct kind and version should result in the defaults for that kind and version
{"default from yaml", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1`}}, defaultConfig, ""},
{"default from json", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"KubeletConfiguration","apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, defaultConfig, ""},
map[string]string{
"foo": "1",
"bar": "2",
}},
}
for _, c := range cases {
cpt := &configMapCheckpoint{kubeletCodecs, c.cm}
kc, err := cpt.Parse()
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// we expect the parsed configuration to match what we described in the ConfigMap
if !apiequality.Semantic.DeepEqual(c.expect, kc) {
t.Errorf("case %q, expect config %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(kc))
}
}
}
func TestConfigMapCheckpointEncode(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// only one case, based on output from the existing encoder, and since
// this is hard to test (key order isn't guaranteed), we should probably
// just stick to this test case and mostly rely on the round-trip test.
cases := []struct {
desc string
cpt *configMapCheckpoint
expect string
}{
// we expect Checkpoints to be encoded as a json representation of the underlying API object
{"one-key",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "one-key"},
Data: map[string]string{"one": ""}}},
`{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"one-key","creationTimestamp":null},"data":{"one":""}}
`},
}
for _, c := range cases {
data, err := c.cpt.Encode()
// we don't expect any errors from encoding
if utiltest.SkipRest(t, c.desc, err, "") {
continue
}
if string(data) != c.expect {
t.Errorf("case %q, expect encoding %q but got %q", c.desc, c.expect, string(data))
}
}
}
func TestConfigMapCheckpointRoundTrip(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
cpt *configMapCheckpoint
decodeErr string
}{
// empty data
{"empty data",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "empty-data-sha256-e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
UID: "uid",
},
Data: map[string]string{}}},
""},
// two keys
{"two keys",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "two-keys-sha256-2bff03d6249c8a9dc9a1436d087c124741361ccfac6615b81b67afcff5c42431",
UID: "uid",
},
Data: map[string]string{"one": "", "two": "2"}}},
""},
// missing uid
{"missing uid",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "two-keys-sha256-2bff03d6249c8a9dc9a1436d087c124741361ccfac6615b81b67afcff5c42431",
UID: "",
},
Data: map[string]string{"one": "", "two": "2"}}},
"must have a UID"},
}
for _, c := range cases {
// we don't expect any errors from encoding
data, err := c.cpt.Encode()
if utiltest.SkipRest(t, c.desc, err, "") {
continue
}
after, err := DecodeCheckpoint(data)
if utiltest.SkipRest(t, c.desc, err, c.decodeErr) {
continue
}
if !apiequality.Semantic.DeepEqual(c.cpt.object(), after.object()) {
t.Errorf("case %q, expect round-trip result %s but got %s", c.desc, spew.Sdump(c.cpt), spew.Sdump(after))
}
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid"}, Data: c.data})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
files := payload.Files()
if !reflect.DeepEqual(c.expect, files) {
t.Errorf("expected %v, but got %v", c.expect, files)
}
})
}
}

View File

@ -30,19 +30,32 @@ import (
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
// Payload represents a local copy of a config source (payload) object
type Payload interface {
// UID returns a globally unique (space and time) identifier for the payload.
UID() string
// Files returns a map of filenames to file contents.
Files() map[string]string
// object returns the underlying checkpointed object.
object() interface{}
}
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// UID returns the UID of the remote config source object
// UID returns a globally unique identifier of the source described by the remote config source object
UID() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// Download downloads the remote config source object returns a Checkpoint backed by the object,
// Download downloads the remote config source object returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails
Download(client clientset.Interface) (Checkpoint, string, error)
Download(client clientset.Interface) (Payload, string, error)
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// object returns the underlying source object. If you want to compare sources for equality, use EqualRemoteConfigSources,
// object returns the underlying source object.
// If you want to compare sources for equality, use EqualRemoteConfigSources,
// which compares the underlying source objects for semantic API equality.
object() interface{}
}
@ -70,7 +83,7 @@ func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource,
}
// DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources;
// e.g. the objects stored in the .cur and .lkg files by checkpoint/store/fsstore.go
// e.g. the metadata stored by checkpoint/store/fsstore.go
func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
// decode the remote config source
obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data)
@ -97,10 +110,7 @@ func EqualRemoteConfigSources(a, b RemoteConfigSource) bool {
if a != nil && b != nil {
return apiequality.Semantic.DeepEqual(a.object(), b.object())
}
if a == nil && b == nil {
return true
}
return false
return a == b
}
// remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources
@ -108,6 +118,8 @@ type remoteConfigMap struct {
source *apiv1.NodeConfigSource
}
var _ RemoteConfigSource = (*remoteConfigMap)(nil)
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMapRef.UID)
}
@ -119,7 +131,7 @@ func (r *remoteConfigMap) APIPath() string {
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, string, error) {
func (r *remoteConfigMap) Download(client clientset.Interface) (Payload, string, error) {
var reason string
uid := string(r.source.ConfigMapRef.UID)
@ -138,18 +150,18 @@ func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, stri
return nil, reason, fmt.Errorf(reason)
}
checkpoint, err := NewConfigMapCheckpoint(cm)
payload, err := NewConfigMapPayload(cm)
if err != nil {
reason = fmt.Sprintf("invalid downloaded object")
return nil, reason, fmt.Errorf("%s, error: %v", reason, err)
}
utillog.Infof("successfully downloaded ConfigMap with UID %q", uid)
return checkpoint, "", nil
return payload, "", nil
}
func (r *remoteConfigMap) Encode() ([]byte, error) {
encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName)
encoder, err := utilcodec.NewYAMLEncoder(apiv1.GroupName)
if err != nil {
return nil, err
}

View File

@ -25,9 +25,7 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
fakeclient "k8s.io/client-go/kubernetes/fake"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
@ -68,51 +66,47 @@ func TestNewRemoteConfigSource(t *testing.T) {
}
for _, c := range cases {
src, _, err := NewRemoteConfigSource(c.source)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.expect.object(), src.object()) {
t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(src))
}
t.Run(c.desc, func(t *testing.T) {
source, _, err := NewRemoteConfigSource(c.source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.expect.object(), source.object()) {
t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestRemoteConfigMapUID(t *testing.T) {
cases := []string{"", "uid", "376dfb73-56db-11e7-a01e-42010a800002"}
for _, uidIn := range cases {
cpt := &remoteConfigMap{
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: types.UID(uidIn)}},
}
// UID method should return the correct value of the UID
uidOut := cpt.UID()
if uidIn != uidOut {
t.Errorf("expect UID() to return %q, but got %q", uidIn, uidOut)
}
const expect = "uid"
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: expect}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
uid := source.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
func TestRemoteConfigMapAPIPath(t *testing.T) {
name := "name"
namespace := "namespace"
cpt := &remoteConfigMap{
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: ""}},
const namespace = "namespace"
const name = "name"
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: "uid"}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
expect := fmt.Sprintf(configMapAPIPathFmt, cpt.source.ConfigMapRef.Namespace, cpt.source.ConfigMapRef.Name)
// APIPath() method should return the correct path to the referenced resource
path := cpt.APIPath()
expect := fmt.Sprintf(configMapAPIPathFmt, namespace, name)
path := source.APIPath()
if expect != path {
t.Errorf("expect APIPath() to return %q, but got %q", expect, namespace)
t.Errorf("expect %q, but got %q", expect, path)
}
}
func TestRemoteConfigMapDownload(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cm := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
@ -120,36 +114,76 @@ func TestRemoteConfigMapDownload(t *testing.T) {
UID: "uid",
}}
client := fakeclient.NewSimpleClientset(cm)
payload, err := NewConfigMapPayload(cm)
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
makeSource := func(source *apiv1.NodeConfigSource) RemoteConfigSource {
s, _, err := NewRemoteConfigSource(source)
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
return s
}
cases := []struct {
desc string
source RemoteConfigSource
expect Checkpoint
expect Payload
err string
}{
// object doesn't exist
{"object doesn't exist",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "bogus", Namespace: "namespace", UID: "bogus"}}},
makeSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "bogus", Namespace: "namespace", UID: "bogus"}}),
nil, "not found"},
// UID of downloaded object doesn't match UID of referent found via namespace/name
{"UID is incorrect for namespace/name",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "bogus"}}},
makeSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "bogus"}}),
nil, "does not match"},
// successful download
{"object exists and reference is correct",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}},
&configMapCheckpoint{kubeletCodecs, cm}, ""},
makeSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}),
payload, ""},
}
for _, c := range cases {
cpt, _, err := c.source.Download(client)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// "downloaded" object should match the expected
if !apiequality.Semantic.DeepEqual(c.expect.object(), cpt.object()) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(cpt))
}
t.Run(c.desc, func(t *testing.T) {
payload, _, err := c.source.Download(client)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// downloaded object should match the expected
if !apiequality.Semantic.DeepEqual(c.expect.object(), payload.object()) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(payload))
}
})
}
}
func TestEqualRemoteConfigSources(t *testing.T) {
cases := []struct {
desc string
a RemoteConfigSource
b RemoteConfigSource
expect bool
}{
{"both nil", nil, nil, true},
{"a nil", nil, &remoteConfigMap{}, false},
{"b nil", &remoteConfigMap{}, nil, false},
{"neither nil, equal", &remoteConfigMap{}, &remoteConfigMap{}, true},
{"neither nil, not equal",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "a"}}},
&remoteConfigMap{},
false},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
if EqualRemoteConfigSources(c.a, c.b) != c.expect {
t.Errorf("expected EqualRemoteConfigSources to return %t, but got %t", c.expect, !c.expect)
}
})
}
}

View File

@ -14,7 +14,11 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
@ -34,7 +38,9 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store",
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/configfiles:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/util/filesystem:go_default_library",

View File

@ -20,6 +20,7 @@ import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
@ -29,19 +30,21 @@ type fakeStore struct {
lastKnownGood checkpoint.RemoteConfigSource
}
var _ Store = (*fakeStore)(nil)
func (s *fakeStore) Initialize() error {
return fmt.Errorf("Initialize method not supported")
}
func (s *fakeStore) Exists(uid string) (bool, error) {
func (s *fakeStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
return false, fmt.Errorf("Exists method not supported")
}
func (s *fakeStore) Save(c checkpoint.Checkpoint) error {
func (s *fakeStore) Save(c checkpoint.Payload) error {
return fmt.Errorf("Save method not supported")
}
func (s *fakeStore) Load(uid string) (checkpoint.Checkpoint, error) {
func (s *fakeStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
return nil, fmt.Errorf("Load method not supported")
}
@ -62,10 +65,6 @@ func (s *fakeStore) SetCurrent(source checkpoint.RemoteConfigSource) error {
return nil
}
func (s *fakeStore) SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) {
return setCurrentUpdated(s, source)
}
func (s *fakeStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
s.lastKnownGood = source
return nil

View File

@ -21,82 +21,99 @@ import (
"path/filepath"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
curFile = ".cur"
lkgFile = ".lkg"
metaDir = "meta"
currentFile = "current"
lastKnownGoodFile = "last-known-good"
checkpointsDir = "checkpoints"
kubeletKey = "kubelet" // TODO(mtaufen): eventually the API will have a way to parameterize the kubelet file name, and then we can remove this
)
// fsStore is for tracking checkpoints in the local filesystem, implements Store
type fsStore struct {
// fs is the filesystem to use for storage operations; can be mocked for testing
fs utilfs.Filesystem
// checkpointsDir is the absolute path to the storage directory for fsStore
checkpointsDir string
// dir is the absolute path to the storage directory for fsStore
dir string
}
// NewFsStore returns a Store that saves its data in `checkpointsDir`
func NewFsStore(fs utilfs.Filesystem, checkpointsDir string) Store {
var _ Store = (*fsStore)(nil)
// NewFsStore returns a Store that saves its data in dir
func NewFsStore(fs utilfs.Filesystem, dir string) Store {
return &fsStore{
fs: fs,
checkpointsDir: checkpointsDir,
fs: fs,
dir: dir,
}
}
func (s *fsStore) Initialize() error {
utillog.Infof("initializing config checkpoints directory %q", s.checkpointsDir)
if err := utilfiles.EnsureDir(s.fs, s.checkpointsDir); err != nil {
utillog.Infof("initializing config checkpoints directory %q", s.dir)
// ensure top-level dir for store
if err := utilfiles.EnsureDir(s.fs, s.dir); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, filepath.Join(s.checkpointsDir, curFile)); err != nil {
// ensure metadata directory and reference files (tracks current and lkg configs)
if err := utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, metaDir)); err != nil {
return err
}
return utilfiles.EnsureFile(s.fs, filepath.Join(s.checkpointsDir, lkgFile))
if err := utilfiles.EnsureFile(s.fs, s.metaPath(currentFile)); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, s.metaPath(lastKnownGoodFile)); err != nil {
return err
}
// ensure checkpoints directory (saves unpacked payloads in subdirectories named after payload UID)
return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir))
}
func (s *fsStore) Exists(uid string) (bool, error) {
ok, err := utilfiles.FileExists(s.fs, filepath.Join(s.checkpointsDir, uid))
func (s *fsStore) Exists(c checkpoint.RemoteConfigSource) (bool, error) {
// we check whether the directory was created for the resource
uid := c.UID()
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(uid))
if err != nil {
return false, fmt.Errorf("failed to determine whether checkpoint %q exists, error: %v", uid, err)
}
return ok, nil
}
func (s *fsStore) Save(c checkpoint.Checkpoint) error {
// encode the checkpoint
data, err := c.Encode()
if err != nil {
return err
}
// save the file
return utilfiles.ReplaceFile(s.fs, filepath.Join(s.checkpointsDir, c.UID()), data)
func (s *fsStore) Save(c checkpoint.Payload) error {
// save the checkpoint's files in the appropriate checkpoint dir
return utilfiles.ReplaceDir(s.fs, s.checkpointPath(c.UID()), c.Files())
}
func (s *fsStore) Load(uid string) (checkpoint.Checkpoint, error) {
filePath := filepath.Join(s.checkpointsDir, uid)
utillog.Infof("loading configuration from %q", filePath)
// load the file
data, err := s.fs.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read checkpoint file %q, error: %v", filePath, err)
func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
sourceFmt := fmt.Sprintf("%s:%s", source.APIPath(), source.UID())
// check if a checkpoint exists for the source
if ok, err := s.Exists(source); err != nil {
return nil, fmt.Errorf("failed to determine if a checkpoint exists for source %s", sourceFmt)
} else if !ok {
return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt)
}
// decode it
c, err := checkpoint.DecodeCheckpoint(data)
// load the kubelet config file
utillog.Infof("loading kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID()), kubeletKey))
if err != nil {
return nil, fmt.Errorf("failed to decode checkpoint file %q, error: %v", filePath, err)
return nil, err
}
return c, nil
kc, err := loader.Load()
if err != nil {
return nil, err
}
return kc, nil
}
func (s *fsStore) CurrentModified() (time.Time, error) {
path := filepath.Join(s.checkpointsDir, curFile)
path := s.metaPath(currentFile)
info, err := s.fs.Stat(path)
if err != nil {
return time.Time{}, fmt.Errorf("failed to stat %q while checking modification time, error: %v", path, err)
@ -105,34 +122,35 @@ func (s *fsStore) CurrentModified() (time.Time, error) {
}
func (s *fsStore) Current() (checkpoint.RemoteConfigSource, error) {
return s.sourceFromFile(curFile)
return readRemoteConfigSource(s.fs, s.metaPath(currentFile))
}
func (s *fsStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return s.sourceFromFile(lkgFile)
return readRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile))
}
func (s *fsStore) SetCurrent(source checkpoint.RemoteConfigSource) error {
return s.setSourceFile(curFile, source)
}
func (s *fsStore) SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) {
return setCurrentUpdated(s, source)
return writeRemoteConfigSource(s.fs, s.metaPath(currentFile), source)
}
func (s *fsStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
return s.setSourceFile(lkgFile, source)
return writeRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile), source)
}
func (s *fsStore) Reset() (bool, error) {
return reset(s)
}
// sourceFromFile returns the RemoteConfigSource stored in the file at `s.checkpointsDir/relPath`,
// or nil if the file is empty
func (s *fsStore) sourceFromFile(relPath string) (checkpoint.RemoteConfigSource, error) {
path := filepath.Join(s.checkpointsDir, relPath)
data, err := s.fs.ReadFile(path)
func (s *fsStore) checkpointPath(uid string) string {
return filepath.Join(s.dir, checkpointsDir, uid)
}
func (s *fsStore) metaPath(name string) string {
return filepath.Join(s.dir, metaDir, name)
}
func readRemoteConfigSource(fs utilfs.Filesystem, path string) (checkpoint.RemoteConfigSource, error) {
data, err := fs.ReadFile(path)
if err != nil {
return nil, err
} else if len(data) == 0 {
@ -141,17 +159,15 @@ func (s *fsStore) sourceFromFile(relPath string) (checkpoint.RemoteConfigSource,
return checkpoint.DecodeRemoteConfigSource(data)
}
// set source file replaces the file at `s.checkpointsDir/relPath` with a file containing `source`
func (s *fsStore) setSourceFile(relPath string, source checkpoint.RemoteConfigSource) error {
path := filepath.Join(s.checkpointsDir, relPath)
func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoint.RemoteConfigSource) error {
// if nil, reset the file
if source == nil {
return utilfiles.ReplaceFile(s.fs, path, []byte{})
return utilfiles.ReplaceFile(fs, path, []byte{})
}
// encode the source and save it to the file
data, err := source.Encode()
if err != nil {
return err
}
return utilfiles.ReplaceFile(s.fs, path, data)
return utilfiles.ReplaceFile(fs, path, data)
}

View File

@ -18,7 +18,9 @@ package store
import (
"fmt"
"io/ioutil"
"path/filepath"
"reflect"
"testing"
"time"
@ -27,17 +29,37 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const testCheckpointsDir = "/test-checkpoints-dir"
var testdir string
func init() {
tmp, err := ioutil.TempDir("", "fsstore-test")
if err != nil {
panic(err)
}
testdir = tmp
}
func newInitializedFakeFsStore() (*fsStore, error) {
fs := utilfs.NewFakeFs()
store := NewFsStore(fs, testCheckpointsDir)
// Test with the default filesystem, the fake filesystem has an issue caused by afero: https://github.com/spf13/afero/issues/141
// The default filesystem also behaves more like production, so we should probably not mock the filesystem for unit tests.
fs := utilfs.DefaultFs{}
tmpdir, err := fs.TempDir(testdir, "store-")
if err != nil {
return nil, err
}
store := NewFsStore(fs, tmpdir)
if err := store.Initialize(); err != nil {
return nil, err
}
@ -50,167 +72,203 @@ func TestFsStoreInitialize(t *testing.T) {
t.Fatalf("fsStore.Initialize() failed with error: %v", err)
}
// check that testCheckpointsDir exists
_, err = store.fs.Stat(testCheckpointsDir)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", testCheckpointsDir, err)
// check that store.dir exists
if _, err := store.fs.Stat(store.dir); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.dir, err)
}
// check that testCheckpointsDir contains the curFile
curPath := filepath.Join(testCheckpointsDir, curFile)
_, err = store.fs.Stat(curPath)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", curPath, err)
// check that meta dir exists
if _, err := store.fs.Stat(store.metaPath("")); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(""), err)
}
// check that testCheckpointsDir contains the lkgFile
lkgPath := filepath.Join(testCheckpointsDir, lkgFile)
_, err = store.fs.Stat(lkgPath)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", lkgPath, err)
// check that checkpoints dir exists
if _, err := store.fs.Stat(store.checkpointPath("")); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.checkpointPath(""), err)
}
// check that currentFile exists
if _, err := store.fs.Stat(store.metaPath(currentFile)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(currentFile), err)
}
// check that lastKnownGoodFile exists
if _, err := store.fs.Stat(store.metaPath(lastKnownGoodFile)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(lastKnownGoodFile), err)
}
}
func TestFsStoreExists(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
// create a checkpoint file; this is enough for an exists check
cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: "uid"},
})
// checkpoint a payload
const uid = "uid"
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid}})
if err != nil {
t.Fatalf("could not construct checkpoint, error: %v", err)
}
saveTestCheckpointFile(t, store.fs, cpt)
store.Save(p)
cases := []struct {
desc string
uid string // the uid to test
uid types.UID
expect bool
err string
}{
{"exists", "uid", true, ""},
{"exists", uid, true, ""},
{"does not exist", "bogus-uid", false, ""},
}
for _, c := range cases {
ok, err := store.Exists(c.uid)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if c.expect != ok {
t.Errorf("case %q, expect %t but got %t", c.desc, c.expect, ok)
}
t.Run(c.desc, func(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: c.uid}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
ok, err := store.Exists(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if c.expect != ok {
t.Errorf("expect %t but got %t", c.expect, ok)
}
})
}
}
func TestFsStoreSave(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: "uid"},
})
if err != nil {
t.Fatalf("could not construct checkpoint, error: %v", err)
nameTooLong := func() string {
s := ""
for i := 0; i < 256; i++ {
s += "a"
}
return s
}()
cases := []struct {
desc string
files map[string]string
err string
}{
{"valid payload", map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
}
// save the checkpoint
err = store.Save(cpt)
if err != nil {
t.Fatalf("unable to save checkpoint, error: %v", err)
}
// expect the saved checkpoint file to match the encoding of the checkpoint
data, err := cpt.Encode()
if err != nil {
t.Fatalf("unable to encode the checkpoint, error: %v", err)
}
expect := string(data)
data = readTestCheckpointFile(t, store.fs, cpt.UID())
cptFile := string(data)
if expect != cptFile {
t.Errorf("expect %q but got %q", expect, cptFile)
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// construct the payload
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: "uid"},
Data: c.files,
})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
// save the payload
err = store.Save(p)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// read the saved checkpoint
m, err := mapFromCheckpoint(store, p.UID())
if err != nil {
t.Fatalf("error loading checkpoint to map: %v", err)
}
// compare our expectation to what got saved
expect := p.Files()
if !reflect.DeepEqual(expect, m) {
t.Errorf("expect %v, but got %v", expect, m)
}
})
}
}
func TestFsStoreLoad(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)}})
// encode a kubelet configuration that has all defaults set
expect, err := newKubeletConfiguration()
if err != nil {
t.Fatalf("unable to construct checkpoint, error: %v", err)
t.Fatalf("error constructing KubeletConfiguration: %v", err)
}
data, err := utilcodec.EncodeKubeletConfig(expect, v1beta1.SchemeGroupVersion)
if err != nil {
t.Fatalf("error encoding KubeletConfiguration: %v", err)
}
// construct a payload that contains the kubeletconfig
const uid = "uid"
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)},
Data: map[string]string{
kubeletKey: string(data),
},
})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
// save the payload
err = store.Save(p)
if err != nil {
t.Fatalf("error saving payload: %v", err)
}
cases := []struct {
desc string
loadUID string
cpt checkpoint.Checkpoint
err string
desc string
uid types.UID
err string
}{
{"checkpoint exists", uid, cpt, ""},
{"checkpoint does not exist", "bogus-uid", nil, "failed to read"},
{"checkpoint exists", uid, ""},
{"checkpoint does not exist", "bogus-uid", "no checkpoint for source"},
}
for _, c := range cases {
if c.cpt != nil {
saveTestCheckpointFile(t, store.fs, c.cpt)
}
cpt, err := store.Load(c.loadUID)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if !checkpoint.EqualCheckpoints(c.cpt, cpt) {
t.Errorf("case %q, expect %q but got %q", c.desc, spew.Sdump(c.cpt), spew.Sdump(cpt))
}
}
}
func TestFsStoreRoundTrip(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
}
const uid = "uid"
cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)}})
if err != nil {
t.Fatalf("unable to construct checkpoint, error: %v", err)
}
err = store.Save(cpt)
if err != nil {
t.Fatalf("unable to save checkpoint, error: %v", err)
}
cptAfter, err := store.Load(uid)
if err != nil {
t.Fatalf("unable to load checkpoint, error: %v", err)
}
if !checkpoint.EqualCheckpoints(cpt, cptAfter) {
t.Errorf("expect %q but got %q", spew.Sdump(cpt), spew.Sdump(cptAfter))
t.Run(c.desc, func(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: c.uid}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
loaded, err := store.Load(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !reflect.DeepEqual(expect, loaded) {
t.Errorf("expect %#v, but got %#v", expect, loaded)
}
})
}
}
func TestFsStoreCurrentModified(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
// create an empty current file, this is good enough for testing
saveTestSourceFile(t, store.fs, curFile, nil)
saveTestSourceFile(t, store, currentFile, nil)
// set the timestamps to the current time, so we can compare to result of store.SetCurrentModified
// set the timestamps to the current time, so we can compare to result of store.CurrentModified
now := time.Now()
err = store.fs.Chtimes(filepath.Join(testCheckpointsDir, curFile), now, now)
err = store.fs.Chtimes(store.metaPath(currentFile), now, now)
if err != nil {
t.Fatalf("could not change timestamps, error: %v", err)
}
@ -229,7 +287,7 @@ func TestFsStoreCurrentModified(t *testing.T) {
func TestFsStoreCurrent(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
@ -247,24 +305,27 @@ func TestFsStoreCurrent(t *testing.T) {
{"non-default source", source, ""},
}
for _, c := range cases {
// save the last known good source
saveTestSourceFile(t, store.fs, curFile, c.expect)
t.Run(c.desc, func(t *testing.T) {
// save the last known good source
saveTestSourceFile(t, store, currentFile, c.expect)
// load last-known-good and compare to expected result
source, err := store.Current()
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
// load last-known-good and compare to expected result
source, err := store.Current()
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreLastKnownGood(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
@ -282,28 +343,37 @@ func TestFsStoreLastKnownGood(t *testing.T) {
{"non-default source", source, ""},
}
for _, c := range cases {
// save the last known good source
saveTestSourceFile(t, store.fs, lkgFile, c.expect)
t.Run(c.desc, func(t *testing.T) {
// save the last known good source
saveTestSourceFile(t, store, lastKnownGoodFile, c.expect)
// load last-known-good and compare to expected result
source, err := store.LastKnownGood()
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
// load last-known-good and compare to expected result
source, err := store.LastKnownGood()
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreSetCurrent(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
expect := fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"name","uid":"%s"}}%s`, uid, "\n")
expect := fmt.Sprintf(`apiVersion: v1
configMapRef:
name: name
namespace: namespace
uid: %s
kind: NodeConfigSource
`, uid)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{
Name: "name", Namespace: "namespace", UID: types.UID(uid)}})
if err != nil {
@ -316,97 +386,26 @@ func TestFsStoreSetCurrent(t *testing.T) {
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store.fs, curFile)
data := readTestSourceFile(t, store, currentFile)
if expect != string(data) {
t.Errorf("expect current source file to contain %q, but got %q", expect, string(data))
}
}
func TestFsStoreSetCurrentUpdated(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
}
cases := []struct {
current string
newCurrent string
expectUpdated bool
err string
}{
{"", "", false, ""},
{"uid", "", true, ""},
{"", "uid", true, ""},
{"uid", "uid", false, ""},
{"uid", "other-uid", true, ""},
{"other-uid", "uid", true, ""},
{"other-uid", "other-uid", false, ""},
}
for _, c := range cases {
// construct current source
var source checkpoint.RemoteConfigSource
expectSource := ""
if len(c.current) > 0 {
expectSource = fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"name","uid":"%s"}}%s`, c.current, "\n")
source, _, err = checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{
Name: "name", Namespace: "namespace", UID: types.UID(c.current)}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
// construct new source
var newSource checkpoint.RemoteConfigSource
expectNewSource := ""
if len(c.newCurrent) > 0 {
expectNewSource = fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"new-name","uid":"%s"}}%s`, c.newCurrent, "\n")
newSource, _, err = checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{
Name: "new-name", Namespace: "namespace", UID: types.UID(c.newCurrent)}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
// set the initial current
if err := store.SetCurrent(source); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// update to the new current
updated, err := store.SetCurrentUpdated(newSource)
if utiltest.SkipRest(t, fmt.Sprintf("%q -> %q", c.current, c.newCurrent), err, c.err) {
continue
}
// check that SetCurrentUpdated correctly reports whether the current checkpoint changed
if c.expectUpdated != updated {
t.Errorf("case %q -> %q, expect %v but got %v", c.current, c.newCurrent, c.expectUpdated, updated)
}
// check that curFile is saved by SetCurrentUpdated as we expect
data := readTestSourceFile(t, store.fs, curFile)
if c.current == c.newCurrent {
// same UID should leave file unchanged
if expectSource != string(data) {
t.Errorf("case %q -> %q, expect current source file to contain %q, but got %q", c.current, c.newCurrent, expectSource, string(data))
}
} else if expectNewSource != string(data) {
// otherwise expect the file to change
t.Errorf("case %q -> %q, expect current source file to contain %q, but got %q", c.current, c.newCurrent, expectNewSource, string(data))
}
}
}
func TestFsStoreSetLastKnownGood(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
expect := fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"name","uid":"%s"}}%s`, uid, "\n")
expect := fmt.Sprintf(`apiVersion: v1
configMapRef:
name: name
namespace: namespace
uid: %s
kind: NodeConfigSource
`, uid)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{
Name: "name", Namespace: "namespace", UID: types.UID(uid)}})
if err != nil {
@ -419,7 +418,7 @@ func TestFsStoreSetLastKnownGood(t *testing.T) {
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store.fs, lkgFile)
data := readTestSourceFile(t, store, lastKnownGoodFile)
if expect != string(data) {
t.Errorf("expect last-known-good source file to contain %q, but got %q", expect, string(data))
}
@ -428,7 +427,7 @@ func TestFsStoreSetLastKnownGood(t *testing.T) {
func TestFsStoreReset(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}})
@ -453,47 +452,49 @@ func TestFsStoreReset(t *testing.T) {
{"otherSource -> source", otherSource, source, true},
}
for _, c := range cases {
// manually save the sources to their respective files
saveTestSourceFile(t, store.fs, curFile, c.current)
saveTestSourceFile(t, store.fs, lkgFile, c.lastKnownGood)
t.Run(c.desc, func(t *testing.T) {
// manually save the sources to their respective files
saveTestSourceFile(t, store, currentFile, c.current)
saveTestSourceFile(t, store, lastKnownGoodFile, c.lastKnownGood)
// reset
updated, err := store.Reset()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// reset
updated, err := store.Reset()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// make sure the files were emptied
if size := testSourceFileSize(t, store.fs, curFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, curFile, size)
}
if size := testSourceFileSize(t, store.fs, lkgFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, lkgFile, size)
}
// make sure the files were emptied
if size := testSourceFileSize(t, store, currentFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, currentFile, size)
}
if size := testSourceFileSize(t, store, lastKnownGoodFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, lastKnownGoodFile, size)
}
// make sure Current() and LastKnownGood() both return nil
current, err := store.Current()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
lastKnownGood, err := store.LastKnownGood()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if current != nil || lastKnownGood != nil {
t.Errorf("case %q, expect nil for current and last-known-good checkpoints, but still have %q and %q, respectively",
c.desc, current, lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", c.desc, c.updated, updated)
}
// make sure Current() and LastKnownGood() both return nil
current, err := store.Current()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
lastKnownGood, err := store.LastKnownGood()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if current != nil || lastKnownGood != nil {
t.Errorf("case %q, expect nil for current and last-known-good checkpoints, but still have %q and %q, respectively",
c.desc, current, lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", c.desc, c.updated, updated)
}
})
}
}
func TestFsStoreSourceFromFile(t *testing.T) {
func TestFsStoreReadRemoteConfigSource(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
@ -513,21 +514,24 @@ func TestFsStoreSourceFromFile(t *testing.T) {
const name = "some-source-file"
for _, c := range cases {
saveTestSourceFile(t, store.fs, name, c.expect)
source, err := store.sourceFromFile(name)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
t.Run(c.desc, func(t *testing.T) {
saveTestSourceFile(t, store, name, c.expect)
source, err := readRemoteConfigSource(store.fs, store.metaPath(name))
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreSetSourceFile(t *testing.T) {
func TestFsStoreWriteRemoteConfigSource(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("failed to construct a store, error: %v", err)
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}})
@ -536,93 +540,112 @@ func TestFsStoreSetSourceFile(t *testing.T) {
}
cases := []struct {
desc string
source checkpoint.RemoteConfigSource
}{
{nil},
{source},
{"nil source", nil},
{"non-nil source", source},
}
const name = "some-source-file"
for _, c := range cases {
// set the source file
err := store.setSourceFile(name, c.source)
if err != nil {
t.Fatalf("unable to set source file, error: %v", err)
}
// read back the file
data := readTestSourceFile(t, store.fs, name)
str := string(data)
if c.source != nil {
// expect the contents to match the encoding of the source
data, err := c.source.Encode()
expect := string(data)
t.Run(c.desc, func(t *testing.T) {
// set the source file
err := writeRemoteConfigSource(store.fs, store.metaPath(name), c.source)
if err != nil {
t.Fatalf("couldn't encode source, error: %v", err)
t.Fatalf("unable to set source file, error: %v", err)
}
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
} else {
// expect empty file
expect := ""
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
// read back the file
data := readTestSourceFile(t, store, name)
str := string(data)
if c.source != nil {
// expect the contents to match the encoding of the source
data, err := c.source.Encode()
expect := string(data)
if err != nil {
t.Fatalf("couldn't encode source, error: %v", err)
}
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
} else {
// expect empty file
expect := ""
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
}
})
}
}
func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid))
if err != nil {
return nil, err
}
m := map[string]string{}
for _, f := range files {
// expect no subdirs, only regular files
if !f.Mode().IsRegular() {
return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid)
}
// read the file contents and build the map
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid), f.Name()))
if err != nil {
return nil, err
}
m[f.Name()] = string(data)
}
return m, nil
}
func readTestCheckpointFile(t *testing.T, fs utilfs.Filesystem, uid string) []byte {
data, err := fs.ReadFile(filepath.Join(testCheckpointsDir, uid))
if err != nil {
t.Fatalf("unable to read test checkpoint file, error: %v", err)
}
return data
}
func saveTestCheckpointFile(t *testing.T, fs utilfs.Filesystem, cpt checkpoint.Checkpoint) {
data, err := cpt.Encode()
if err != nil {
t.Fatalf("unable to encode test checkpoint, error: %v", err)
}
fmt.Println(cpt.UID())
err = utilfiles.ReplaceFile(fs, filepath.Join(testCheckpointsDir, cpt.UID()), data)
if err != nil {
t.Fatalf("unable to save test checkpoint file, error: %v", err)
}
}
func readTestSourceFile(t *testing.T, fs utilfs.Filesystem, relPath string) []byte {
data, err := fs.ReadFile(filepath.Join(testCheckpointsDir, relPath))
func readTestSourceFile(t *testing.T, store *fsStore, relPath string) []byte {
data, err := store.fs.ReadFile(store.metaPath(relPath))
if err != nil {
t.Fatalf("unable to read test source file, error: %v", err)
}
return data
}
func saveTestSourceFile(t *testing.T, fs utilfs.Filesystem, relPath string, source checkpoint.RemoteConfigSource) {
func saveTestSourceFile(t *testing.T, store *fsStore, relPath string, source checkpoint.RemoteConfigSource) {
if source != nil {
data, err := source.Encode()
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
err = utilfiles.ReplaceFile(fs, filepath.Join(testCheckpointsDir, relPath), data)
err = utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), data)
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
} else {
err := utilfiles.ReplaceFile(fs, filepath.Join(testCheckpointsDir, relPath), []byte{})
err := utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), []byte{})
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
}
}
func testSourceFileSize(t *testing.T, fs utilfs.Filesystem, relPath string) int64 {
info, err := fs.Stat(filepath.Join(testCheckpointsDir, relPath))
func testSourceFileSize(t *testing.T, store *fsStore, relPath string) int64 {
info, err := store.fs.Stat(store.metaPath(relPath))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return info.Size()
}
// newKubeletConfiguration will create a new KubeletConfiguration with default values set
func newKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
s, _, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
versioned := &v1beta1.KubeletConfiguration{}
s.Default(versioned)
config := &kubeletconfig.KubeletConfiguration{}
if err := s.Convert(versioned, config, nil); err != nil {
return nil, err
}
return config, nil
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
@ -27,22 +28,25 @@ import (
type Store interface {
// Initialize sets up the storage layer
Initialize() error
// Exists returns true if a checkpoint with `uid` exists in the store, false otherwise
Exists(uid string) (bool, error)
// Save saves the checkpoint to the storage layer
Save(c checkpoint.Checkpoint) error
// Load loads the checkpoint with UID `uid` from the storage layer, or returns an error if the checkpoint does not exist
Load(uid string) (checkpoint.Checkpoint, error)
// Exists returns true if the object referenced by `source` has been checkpointed.
Exists(source checkpoint.RemoteConfigSource) (bool, error)
// Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration.
// The following payload types are supported:
// - k8s.io/api/core/v1.ConfigMap
Save(c checkpoint.Payload) error
// Load loads the KubeletConfiguration from the checkpoint referenced by `source`.
Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error)
// CurrentModified returns the last time that the current UID was set
CurrentModified() (time.Time, error)
// Current returns the source that points to the current checkpoint, or nil if no current checkpoint is set
Current() (checkpoint.RemoteConfigSource, error)
// LastKnownGood returns the source that points to the last-known-good checkpoint, or nil if no last-known-good checkpoint is set
LastKnownGood() (checkpoint.RemoteConfigSource, error)
// SetCurrent saves the source that points to the current checkpoint, set to nil to unset
SetCurrent(source checkpoint.RemoteConfigSource) error
// SetCurrentUpdated is similar to SetCurrent, but also returns whether the current checkpoint changed as a result
SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error)
// SetLastKnownGood saves the source that points to the last-known-good checkpoint, set to nil to unset
SetLastKnownGood(source checkpoint.RemoteConfigSource) error
// Reset unsets the current and last-known-good UIDs and returns whether the current UID was unset as a result of the reset
@ -51,34 +55,15 @@ type Store interface {
// reset is a helper for implementing Reset, which can be implemented in terms of Store methods
func reset(s Store) (bool, error) {
current, err := s.Current()
if err != nil {
return false, err
}
if err := s.SetLastKnownGood(nil); err != nil {
return false, fmt.Errorf("failed to reset last-known-good UID in checkpoint store, error: %v", err)
}
updated, err := s.SetCurrentUpdated(nil)
if err != nil {
if err := s.SetCurrent(nil); err != nil {
return false, fmt.Errorf("failed to reset current UID in checkpoint store, error: %v", err)
}
return updated, nil
}
// setCurrentUpdated is a helper for implementing SetCurrentUpdated, which can be implemented in terms of Store methods
func setCurrentUpdated(s Store, source checkpoint.RemoteConfigSource) (bool, error) {
cur, err := s.Current()
if err != nil {
return false, err
}
// if both are nil, no need to update
if cur == nil && source == nil {
return false, nil
}
// if UIDs match, no need to update
if (source != nil && cur != nil) && cur.UID() == source.UID() {
return false, nil
}
// update the source
if err := s.SetCurrent(source); err != nil {
return false, err
}
return true, nil
return current != nil, nil
}

View File

@ -59,38 +59,3 @@ func TestReset(t *testing.T) {
}
}
}
func TestSetCurrentUpdated(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "other-name", Namespace: "namespace", UID: "other-uid"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
s *fakeStore
newCurrent checkpoint.RemoteConfigSource
updated bool
}{
{&fakeStore{current: nil}, nil, false},
{&fakeStore{current: nil}, source, true},
{&fakeStore{current: source}, source, false},
{&fakeStore{current: source}, nil, true},
{&fakeStore{current: source}, otherSource, true},
}
for _, c := range cases {
current := c.s.current
updated, err := setCurrentUpdated(c.s, c.newCurrent)
if err != nil {
t.Fatalf("case %q -> %q, unexpected error: %v", current, c.newCurrent, err)
}
if c.newCurrent != c.s.current {
t.Errorf("case %q -> %q, expect current UID to be %q, but got %q", current, c.newCurrent, c.newCurrent, c.s.current)
}
if c.updated != updated {
t.Errorf("case %q -> %q, expect setCurrentUpdated to return %t, but got %t", current, c.newCurrent, c.updated, updated)
}
}
}

View File

@ -135,44 +135,48 @@ func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *api
// checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist,
// if a failure occurs, returns a sanitized failure reason and an error
func (cc *Controller) checkpointConfigSource(client clientset.Interface, source checkpoint.RemoteConfigSource) (string, error) {
uid := source.UID()
// if the checkpoint already exists, skip downloading
if ok, err := cc.checkpointStore.Exists(uid); err != nil {
reason := fmt.Sprintf(status.FailSyncReasonCheckpointExistenceFmt, source.APIPath(), uid)
if ok, err := cc.checkpointStore.Exists(source); err != nil {
reason := fmt.Sprintf(status.FailSyncReasonCheckpointExistenceFmt, source.APIPath(), source.UID())
return reason, fmt.Errorf("%s, error: %v", reason, err)
} else if ok {
utillog.Infof("checkpoint already exists for object with UID %q, skipping download", uid)
utillog.Infof("checkpoint already exists for object %s with UID %s, skipping download", source.APIPath(), source.UID())
return "", nil
}
// download
checkpoint, reason, err := source.Download(client)
payload, reason, err := source.Download(client)
if err != nil {
return reason, fmt.Errorf("%s, error: %v", reason, err)
}
// save
err = cc.checkpointStore.Save(checkpoint)
err = cc.checkpointStore.Save(payload)
if err != nil {
reason := fmt.Sprintf(status.FailSyncReasonSaveCheckpointFmt, source.APIPath(), checkpoint.UID())
reason := fmt.Sprintf(status.FailSyncReasonSaveCheckpointFmt, source.APIPath(), payload.UID())
return reason, fmt.Errorf("%s, error: %v", reason, err)
}
return "", nil
}
// setCurrentConfig updates UID of the current checkpoint in the checkpoint store to `uid` and returns whether the
// current UID changed as a result, or a sanitized failure reason and an error.
// setCurrentConfig the current checkpoint config in the store
// returns whether the current config changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) setCurrentConfig(source checkpoint.RemoteConfigSource) (bool, string, error) {
updated, err := cc.checkpointStore.SetCurrentUpdated(source)
if err != nil {
failReason := func(s checkpoint.RemoteConfigSource) string {
if source == nil {
return false, status.FailSyncReasonSetCurrentLocal, err
return status.FailSyncReasonSetCurrentLocal
}
return false, fmt.Sprintf(status.FailSyncReasonSetCurrentUIDFmt, source.APIPath(), source.UID()), err
return fmt.Sprintf(status.FailSyncReasonSetCurrentUIDFmt, source.APIPath(), source.UID())
}
return updated, "", nil
current, err := cc.checkpointStore.Current()
if err != nil {
return false, failReason(source), err
}
if err := cc.checkpointStore.SetCurrent(source); err != nil {
return false, failReason(source), err
}
return !checkpoint.EqualRemoteConfigSources(current, source), "", nil
}
// resetConfig resets the current and last-known-good checkpoints in the checkpoint store to their default values and

View File

@ -38,7 +38,7 @@ import (
)
const (
checkpointsDir = "checkpoints"
storeDir = "store"
// TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default,
// because it is not especially clear where this should live in the API.
configTrialDuration = 10 * time.Minute
@ -70,7 +70,7 @@ func NewController(defaultConfig *kubeletconfig.KubeletConfiguration, dynamicCon
// channels must have capacity at least 1, since we signal with non-blocking writes
pendingConfigSource: make(chan bool, 1),
configOk: status.NewConfigOkCondition(),
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, checkpointsDir)),
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
}
}
@ -182,28 +182,23 @@ func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.E
// a clean success or failure reason that can be reported in the status, and any error that occurs.
// If the local config should be used, it will be returned. You should validate local before passing it to this function.
func (cc *Controller) loadAssignedConfig(local *kubeletconfig.KubeletConfiguration) (*kubeletconfig.KubeletConfiguration, checkpoint.RemoteConfigSource, string, error) {
src, err := cc.checkpointStore.Current()
source, err := cc.checkpointStore.Current()
if err != nil {
return nil, nil, fmt.Sprintf(status.CurFailLoadReasonFmt, "unknown"), err
}
// nil source is the signal to use the local config
if src == nil {
return local, src, status.CurLocalOkayReason, nil
if source == nil {
return local, source, status.CurLocalOkayReason, nil
}
curUID := src.UID()
// load from checkpoint
checkpoint, err := cc.checkpointStore.Load(curUID)
// load KubeletConfiguration from checkpoint
kc, err := cc.checkpointStore.Load(source)
if err != nil {
return nil, src, fmt.Sprintf(status.CurFailLoadReasonFmt, src.APIPath()), err
return nil, source, fmt.Sprintf(status.CurFailLoadReasonFmt, source.APIPath()), err
}
cur, err := checkpoint.Parse()
if err != nil {
return nil, src, fmt.Sprintf(status.CurFailParseReasonFmt, src.APIPath()), err
if err := validation.ValidateKubeletConfiguration(kc); err != nil {
return nil, source, fmt.Sprintf(status.CurFailValidateReasonFmt, source.APIPath()), err
}
if err := validation.ValidateKubeletConfiguration(cur); err != nil {
return nil, src, fmt.Sprintf(status.CurFailValidateReasonFmt, src.APIPath()), err
}
return cur, src, status.CurRemoteOkayReason, nil
return kc, source, status.CurRemoteOkayReason, nil
}
// loadLastKnownGoodConfig loads the Kubelet's last-known-good config,
@ -212,28 +207,23 @@ func (cc *Controller) loadAssignedConfig(local *kubeletconfig.KubeletConfigurati
// and any error that occurs.
// If the local config should be used, it will be returned. You should validate local before passing it to this function.
func (cc *Controller) loadLastKnownGoodConfig(local *kubeletconfig.KubeletConfiguration) (*kubeletconfig.KubeletConfiguration, checkpoint.RemoteConfigSource, error) {
src, err := cc.checkpointStore.LastKnownGood()
source, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return nil, nil, fmt.Errorf("unable to determine last-known-good config, error: %v", err)
}
// nil source is the signal to use the local config
if src == nil {
return local, src, nil
if source == nil {
return local, source, nil
}
lkgUID := src.UID()
// load from checkpoint
checkpoint, err := cc.checkpointStore.Load(lkgUID)
kc, err := cc.checkpointStore.Load(source)
if err != nil {
return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailLoadReasonFmt, src.APIPath()), err)
return nil, source, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailLoadReasonFmt, source.APIPath()), err)
}
lkg, err := checkpoint.Parse()
if err != nil {
return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailParseReasonFmt, src.APIPath()), err)
if err := validation.ValidateKubeletConfiguration(kc); err != nil {
return nil, source, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailValidateReasonFmt, source.APIPath()), err)
}
if err := validation.ValidateKubeletConfiguration(lkg); err != nil {
return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailValidateReasonFmt, src.APIPath()), err)
}
return lkg, src, nil
return kc, source, nil
}
// initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
@ -269,14 +259,14 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
return false, nil
}
// graduateAssignedToLastKnownGood sets the last-known-good UID on the checkpointStore
// to the same value as the current UID maintained by the checkpointStore
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
// to the same value as the current config maintained by the checkpointStore
func (cc *Controller) graduateAssignedToLastKnownGood() error {
curUID, err := cc.checkpointStore.Current()
current, err := cc.checkpointStore.Current()
if err != nil {
return err
}
err = cc.checkpointStore.SetLastKnownGood(curUID)
err = cc.checkpointStore.SetLastKnownGood(current)
if err != nil {
return err
}

View File

@ -51,8 +51,6 @@ const (
// CurFailLoadReasonFmt indicates that the Kubelet failed to load the current config checkpoint for an API source
CurFailLoadReasonFmt = "failed to load current: %s"
// CurFailParseReasonFmt indicates that the Kubelet failed to parse the current config checkpoint for an API source
CurFailParseReasonFmt = "failed to parse current: %s"
// CurFailValidateReasonFmt indicates that the Kubelet failed to validate the current config checkpoint for an API source
CurFailValidateReasonFmt = "failed to validate current: %s"
@ -60,8 +58,6 @@ const (
// LkgFailLoadReasonFmt indicates that the Kubelet failed to load the last-known-good config checkpoint for an API source
LkgFailLoadReasonFmt = "failed to load last-known-good: %s"
// LkgFailParseReasonFmt indicates that the Kubelet failed to parse the last-known-good config checkpoint for an API source
LkgFailParseReasonFmt = "failed to parse last-known-good: %s"
// LkgFailValidateReasonFmt indicates that the Kubelet failed to validate the last-known-good config checkpoint for an API source
LkgFailValidateReasonFmt = "failed to validate last-known-good: %s"

View File

@ -13,7 +13,9 @@ go_library(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
],
)

View File

@ -23,17 +23,45 @@ import (
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
)
// TODO(mtaufen): allow an encoder to be injected into checkpoint objects at creation time? (then we could ultimately instantiate only one encoder)
// EncodeKubeletConfig encodes an internal KubeletConfiguration to an external YAML representation
func EncodeKubeletConfig(internal *kubeletconfig.KubeletConfiguration, targetVersion schema.GroupVersion) ([]byte, error) {
encoder, err := newKubeletConfigYAMLEncoder(targetVersion)
if err != nil {
return nil, err
}
// encoder will convert to external version
data, err := runtime.Encode(encoder, internal)
if err != nil {
return nil, err
}
return data, nil
}
// NewJSONEncoder generates a new runtime.Encoder that encodes objects to JSON
func NewJSONEncoder(groupName string) (runtime.Encoder, error) {
// encode to json
mediaType := "application/json"
// newKubeletConfigYAMLEncoder returns an encoder that can write a KubeletConfig to YAML
func newKubeletConfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Encoder, error) {
_, codecs, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
return codecs.EncoderForVersion(info.Serializer, targetVersion), nil
}
// NewYAMLEncoder generates a new runtime.Encoder that encodes objects to YAML
func NewYAMLEncoder(groupName string) (runtime.Encoder, error) {
// encode to YAML
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)

View File

@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -24,3 +25,13 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["files_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)

View File

@ -24,7 +24,10 @@ import (
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const defaultPerm = 0666
const (
defaultPerm = 0755
tmptag = "tmp_" // additional prefix to prevent accidental collisions
)
// FileExists returns true if a regular file exists at `path`, false if `path` does not exist, otherwise an error
func FileExists(fs utilfs.Filesystem, path string) (bool, error) {
@ -66,7 +69,7 @@ func EnsureFile(fs utilfs.Filesystem, path string) error {
// WriteTmpFile creates a temporary file at `path`, writes `data` into it, and fsyncs the file
func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath string, retErr error) {
dir := filepath.Dir(path)
prefix := filepath.Base(path)
prefix := tmptag + filepath.Base(path)
// create the tmp file
tmpFile, err := fs.TempFile(dir, prefix)
@ -81,7 +84,7 @@ func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath strin
// if there was an error writing, syncing, or closing, delete the temporary file and return the error
if retErr != nil {
if err := fs.Remove(tmpPath); err != nil {
retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", path, retErr, err)
retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", tmpPath, retErr, err)
}
tmpPath = ""
}
@ -137,3 +140,88 @@ func EnsureDir(fs utilfs.Filesystem, path string) error {
// create the dir
return fs.MkdirAll(path, defaultPerm)
}
// WriteTempDir creates a temporary dir at `path`, writes `files` into it, and fsyncs all the files
// The keys of `files` represent file names. These names must not:
// - be empty
// - be a path that contains more than the base name of a file (e.g. foo/bar is invalid, as is /bar)
// - match `.` or `..` exactly
// - be longer than 255 characters
// The above validation rules are based on atomic_writer.go, though in this case are more restrictive
// because we only allow a flat hierarchy.
func WriteTempDir(fs utilfs.Filesystem, path string, files map[string]string) (tmpPath string, retErr error) {
// validate the filename keys; for now we only allow a flat keyset
for name := range files {
// invalidate empty names
if name == "" {
return "", fmt.Errorf("invalid file key: must not be empty: %q", name)
}
// invalidate: foo/bar and /bar
if name != filepath.Base(name) {
return "", fmt.Errorf("invalid file key %q, only base names are allowed", name)
}
// invalidate `.` and `..`
if name == "." || name == ".." {
return "", fmt.Errorf("invalid file key, may not be '.' or '..'")
}
// invalidate length > 255 characters
if len(name) > 255 {
return "", fmt.Errorf("invalid file key %q, must be less than 255 characters", name)
}
}
// write the temp directory in parent dir and return path to the tmp directory
dir := filepath.Dir(path)
prefix := tmptag + filepath.Base(path)
// create the tmp dir
var err error
tmpPath, err = fs.TempDir(dir, prefix)
if err != nil {
return "", err
}
// be sure to clean up if there was an error
defer func() {
if retErr != nil {
if err := fs.RemoveAll(tmpPath); err != nil {
retErr = fmt.Errorf("attempted to remove temporary directory %q after error %v, but failed due to error: %v", tmpPath, retErr, err)
}
}
}()
// write data
for name, data := range files {
// create the file
file, err := fs.Create(filepath.Join(tmpPath, name))
if err != nil {
return tmpPath, err
}
// be sure to close the file when we're done
defer func() {
// close the file when we're done, don't overwrite primary retErr if close fails
if err := file.Close(); retErr == nil {
retErr = err
}
}()
// write the file
if _, err := file.Write([]byte(data)); err != nil {
return tmpPath, err
}
// sync the file, to ensure it's written in case a hard reset happens
if err := file.Sync(); err != nil {
return tmpPath, err
}
}
return tmpPath, nil
}
// ReplaceDir replaces the contents of the dir at `path` with `files` by writing to a tmp dir in the same
// dir as `path` and renaming the tmp dir over `path`. The dir does not have to exist to use ReplaceDir.
func ReplaceDir(fs utilfs.Filesystem, path string, files map[string]string) error {
// write data to a temporary directory
tmpPath, err := WriteTempDir(fs, path, files)
if err != nil {
return err
}
// rename over target directory
return fs.Rename(tmpPath, path)
}

View File

@ -0,0 +1,293 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package files
import (
"fmt"
"os"
"path/filepath"
"testing"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
prefix = "test-util-files"
)
type file struct {
name string
// mode distinguishes file type,
// we only check for regular vs. directory in these tests,
// specify regular as 0, directory as os.ModeDir
mode os.FileMode
data string // ignored if mode == os.ModeDir
}
func (f *file) write(fs utilfs.Filesystem, dir string) error {
path := filepath.Join(dir, f.name)
if f.mode.IsDir() {
if err := fs.MkdirAll(path, defaultPerm); err != nil {
return err
}
} else if f.mode.IsRegular() {
// create parent directories, if necessary
parents := filepath.Dir(path)
if err := fs.MkdirAll(parents, defaultPerm); err != nil {
return err
}
// create the file
handle, err := fs.Create(path)
if err != nil {
return err
}
_, err = handle.Write([]byte(f.data))
if err != nil {
if cerr := handle.Close(); cerr != nil {
return fmt.Errorf("error %v closing file after error: %v", cerr, err)
}
return err
}
} else {
return fmt.Errorf("mode not implemented for testing %s", f.mode.String())
}
return nil
}
func (f *file) expect(fs utilfs.Filesystem, dir string) error {
path := filepath.Join(dir, f.name)
if f.mode.IsDir() {
info, err := fs.Stat(path)
if err != nil {
return err
}
if !info.IsDir() {
return fmt.Errorf("expected directory, got mode %s", info.Mode().String())
}
} else if f.mode.IsRegular() {
info, err := fs.Stat(path)
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return fmt.Errorf("expected regular file, got mode %s", info.Mode().String())
}
data, err := fs.ReadFile(path)
if err != nil {
return err
}
if f.data != string(data) {
return fmt.Errorf("expected file data %q, got %q", f.data, string(data))
}
} else {
return fmt.Errorf("mode not implemented for testing %s", f.mode.String())
}
return nil
}
// write files, perform some function, then attempt to read files back
// if err is non-empty, expects an error from the function performed in the test
// and skips reading back the expected files
type test struct {
desc string
writes []file
expects []file
fn func(fs utilfs.Filesystem, dir string, c *test) []error
err string
}
func (c *test) write(t *testing.T, fs utilfs.Filesystem, dir string) {
for _, f := range c.writes {
if err := f.write(fs, dir); err != nil {
t.Fatalf("error pre-writing file: %v", err)
}
}
}
// you can optionally skip calling t.Errorf by passing a nil t, and process the
// returned errors instead
func (c *test) expect(t *testing.T, fs utilfs.Filesystem, dir string) []error {
errs := []error{}
for _, f := range c.expects {
if err := f.expect(fs, dir); err != nil {
msg := fmt.Errorf("expect %#v, got error: %v", f, err)
errs = append(errs, msg)
if t != nil {
t.Errorf("%s", msg)
}
}
}
return errs
}
// run a test case, with an arbitrary function to execute between write and expect
// if c.fn is nil, errors from c.expect are checked against c.err, instead of errors
// from fn being checked against c.err
func (c *test) run(t *testing.T, fs utilfs.Filesystem) {
// isolate each test case in a new temporary directory
dir, err := fs.TempDir("", prefix)
if err != nil {
t.Fatalf("error creating temporary directory for test: %v", err)
}
c.write(t, fs, dir)
// if fn exists, check errors from fn, then check expected files
if c.fn != nil {
errs := c.fn(fs, dir, c)
if len(errs) > 0 {
for _, err := range errs {
utiltest.ExpectError(t, err, c.err)
}
// skip checking expected files if we expected errors
// (usually means we didn't create file)
return
}
c.expect(t, fs, dir)
return
}
// just check expected files, and compare errors from c.expect to c.err
// (this lets us test the helper functions above)
errs := c.expect(nil, fs, dir)
for _, err := range errs {
utiltest.ExpectError(t, err, c.err)
}
}
// simple test of the above helper functions
func TestHelpers(t *testing.T) {
// omitting the test.fn means test.err is compared to errors from test.expect
cases := []test{
{
desc: "regular file",
writes: []file{{name: "foo", data: "bar"}},
expects: []file{{name: "foo", data: "bar"}},
},
{
desc: "directory",
writes: []file{{name: "foo", mode: os.ModeDir}},
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
desc: "deep regular file",
writes: []file{{name: "foo/bar", data: "baz"}},
expects: []file{{name: "foo/bar", data: "baz"}},
},
{
desc: "deep directory",
writes: []file{{name: "foo/bar", mode: os.ModeDir}},
expects: []file{{name: "foo/bar", mode: os.ModeDir}},
},
{
desc: "missing file",
expects: []file{{name: "foo", data: "bar"}},
err: "no such file or directory",
},
{
desc: "missing directory",
expects: []file{{name: "foo/bar", mode: os.ModeDir}},
err: "no such file or directory",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestWriteTempDir(t *testing.T) {
// writing a tmp dir is covered by TestReplaceDir, but we additionally test filename validation here
c := test{
desc: "invalid file key",
err: "invalid file key",
fn: func(fs utilfs.Filesystem, dir string, c *test) []error {
if _, err := WriteTempDir(fs, filepath.Join(dir, "tmpdir"), map[string]string{"foo/bar": ""}); err != nil {
return []error{err}
}
return nil
},
}
c.run(t, utilfs.DefaultFs{})
}
func TestReplaceDir(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
errs := []error{}
// compute filesets from expected files and call ReplaceDir for each
// we don't nest dirs in test cases, order of ReplaceDir call is not guaranteed
dirs := map[string]map[string]string{}
// allocate dirs
for _, f := range c.expects {
if f.mode.IsDir() {
path := filepath.Join(dir, f.name)
if _, ok := dirs[path]; !ok {
dirs[path] = map[string]string{}
}
} else if f.mode.IsRegular() {
path := filepath.Join(dir, filepath.Dir(f.name))
if _, ok := dirs[path]; !ok {
// require an expectation for the parent directory if there is an expectation for the file
errs = append(errs, fmt.Errorf("no prior parent directory in c.expects for file %s", f.name))
continue
}
dirs[path][filepath.Base(f.name)] = f.data
}
}
// short-circuit test case validation errors
if len(errs) > 0 {
return errs
}
// call ReplaceDir for each desired dir
for path, files := range dirs {
if err := ReplaceDir(fs, path, files); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "fn catches invalid test case",
expects: []file{{name: "foo/bar"}},
err: "no prior parent directory",
},
{
fn: fn,
desc: "empty dir",
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir with files",
expects: []file{
{name: "foo", mode: os.ModeDir},
{name: "foo/bar", data: "baz"},
{name: "foo/baz", data: "bar"},
},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}

View File

@ -21,6 +21,21 @@ import (
"testing"
)
// ExpectError calls t.Fatalf if the error does not contain a substr match.
// If substr is empty, a nil error is expected.
// It is useful to call ExpectError from subtests.
func ExpectError(t *testing.T, err error, substr string) {
if err != nil {
if len(substr) == 0 {
t.Fatalf("expect nil error but got %q", err.Error())
} else if !strings.Contains(err.Error(), substr) {
t.Fatalf("expect error to contain %q but got %q", substr, err.Error())
}
} else if len(substr) > 0 {
t.Fatalf("expect error to contain %q but got nil error", substr)
}
}
// SkipRest returns true if there was a non-nil error or if we expected an error that didn't happen,
// and logs the appropriate error on the test object.
// The return value indicates whether we should skip the rest of the test case due to the error result.

View File

@ -72,6 +72,11 @@ func (DefaultFs) ReadFile(filename string) ([]byte, error) {
return ioutil.ReadFile(filename)
}
// TempDir via ioutil.TempDir
func (DefaultFs) TempDir(dir, prefix string) (string, error) {
return ioutil.TempDir(dir, prefix)
}
// TempFile via ioutil.TempFile
func (DefaultFs) TempFile(dir, prefix string) (File, error) {
file, err := ioutil.TempFile(dir, prefix)

View File

@ -68,6 +68,11 @@ func (fs *fakeFs) ReadFile(filename string) ([]byte, error) {
return fs.a.ReadFile(filename)
}
// TempDir via afero.TempDir
func (fs *fakeFs) TempDir(dir, prefix string) (string, error) {
return fs.a.TempDir(dir, prefix)
}
// TempFile via afero.TempFile
func (fs *fakeFs) TempFile(dir, prefix string) (File, error) {
file, err := fs.a.TempFile(dir, prefix)

View File

@ -35,6 +35,7 @@ type Filesystem interface {
// from "io/ioutil"
ReadFile(filename string) ([]byte, error)
TempDir(dir, prefix string) (string, error)
TempFile(dir, prefix string) (File, error)
ReadDir(dirname string) ([]os.FileInfo, error)
Walk(root string, walkFn filepath.WalkFunc) error

View File

@ -30,11 +30,11 @@ go_library(
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/remote:go_default_library",
"//test/e2e/common:go_default_library",
@ -54,7 +54,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",

View File

@ -180,7 +180,7 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube
Name: failParseConfigMap.Name}},
expectConfigOk: &apiv1.NodeCondition{Type: apiv1.NodeKubeletConfigOk, Status: apiv1.ConditionFalse,
Message: status.LkgLocalMessage,
Reason: fmt.Sprintf(status.CurFailParseReasonFmt, configMapAPIPath(failParseConfigMap))},
Reason: fmt.Sprintf(status.CurFailLoadReasonFmt, configMapAPIPath(failParseConfigMap))},
expectConfig: nil,
event: true,
},
@ -248,7 +248,7 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube
Name: badConfigMap.Name}},
expectConfigOk: &apiv1.NodeCondition{Type: apiv1.NodeKubeletConfigOk, Status: apiv1.ConditionFalse,
Message: fmt.Sprintf(status.LkgRemoteMessageFmt, configMapAPIPath(lkgConfigMap)),
Reason: fmt.Sprintf(status.CurFailParseReasonFmt, configMapAPIPath(badConfigMap))},
Reason: fmt.Sprintf(status.CurFailLoadReasonFmt, configMapAPIPath(badConfigMap))},
expectConfig: lkgKC,
event: true,
},

View File

@ -27,8 +27,8 @@ go_library(
"//pkg/controller/namespace:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e_node/builder:go_default_library",
"//test/e2e_node/remote:go_default_library",
@ -41,7 +41,6 @@ go_library(
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",

View File

@ -30,14 +30,13 @@ import (
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e_node/builder"
"k8s.io/kubernetes/test/e2e_node/remote"
@ -354,21 +353,7 @@ func addKubeletConfigFlags(cmdArgs *[]string, kc *kubeletconfig.KubeletConfigura
// writeKubeletConfigFile writes the kubelet config file based on the args and returns the filename
func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
// extract the KubeletConfiguration and convert to versioned
versioned := &v1beta1.KubeletConfiguration{}
scheme, _, err := scheme.NewSchemeAndCodecs()
if err != nil {
return err
}
if err := scheme.Convert(internal, versioned, nil); err != nil {
return err
}
// encode
encoder, err := newKubeletConfigJSONEncoder()
if err != nil {
return err
}
data, err := runtime.Encode(encoder, versioned)
data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion)
if err != nil {
return err
}
@ -384,20 +369,6 @@ func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path s
return nil
}
func newKubeletConfigJSONEncoder() (runtime.Encoder, error) {
_, kubeletCodecs, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
mediaType := "application/json"
info, ok := runtime.SerializerInfoForMediaType(kubeletCodecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
return kubeletCodecs.EncoderForVersion(info.Serializer, v1beta1.SchemeGroupVersion), nil
}
// createPodDirectory creates pod directory.
func createPodDirectory() (string, error) {
cwd, err := os.Getwd()

View File

@ -32,16 +32,15 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/test/e2e/framework"
@ -299,17 +298,7 @@ func createConfigMap(f *framework.Framework, internalKC *kubeletconfig.KubeletCo
// constructs a ConfigMap, populating one of its keys with the KubeletConfiguration. Always uses GenerateName to generate a suffix.
func newKubeletConfigMap(name string, internalKC *kubeletconfig.KubeletConfiguration) *apiv1.ConfigMap {
scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
framework.ExpectNoError(err)
versioned := &kubeletconfigv1beta1.KubeletConfiguration{}
err = scheme.Convert(internalKC, versioned, nil)
framework.ExpectNoError(err)
encoder, err := newKubeletConfigJSONEncoder()
framework.ExpectNoError(err)
data, err := runtime.Encode(encoder, versioned)
data, err := kubeletconfigcodec.EncodeKubeletConfig(internalKC, kubeletconfigv1beta1.SchemeGroupVersion)
framework.ExpectNoError(err)
cmap := &apiv1.ConfigMap{
@ -353,20 +342,6 @@ func logKubeletMetrics(metricKeys ...string) {
}
}
func newKubeletConfigJSONEncoder() (runtime.Encoder, error) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
mediaType := "application/json"
info, ok := runtime.SerializerInfoForMediaType(kubeletCodecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
return kubeletCodecs.EncoderForVersion(info.Serializer, kubeletconfigv1beta1.SchemeGroupVersion), nil
}
// runCommand runs the cmd and returns the combined stdout and stderr, or an
// error if the command failed.
func runCommand(cmd ...string) (string, error) {