diff --git a/pkg/kubelet/kubeletconfig/checkpoint/BUILD b/pkg/kubelet/kubeletconfig/checkpoint/BUILD index a04eed0d3fe..94844991563 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/BUILD +++ b/pkg/kubelet/kubeletconfig/checkpoint/BUILD @@ -9,23 +9,16 @@ load( go_test( name = "go_default_test", srcs = [ - "checkpoint_test.go", "configmap_test.go", "download_test.go", ], embed = [":go_default_library"], deps = [ - "//pkg/kubelet/apis/kubeletconfig:go_default_library", - "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", - "//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library", - "//pkg/kubelet/kubeletconfig/util/codec:go_default_library", "//pkg/kubelet/kubeletconfig/util/test:go_default_library", "//vendor/github.com/davecgh/go-spew/spew:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", ], ) @@ -33,15 +26,12 @@ go_test( go_library( name = "go_default_library", srcs = [ - "checkpoint.go", "configmap.go", "download.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint", deps = [ "//pkg/api/legacyscheme:go_default_library", - "//pkg/kubelet/apis/kubeletconfig:go_default_library", - "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", "//pkg/kubelet/kubeletconfig/status:go_default_library", "//pkg/kubelet/kubeletconfig/util/codec:go_default_library", "//pkg/kubelet/kubeletconfig/util/log:go_default_library", @@ -49,7 +39,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/kubelet/kubeletconfig/checkpoint/checkpoint.go b/pkg/kubelet/kubeletconfig/checkpoint/checkpoint.go deleted file mode 100644 index be44aaaa39c..00000000000 --- a/pkg/kubelet/kubeletconfig/checkpoint/checkpoint.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package checkpoint - -import ( - "fmt" - - apiv1 "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" -) - -// Checkpoint represents a local copy of a config source (payload) object -type Checkpoint interface { - // UID returns the UID of the config source object behind the Checkpoint - UID() string - // Parse extracts the KubeletConfiguration from the checkpoint, applies defaults, and converts to the internal type - Parse() (*kubeletconfig.KubeletConfiguration, error) - // Encode returns a []byte representation of the config source object behind the Checkpoint - Encode() ([]byte, error) - - // object returns the underlying checkpointed object. If you want to compare sources for equality, use EqualCheckpoints, - // which compares the underlying checkpointed objects for semantic API equality. - object() interface{} -} - -// DecodeCheckpoint is a helper for using the apimachinery to decode serialized checkpoints -func DecodeCheckpoint(data []byte) (Checkpoint, error) { - // decode the checkpoint - obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data) - if err != nil { - return nil, fmt.Errorf("failed to decode, error: %v", err) - } - - // TODO(mtaufen): for now we assume we are trying to load a ConfigMap checkpoint, may need to extend this if we allow other checkpoint types - - // convert it to the external ConfigMap type, so we're consistently working with the external type outside of the on-disk representation - cm := &apiv1.ConfigMap{} - err = legacyscheme.Scheme.Convert(obj, cm, nil) - if err != nil { - return nil, fmt.Errorf("failed to convert decoded object into a v1 ConfigMap, error: %v", err) - } - - return NewConfigMapCheckpoint(cm) -} - -// EqualCheckpoints compares two Checkpoints for equality, if their underlying objects are equal, so are the Checkpoints -func EqualCheckpoints(a, b Checkpoint) bool { - if a != nil && b != nil { - return apiequality.Semantic.DeepEqual(a.object(), b.object()) - } - if a == nil && b == nil { - return true - } - return false -} diff --git a/pkg/kubelet/kubeletconfig/checkpoint/checkpoint_test.go b/pkg/kubelet/kubeletconfig/checkpoint/checkpoint_test.go deleted file mode 100644 index abcf9981b1c..00000000000 --- a/pkg/kubelet/kubeletconfig/checkpoint/checkpoint_test.go +++ /dev/null @@ -1,89 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package checkpoint - -import ( - "testing" - - "github.com/davecgh/go-spew/spew" - - apiv1 "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec" - utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test" -) - -func TestDecodeCheckpoint(t *testing.T) { - // generate correct Checkpoint for v1/ConfigMap test case - cm, err := NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID("uid")}}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // generate unsupported object encoding for unsupported type test case - unsupported := newUnsupportedEncoded(t) - - // test cases - cases := []struct { - desc string - data []byte - expect Checkpoint // expect a deeply-equal Checkpoint to be returned from Decode - err string // expect error to contain this substring - }{ - // v1/ConfigMap - {"v1/ConfigMap", []byte(`{"apiVersion": "v1","kind": "ConfigMap","metadata": {"uid": "uid"}}`), cm, ""}, - // malformed - {"malformed", []byte("malformed"), nil, "failed to decode"}, - // no UID - {"no UID", []byte(`{"apiVersion": "v1","kind": "ConfigMap"}`), nil, "ConfigMap must have a UID"}, - // well-formed, but unsupported type - {"well-formed, but unsupported encoded type", unsupported, nil, "failed to convert"}, - } - - for _, c := range cases { - cpt, err := DecodeCheckpoint(c.data) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - // Unfortunately reflect.DeepEqual treats nil data structures as != empty data structures, so - // we have to settle for semantic equality of the underlying checkpointed API objects. - // If additional fields are added to the object that implements the Checkpoint interface, - // they should be added to a named sub-object to facilitate a DeepEquals comparison - // of the extra fields. - // decoded checkpoint should match expected checkpoint - if !apiequality.Semantic.DeepEqual(cpt.object(), c.expect.object()) { - t.Errorf("case %q, expect checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(cpt)) - } - } -} - -// newUnsupportedEncoded returns an encoding of an object that does not have a Checkpoint implementation -func newUnsupportedEncoded(t *testing.T) []byte { - encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName) - if err != nil { - t.Fatalf("could not create an encoder, error: %v", err) - } - unsupported := &apiv1.Node{} - data, err := runtime.Encode(encoder, unsupported) - if err != nil { - t.Fatalf("could not encode object, error: %v", err) - } - return data -} diff --git a/pkg/kubelet/kubeletconfig/checkpoint/configmap.go b/pkg/kubelet/kubeletconfig/checkpoint/configmap.go index 076e7ed33d8..2cdc348f37d 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/configmap.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/configmap.go @@ -20,75 +20,36 @@ import ( "fmt" apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" - kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" - utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec" ) const configMapConfigKey = "kubelet" -// configMapCheckpoint implements Checkpoint, backed by a v1/ConfigMap config source object -type configMapCheckpoint struct { - kubeletCodecs *serializer.CodecFactory // codecs for the KubeletConfiguration - configMap *apiv1.ConfigMap +// configMapPayload implements Payload, backed by a v1/ConfigMap config source object +type configMapPayload struct { + cm *apiv1.ConfigMap } -// NewConfigMapCheckpoint returns a Checkpoint backed by `cm`. `cm` must be non-nil -// and have a non-empty ObjectMeta.UID, or an error will be returned. -func NewConfigMapCheckpoint(cm *apiv1.ConfigMap) (Checkpoint, error) { +var _ Payload = (*configMapPayload)(nil) + +// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID +func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) { if cm == nil { - return nil, fmt.Errorf("ConfigMap must be non-nil to be treated as a Checkpoint") + return nil, fmt.Errorf("ConfigMap must be non-nil to be a Payload") } else if len(cm.ObjectMeta.UID) == 0 { - return nil, fmt.Errorf("ConfigMap must have a UID to be treated as a Checkpoint") + return nil, fmt.Errorf("ConfigMap must have a UID to be a Payload") } - _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - return nil, err - } - - return &configMapCheckpoint{kubeletCodecs, cm}, nil + return &configMapPayload{cm}, nil } -// UID returns the UID of a configMapCheckpoint -func (c *configMapCheckpoint) UID() string { - return string(c.configMap.UID) +func (p *configMapPayload) UID() string { + return string(p.cm.UID) } -// Parse extracts the KubeletConfiguration from v1/ConfigMap checkpoints, applies defaults, and converts to the internal type -func (c *configMapCheckpoint) Parse() (*kubeletconfig.KubeletConfiguration, error) { - const emptyCfgErr = "config was empty, but some parameters are required" - - if len(c.configMap.Data) == 0 { - return nil, fmt.Errorf(emptyCfgErr) - } - - config, ok := c.configMap.Data[configMapConfigKey] - if !ok { - return nil, fmt.Errorf("key %q not found in ConfigMap", configMapConfigKey) - } else if len(config) == 0 { - return nil, fmt.Errorf(emptyCfgErr) - } - - return utilcodec.DecodeKubeletConfiguration(c.kubeletCodecs, []byte(config)) +func (p *configMapPayload) Files() map[string]string { + return p.cm.Data } -// Encode encodes a configMapCheckpoint -func (c *configMapCheckpoint) Encode() ([]byte, error) { - cm := c.configMap - encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName) - if err != nil { - return nil, err - } - data, err := runtime.Encode(encoder, cm) - if err != nil { - return nil, err - } - return data, nil -} - -func (c *configMapCheckpoint) object() interface{} { - return c.configMap +func (p *configMapPayload) object() interface{} { + return p.cm } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go b/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go index a60f34aa6ba..d7770f05a42 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go @@ -17,7 +17,7 @@ limitations under the License. package checkpoint import ( - "fmt" + "reflect" "testing" "github.com/davecgh/go-spew/spew" @@ -25,14 +25,10 @@ import ( apiv1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" - kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" - kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1" utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test" ) -func TestNewConfigMapCheckpoint(t *testing.T) { +func TestNewConfigMapPayload(t *testing.T) { cases := []struct { desc string cm *apiv1.ConfigMap @@ -44,7 +40,7 @@ func TestNewConfigMapCheckpoint(t *testing.T) { &apiv1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "name", - UID: types.UID("uid"), + UID: "uid", }, Data: map[string]string{ "key1": "value1", @@ -54,184 +50,60 @@ func TestNewConfigMapCheckpoint(t *testing.T) { } for _, c := range cases { - cpt, err := NewConfigMapCheckpoint(c.cm) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - // underlying object should match the object passed in - if !apiequality.Semantic.DeepEqual(cpt.object(), c.cm) { - t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.cm), spew.Sdump(cpt)) - } + t.Run(c.desc, func(t *testing.T) { + payload, err := NewConfigMapPayload(c.cm) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + // underlying object should match the object passed in + if !apiequality.Semantic.DeepEqual(c.cm, payload.object()) { + t.Errorf("expect %s but got %s", spew.Sdump(c.cm), spew.Sdump(payload)) + } + }) } } -func TestConfigMapCheckpointUID(t *testing.T) { - _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() +func TestConfigMapPayloadUID(t *testing.T) { + const expect = "uid" + payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect}}) if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatalf("error constructing payload: %v", err) } + uid := payload.UID() + if expect != uid { + t.Errorf("expect %q, but got %q", expect, uid) + } +} - cases := []string{"", "uid", "376dfb73-56db-11e7-a01e-42010a800002"} - for _, uidIn := range cases { - cpt := &configMapCheckpoint{ - kubeletCodecs, - &apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{UID: types.UID(uidIn)}, +func TestConfigMapPayloadFiles(t *testing.T) { + cases := []struct { + desc string + data map[string]string + expect map[string]string + }{ + {"nil", nil, nil}, + {"empty", map[string]string{}, map[string]string{}}, + {"populated", + map[string]string{ + "foo": "1", + "bar": "2", }, - } - // UID method should return the correct value of the UID - uidOut := cpt.UID() - if uidIn != uidOut { - t.Errorf("expect UID() to return %q, but got %q", uidIn, uidOut) - } - } -} - -func TestConfigMapCheckpointParse(t *testing.T) { - kubeletScheme, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // get the built-in default configuration - external := &kubeletconfigv1beta1.KubeletConfiguration{} - kubeletScheme.Default(external) - defaultConfig := &kubeletconfig.KubeletConfiguration{} - err = kubeletScheme.Convert(external, defaultConfig, nil) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - cases := []struct { - desc string - cm *apiv1.ConfigMap - expect *kubeletconfig.KubeletConfiguration - err string - }{ - {"empty data", &apiv1.ConfigMap{}, nil, "config was empty"}, - // missing kubelet key - {"missing kubelet key", &apiv1.ConfigMap{Data: map[string]string{ - "bogus": "stuff"}}, nil, fmt.Sprintf("key %q not found", configMapConfigKey)}, - // invalid format - {"invalid yaml", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": "*"}}, nil, "failed to decode"}, - {"invalid json", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": "{*"}}, nil, "failed to decode"}, - // invalid object - {"missing kind", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": `{"apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, nil, "failed to decode"}, - {"missing version", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": `{"kind":"KubeletConfiguration"}`}}, nil, "failed to decode"}, - {"unregistered kind", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": `{"kind":"BogusKind","apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, nil, "failed to decode"}, - {"unregistered version", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": `{"kind":"KubeletConfiguration","apiVersion":"bogusversion"}`}}, nil, "failed to decode"}, - // empty object with correct kind and version should result in the defaults for that kind and version - {"default from yaml", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": `kind: KubeletConfiguration -apiVersion: kubelet.config.k8s.io/v1beta1`}}, defaultConfig, ""}, - {"default from json", &apiv1.ConfigMap{Data: map[string]string{ - "kubelet": `{"kind":"KubeletConfiguration","apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, defaultConfig, ""}, + map[string]string{ + "foo": "1", + "bar": "2", + }}, } for _, c := range cases { - cpt := &configMapCheckpoint{kubeletCodecs, c.cm} - kc, err := cpt.Parse() - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - // we expect the parsed configuration to match what we described in the ConfigMap - if !apiequality.Semantic.DeepEqual(c.expect, kc) { - t.Errorf("case %q, expect config %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(kc)) - } - } -} - -func TestConfigMapCheckpointEncode(t *testing.T) { - _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // only one case, based on output from the existing encoder, and since - // this is hard to test (key order isn't guaranteed), we should probably - // just stick to this test case and mostly rely on the round-trip test. - cases := []struct { - desc string - cpt *configMapCheckpoint - expect string - }{ - // we expect Checkpoints to be encoded as a json representation of the underlying API object - {"one-key", - &configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: "one-key"}, - Data: map[string]string{"one": ""}}}, - `{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"one-key","creationTimestamp":null},"data":{"one":""}} -`}, - } - - for _, c := range cases { - data, err := c.cpt.Encode() - // we don't expect any errors from encoding - if utiltest.SkipRest(t, c.desc, err, "") { - continue - } - if string(data) != c.expect { - t.Errorf("case %q, expect encoding %q but got %q", c.desc, c.expect, string(data)) - } - } -} - -func TestConfigMapCheckpointRoundTrip(t *testing.T) { - _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - cases := []struct { - desc string - cpt *configMapCheckpoint - decodeErr string - }{ - // empty data - {"empty data", - &configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "empty-data-sha256-e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - UID: "uid", - }, - Data: map[string]string{}}}, - ""}, - // two keys - {"two keys", - &configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "two-keys-sha256-2bff03d6249c8a9dc9a1436d087c124741361ccfac6615b81b67afcff5c42431", - UID: "uid", - }, - Data: map[string]string{"one": "", "two": "2"}}}, - ""}, - // missing uid - {"missing uid", - &configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "two-keys-sha256-2bff03d6249c8a9dc9a1436d087c124741361ccfac6615b81b67afcff5c42431", - UID: "", - }, - Data: map[string]string{"one": "", "two": "2"}}}, - "must have a UID"}, - } - for _, c := range cases { - // we don't expect any errors from encoding - data, err := c.cpt.Encode() - if utiltest.SkipRest(t, c.desc, err, "") { - continue - } - after, err := DecodeCheckpoint(data) - if utiltest.SkipRest(t, c.desc, err, c.decodeErr) { - continue - } - if !apiequality.Semantic.DeepEqual(c.cpt.object(), after.object()) { - t.Errorf("case %q, expect round-trip result %s but got %s", c.desc, spew.Sdump(c.cpt), spew.Sdump(after)) - } + t.Run(c.desc, func(t *testing.T) { + payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid"}, Data: c.data}) + if err != nil { + t.Fatalf("error constructing payload: %v", err) + } + files := payload.Files() + if !reflect.DeepEqual(c.expect, files) { + t.Errorf("expected %v, but got %v", c.expect, files) + } + }) } } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/download.go b/pkg/kubelet/kubeletconfig/checkpoint/download.go index 9b516a0eb72..bbc624adb76 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/download.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/download.go @@ -30,19 +30,32 @@ import ( utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log" ) +// Payload represents a local copy of a config source (payload) object +type Payload interface { + // UID returns a globally unique (space and time) identifier for the payload. + UID() string + + // Files returns a map of filenames to file contents. + Files() map[string]string + + // object returns the underlying checkpointed object. + object() interface{} +} + // RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint type RemoteConfigSource interface { - // UID returns the UID of the remote config source object + // UID returns a globally unique identifier of the source described by the remote config source object UID() string // APIPath returns the API path to the remote resource, e.g. its SelfLink APIPath() string - // Download downloads the remote config source object returns a Checkpoint backed by the object, + // Download downloads the remote config source object returns a Payload backed by the object, // or a sanitized failure reason and error if the download fails - Download(client clientset.Interface) (Checkpoint, string, error) + Download(client clientset.Interface) (Payload, string, error) // Encode returns a []byte representation of the object behind the RemoteConfigSource Encode() ([]byte, error) - // object returns the underlying source object. If you want to compare sources for equality, use EqualRemoteConfigSources, + // object returns the underlying source object. + // If you want to compare sources for equality, use EqualRemoteConfigSources, // which compares the underlying source objects for semantic API equality. object() interface{} } @@ -70,7 +83,7 @@ func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource, } // DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources; -// e.g. the objects stored in the .cur and .lkg files by checkpoint/store/fsstore.go +// e.g. the metadata stored by checkpoint/store/fsstore.go func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) { // decode the remote config source obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data) @@ -97,10 +110,7 @@ func EqualRemoteConfigSources(a, b RemoteConfigSource) bool { if a != nil && b != nil { return apiequality.Semantic.DeepEqual(a.object(), b.object()) } - if a == nil && b == nil { - return true - } - return false + return a == b } // remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources @@ -108,6 +118,8 @@ type remoteConfigMap struct { source *apiv1.NodeConfigSource } +var _ RemoteConfigSource = (*remoteConfigMap)(nil) + func (r *remoteConfigMap) UID() string { return string(r.source.ConfigMapRef.UID) } @@ -119,7 +131,7 @@ func (r *remoteConfigMap) APIPath() string { return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name) } -func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, string, error) { +func (r *remoteConfigMap) Download(client clientset.Interface) (Payload, string, error) { var reason string uid := string(r.source.ConfigMapRef.UID) @@ -138,18 +150,18 @@ func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, stri return nil, reason, fmt.Errorf(reason) } - checkpoint, err := NewConfigMapCheckpoint(cm) + payload, err := NewConfigMapPayload(cm) if err != nil { reason = fmt.Sprintf("invalid downloaded object") return nil, reason, fmt.Errorf("%s, error: %v", reason, err) } utillog.Infof("successfully downloaded ConfigMap with UID %q", uid) - return checkpoint, "", nil + return payload, "", nil } func (r *remoteConfigMap) Encode() ([]byte, error) { - encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName) + encoder, err := utilcodec.NewYAMLEncoder(apiv1.GroupName) if err != nil { return nil, err } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/download_test.go b/pkg/kubelet/kubeletconfig/checkpoint/download_test.go index 345319fd37c..0f9b3a362cd 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/download_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/download_test.go @@ -25,9 +25,7 @@ import ( apiv1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" fakeclient "k8s.io/client-go/kubernetes/fake" - kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test" ) @@ -68,51 +66,47 @@ func TestNewRemoteConfigSource(t *testing.T) { } for _, c := range cases { - src, _, err := NewRemoteConfigSource(c.source) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - // underlying object should match the object passed in - if !apiequality.Semantic.DeepEqual(c.expect.object(), src.object()) { - t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(src)) - } + t.Run(c.desc, func(t *testing.T) { + source, _, err := NewRemoteConfigSource(c.source) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + // underlying object should match the object passed in + if !apiequality.Semantic.DeepEqual(c.expect.object(), source.object()) { + t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(source)) + } + }) } } func TestRemoteConfigMapUID(t *testing.T) { - cases := []string{"", "uid", "376dfb73-56db-11e7-a01e-42010a800002"} - for _, uidIn := range cases { - cpt := &remoteConfigMap{ - &apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: types.UID(uidIn)}}, - } - // UID method should return the correct value of the UID - uidOut := cpt.UID() - if uidIn != uidOut { - t.Errorf("expect UID() to return %q, but got %q", uidIn, uidOut) - } + const expect = "uid" + source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: expect}}) + if err != nil { + t.Fatalf("error constructing remote config source: %v", err) + } + uid := source.UID() + if expect != uid { + t.Errorf("expect %q, but got %q", expect, uid) } } func TestRemoteConfigMapAPIPath(t *testing.T) { - name := "name" - namespace := "namespace" - cpt := &remoteConfigMap{ - &apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: ""}}, + const namespace = "namespace" + const name = "name" + source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: "uid"}}) + if err != nil { + t.Fatalf("error constructing remote config source: %v", err) } - expect := fmt.Sprintf(configMapAPIPathFmt, cpt.source.ConfigMapRef.Namespace, cpt.source.ConfigMapRef.Name) - // APIPath() method should return the correct path to the referenced resource - path := cpt.APIPath() + expect := fmt.Sprintf(configMapAPIPathFmt, namespace, name) + path := source.APIPath() if expect != path { - t.Errorf("expect APIPath() to return %q, but got %q", expect, namespace) + t.Errorf("expect %q, but got %q", expect, path) } } func TestRemoteConfigMapDownload(t *testing.T) { - _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - cm := &apiv1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "name", @@ -120,36 +114,76 @@ func TestRemoteConfigMapDownload(t *testing.T) { UID: "uid", }} client := fakeclient.NewSimpleClientset(cm) + payload, err := NewConfigMapPayload(cm) + if err != nil { + t.Fatalf("error constructing payload: %v", err) + } + + makeSource := func(source *apiv1.NodeConfigSource) RemoteConfigSource { + s, _, err := NewRemoteConfigSource(source) + if err != nil { + t.Fatalf("error constructing remote config source %v", err) + } + return s + } cases := []struct { desc string source RemoteConfigSource - expect Checkpoint + expect Payload err string }{ - // object doesn't exist {"object doesn't exist", - &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "bogus", Namespace: "namespace", UID: "bogus"}}}, + makeSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "bogus", Namespace: "namespace", UID: "bogus"}}), nil, "not found"}, // UID of downloaded object doesn't match UID of referent found via namespace/name {"UID is incorrect for namespace/name", - &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "bogus"}}}, + makeSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "bogus"}}), nil, "does not match"}, // successful download {"object exists and reference is correct", - &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}}, - &configMapCheckpoint{kubeletCodecs, cm}, ""}, + makeSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}), + payload, ""}, } for _, c := range cases { - cpt, _, err := c.source.Download(client) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - // "downloaded" object should match the expected - if !apiequality.Semantic.DeepEqual(c.expect.object(), cpt.object()) { - t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(cpt)) - } + t.Run(c.desc, func(t *testing.T) { + payload, _, err := c.source.Download(client) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + // downloaded object should match the expected + if !apiequality.Semantic.DeepEqual(c.expect.object(), payload.object()) { + t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(payload)) + } + }) + } +} + +func TestEqualRemoteConfigSources(t *testing.T) { + cases := []struct { + desc string + a RemoteConfigSource + b RemoteConfigSource + expect bool + }{ + {"both nil", nil, nil, true}, + {"a nil", nil, &remoteConfigMap{}, false}, + {"b nil", &remoteConfigMap{}, nil, false}, + {"neither nil, equal", &remoteConfigMap{}, &remoteConfigMap{}, true}, + {"neither nil, not equal", + &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "a"}}}, + &remoteConfigMap{}, + false}, + } + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + if EqualRemoteConfigSources(c.a, c.b) != c.expect { + t.Errorf("expected EqualRemoteConfigSources to return %t, but got %t", c.expect, !c.expect) + } + }) } } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/BUILD b/pkg/kubelet/kubeletconfig/checkpoint/store/BUILD index 035a9641d6f..76f46e824ef 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/BUILD +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/BUILD @@ -14,7 +14,11 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//pkg/kubelet/apis/kubeletconfig:go_default_library", + "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", + "//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library", "//pkg/kubelet/kubeletconfig/checkpoint:go_default_library", + "//pkg/kubelet/kubeletconfig/util/codec:go_default_library", "//pkg/kubelet/kubeletconfig/util/files:go_default_library", "//pkg/kubelet/kubeletconfig/util/test:go_default_library", "//pkg/util/filesystem:go_default_library", @@ -34,7 +38,9 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store", deps = [ + "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/kubeletconfig/checkpoint:go_default_library", + "//pkg/kubelet/kubeletconfig/configfiles:go_default_library", "//pkg/kubelet/kubeletconfig/util/files:go_default_library", "//pkg/kubelet/kubeletconfig/util/log:go_default_library", "//pkg/util/filesystem:go_default_library", diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go b/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go index cb5d2491b26..4f76a01f7c5 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/fakestore.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint" ) @@ -29,19 +30,21 @@ type fakeStore struct { lastKnownGood checkpoint.RemoteConfigSource } +var _ Store = (*fakeStore)(nil) + func (s *fakeStore) Initialize() error { return fmt.Errorf("Initialize method not supported") } -func (s *fakeStore) Exists(uid string) (bool, error) { +func (s *fakeStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) { return false, fmt.Errorf("Exists method not supported") } -func (s *fakeStore) Save(c checkpoint.Checkpoint) error { +func (s *fakeStore) Save(c checkpoint.Payload) error { return fmt.Errorf("Save method not supported") } -func (s *fakeStore) Load(uid string) (checkpoint.Checkpoint, error) { +func (s *fakeStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) { return nil, fmt.Errorf("Load method not supported") } @@ -62,10 +65,6 @@ func (s *fakeStore) SetCurrent(source checkpoint.RemoteConfigSource) error { return nil } -func (s *fakeStore) SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) { - return setCurrentUpdated(s, source) -} - func (s *fakeStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error { s.lastKnownGood = source return nil diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go index 1026b97a842..3ce1744fabc 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go @@ -21,82 +21,99 @@ import ( "path/filepath" "time" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles" utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files" utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log" utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) const ( - curFile = ".cur" - lkgFile = ".lkg" + metaDir = "meta" + currentFile = "current" + lastKnownGoodFile = "last-known-good" + + checkpointsDir = "checkpoints" + kubeletKey = "kubelet" // TODO(mtaufen): eventually the API will have a way to parameterize the kubelet file name, and then we can remove this ) // fsStore is for tracking checkpoints in the local filesystem, implements Store type fsStore struct { // fs is the filesystem to use for storage operations; can be mocked for testing fs utilfs.Filesystem - // checkpointsDir is the absolute path to the storage directory for fsStore - checkpointsDir string + // dir is the absolute path to the storage directory for fsStore + dir string } -// NewFsStore returns a Store that saves its data in `checkpointsDir` -func NewFsStore(fs utilfs.Filesystem, checkpointsDir string) Store { +var _ Store = (*fsStore)(nil) + +// NewFsStore returns a Store that saves its data in dir +func NewFsStore(fs utilfs.Filesystem, dir string) Store { return &fsStore{ - fs: fs, - checkpointsDir: checkpointsDir, + fs: fs, + dir: dir, } } func (s *fsStore) Initialize() error { - utillog.Infof("initializing config checkpoints directory %q", s.checkpointsDir) - if err := utilfiles.EnsureDir(s.fs, s.checkpointsDir); err != nil { + utillog.Infof("initializing config checkpoints directory %q", s.dir) + // ensure top-level dir for store + if err := utilfiles.EnsureDir(s.fs, s.dir); err != nil { return err } - if err := utilfiles.EnsureFile(s.fs, filepath.Join(s.checkpointsDir, curFile)); err != nil { + // ensure metadata directory and reference files (tracks current and lkg configs) + if err := utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, metaDir)); err != nil { return err } - return utilfiles.EnsureFile(s.fs, filepath.Join(s.checkpointsDir, lkgFile)) + if err := utilfiles.EnsureFile(s.fs, s.metaPath(currentFile)); err != nil { + return err + } + if err := utilfiles.EnsureFile(s.fs, s.metaPath(lastKnownGoodFile)); err != nil { + return err + } + // ensure checkpoints directory (saves unpacked payloads in subdirectories named after payload UID) + return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir)) } -func (s *fsStore) Exists(uid string) (bool, error) { - ok, err := utilfiles.FileExists(s.fs, filepath.Join(s.checkpointsDir, uid)) +func (s *fsStore) Exists(c checkpoint.RemoteConfigSource) (bool, error) { + // we check whether the directory was created for the resource + uid := c.UID() + ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(uid)) if err != nil { return false, fmt.Errorf("failed to determine whether checkpoint %q exists, error: %v", uid, err) } return ok, nil } -func (s *fsStore) Save(c checkpoint.Checkpoint) error { - // encode the checkpoint - data, err := c.Encode() - if err != nil { - return err - } - // save the file - return utilfiles.ReplaceFile(s.fs, filepath.Join(s.checkpointsDir, c.UID()), data) +func (s *fsStore) Save(c checkpoint.Payload) error { + // save the checkpoint's files in the appropriate checkpoint dir + return utilfiles.ReplaceDir(s.fs, s.checkpointPath(c.UID()), c.Files()) } -func (s *fsStore) Load(uid string) (checkpoint.Checkpoint, error) { - filePath := filepath.Join(s.checkpointsDir, uid) - utillog.Infof("loading configuration from %q", filePath) - - // load the file - data, err := s.fs.ReadFile(filePath) - if err != nil { - return nil, fmt.Errorf("failed to read checkpoint file %q, error: %v", filePath, err) +func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) { + sourceFmt := fmt.Sprintf("%s:%s", source.APIPath(), source.UID()) + // check if a checkpoint exists for the source + if ok, err := s.Exists(source); err != nil { + return nil, fmt.Errorf("failed to determine if a checkpoint exists for source %s", sourceFmt) + } else if !ok { + return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt) } - - // decode it - c, err := checkpoint.DecodeCheckpoint(data) + // load the kubelet config file + utillog.Infof("loading kubelet configuration checkpoint for source %s", sourceFmt) + loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID()), kubeletKey)) if err != nil { - return nil, fmt.Errorf("failed to decode checkpoint file %q, error: %v", filePath, err) + return nil, err } - return c, nil + kc, err := loader.Load() + if err != nil { + return nil, err + } + return kc, nil } func (s *fsStore) CurrentModified() (time.Time, error) { - path := filepath.Join(s.checkpointsDir, curFile) + path := s.metaPath(currentFile) info, err := s.fs.Stat(path) if err != nil { return time.Time{}, fmt.Errorf("failed to stat %q while checking modification time, error: %v", path, err) @@ -105,34 +122,35 @@ func (s *fsStore) CurrentModified() (time.Time, error) { } func (s *fsStore) Current() (checkpoint.RemoteConfigSource, error) { - return s.sourceFromFile(curFile) + return readRemoteConfigSource(s.fs, s.metaPath(currentFile)) } func (s *fsStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) { - return s.sourceFromFile(lkgFile) + return readRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile)) } func (s *fsStore) SetCurrent(source checkpoint.RemoteConfigSource) error { - return s.setSourceFile(curFile, source) -} - -func (s *fsStore) SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) { - return setCurrentUpdated(s, source) + return writeRemoteConfigSource(s.fs, s.metaPath(currentFile), source) } func (s *fsStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error { - return s.setSourceFile(lkgFile, source) + return writeRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile), source) } func (s *fsStore) Reset() (bool, error) { return reset(s) } -// sourceFromFile returns the RemoteConfigSource stored in the file at `s.checkpointsDir/relPath`, -// or nil if the file is empty -func (s *fsStore) sourceFromFile(relPath string) (checkpoint.RemoteConfigSource, error) { - path := filepath.Join(s.checkpointsDir, relPath) - data, err := s.fs.ReadFile(path) +func (s *fsStore) checkpointPath(uid string) string { + return filepath.Join(s.dir, checkpointsDir, uid) +} + +func (s *fsStore) metaPath(name string) string { + return filepath.Join(s.dir, metaDir, name) +} + +func readRemoteConfigSource(fs utilfs.Filesystem, path string) (checkpoint.RemoteConfigSource, error) { + data, err := fs.ReadFile(path) if err != nil { return nil, err } else if len(data) == 0 { @@ -141,17 +159,15 @@ func (s *fsStore) sourceFromFile(relPath string) (checkpoint.RemoteConfigSource, return checkpoint.DecodeRemoteConfigSource(data) } -// set source file replaces the file at `s.checkpointsDir/relPath` with a file containing `source` -func (s *fsStore) setSourceFile(relPath string, source checkpoint.RemoteConfigSource) error { - path := filepath.Join(s.checkpointsDir, relPath) +func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoint.RemoteConfigSource) error { // if nil, reset the file if source == nil { - return utilfiles.ReplaceFile(s.fs, path, []byte{}) + return utilfiles.ReplaceFile(fs, path, []byte{}) } // encode the source and save it to the file data, err := source.Encode() if err != nil { return err } - return utilfiles.ReplaceFile(s.fs, path, data) + return utilfiles.ReplaceFile(fs, path, data) } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go index 939ffb2ad5e..ee1c8b57e63 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go @@ -18,7 +18,9 @@ package store import ( "fmt" + "io/ioutil" "path/filepath" + "reflect" "testing" "time" @@ -27,17 +29,37 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint" + utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec" utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files" utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test" utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) -const testCheckpointsDir = "/test-checkpoints-dir" +var testdir string + +func init() { + tmp, err := ioutil.TempDir("", "fsstore-test") + if err != nil { + panic(err) + } + testdir = tmp +} func newInitializedFakeFsStore() (*fsStore, error) { - fs := utilfs.NewFakeFs() - store := NewFsStore(fs, testCheckpointsDir) + // Test with the default filesystem, the fake filesystem has an issue caused by afero: https://github.com/spf13/afero/issues/141 + // The default filesystem also behaves more like production, so we should probably not mock the filesystem for unit tests. + fs := utilfs.DefaultFs{} + + tmpdir, err := fs.TempDir(testdir, "store-") + if err != nil { + return nil, err + } + + store := NewFsStore(fs, tmpdir) if err := store.Initialize(); err != nil { return nil, err } @@ -50,167 +72,203 @@ func TestFsStoreInitialize(t *testing.T) { t.Fatalf("fsStore.Initialize() failed with error: %v", err) } - // check that testCheckpointsDir exists - _, err = store.fs.Stat(testCheckpointsDir) - if err != nil { - t.Fatalf("expect %q to exist, but stat failed with error: %v", testCheckpointsDir, err) + // check that store.dir exists + if _, err := store.fs.Stat(store.dir); err != nil { + t.Fatalf("expect %q to exist, but stat failed with error: %v", store.dir, err) } - // check that testCheckpointsDir contains the curFile - curPath := filepath.Join(testCheckpointsDir, curFile) - _, err = store.fs.Stat(curPath) - if err != nil { - t.Fatalf("expect %q to exist, but stat failed with error: %v", curPath, err) + // check that meta dir exists + if _, err := store.fs.Stat(store.metaPath("")); err != nil { + t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(""), err) } - // check that testCheckpointsDir contains the lkgFile - lkgPath := filepath.Join(testCheckpointsDir, lkgFile) - _, err = store.fs.Stat(lkgPath) - if err != nil { - t.Fatalf("expect %q to exist, but stat failed with error: %v", lkgPath, err) + // check that checkpoints dir exists + if _, err := store.fs.Stat(store.checkpointPath("")); err != nil { + t.Fatalf("expect %q to exist, but stat failed with error: %v", store.checkpointPath(""), err) + } + + // check that currentFile exists + if _, err := store.fs.Stat(store.metaPath(currentFile)); err != nil { + t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(currentFile), err) + } + + // check that lastKnownGoodFile exists + if _, err := store.fs.Stat(store.metaPath(lastKnownGoodFile)); err != nil { + t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(lastKnownGoodFile), err) } } func TestFsStoreExists(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } - // create a checkpoint file; this is enough for an exists check - cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{UID: "uid"}, - }) + // checkpoint a payload + const uid = "uid" + p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid}}) if err != nil { t.Fatalf("could not construct checkpoint, error: %v", err) } - saveTestCheckpointFile(t, store.fs, cpt) + store.Save(p) cases := []struct { desc string - uid string // the uid to test + uid types.UID expect bool err string }{ - {"exists", "uid", true, ""}, + {"exists", uid, true, ""}, {"does not exist", "bogus-uid", false, ""}, } for _, c := range cases { - ok, err := store.Exists(c.uid) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - if c.expect != ok { - t.Errorf("case %q, expect %t but got %t", c.desc, c.expect, ok) - } + t.Run(c.desc, func(t *testing.T) { + source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ + ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: c.uid}}) + if err != nil { + t.Fatalf("error constructing remote config source: %v", err) + } + ok, err := store.Exists(source) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + if c.expect != ok { + t.Errorf("expect %t but got %t", c.expect, ok) + } + }) } } func TestFsStoreSave(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } - cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{UID: "uid"}, - }) - if err != nil { - t.Fatalf("could not construct checkpoint, error: %v", err) + nameTooLong := func() string { + s := "" + for i := 0; i < 256; i++ { + s += "a" + } + return s + }() + + cases := []struct { + desc string + files map[string]string + err string + }{ + {"valid payload", map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""}, + {"empty key name", map[string]string{"": "foocontent"}, "must not be empty"}, + {"key name is not a base file name (foo/bar)", map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"}, + {"key name is not a base file name (/foo)", map[string]string{"/bar": "foocontent"}, "only base names are allowed"}, + {"used .", map[string]string{".": "foocontent"}, "may not be '.' or '..'"}, + {"used ..", map[string]string{"..": "foocontent"}, "may not be '.' or '..'"}, + {"length violation", map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"}, } - // save the checkpoint - err = store.Save(cpt) - if err != nil { - t.Fatalf("unable to save checkpoint, error: %v", err) - } - - // expect the saved checkpoint file to match the encoding of the checkpoint - data, err := cpt.Encode() - if err != nil { - t.Fatalf("unable to encode the checkpoint, error: %v", err) - } - expect := string(data) - - data = readTestCheckpointFile(t, store.fs, cpt.UID()) - cptFile := string(data) - - if expect != cptFile { - t.Errorf("expect %q but got %q", expect, cptFile) + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + // construct the payload + p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{UID: "uid"}, + Data: c.files, + }) + if err != nil { + t.Fatalf("error constructing payload: %v", err) + } + // save the payload + err = store.Save(p) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + // read the saved checkpoint + m, err := mapFromCheckpoint(store, p.UID()) + if err != nil { + t.Fatalf("error loading checkpoint to map: %v", err) + } + // compare our expectation to what got saved + expect := p.Files() + if !reflect.DeepEqual(expect, m) { + t.Errorf("expect %v, but got %v", expect, m) + } + }) } } func TestFsStoreLoad(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } - - const uid = "uid" - cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)}}) + // encode a kubelet configuration that has all defaults set + expect, err := newKubeletConfiguration() if err != nil { - t.Fatalf("unable to construct checkpoint, error: %v", err) + t.Fatalf("error constructing KubeletConfiguration: %v", err) + } + data, err := utilcodec.EncodeKubeletConfig(expect, v1beta1.SchemeGroupVersion) + if err != nil { + t.Fatalf("error encoding KubeletConfiguration: %v", err) + } + // construct a payload that contains the kubeletconfig + const uid = "uid" + p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)}, + Data: map[string]string{ + kubeletKey: string(data), + }, + }) + if err != nil { + t.Fatalf("error constructing payload: %v", err) + } + // save the payload + err = store.Save(p) + if err != nil { + t.Fatalf("error saving payload: %v", err) } cases := []struct { - desc string - loadUID string - cpt checkpoint.Checkpoint - err string + desc string + uid types.UID + err string }{ - {"checkpoint exists", uid, cpt, ""}, - {"checkpoint does not exist", "bogus-uid", nil, "failed to read"}, + {"checkpoint exists", uid, ""}, + {"checkpoint does not exist", "bogus-uid", "no checkpoint for source"}, } for _, c := range cases { - if c.cpt != nil { - saveTestCheckpointFile(t, store.fs, c.cpt) - } - cpt, err := store.Load(c.loadUID) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - if !checkpoint.EqualCheckpoints(c.cpt, cpt) { - t.Errorf("case %q, expect %q but got %q", c.desc, spew.Sdump(c.cpt), spew.Sdump(cpt)) - } - } -} - -func TestFsStoreRoundTrip(t *testing.T) { - store, err := newInitializedFakeFsStore() - if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) - } - const uid = "uid" - cpt, err := checkpoint.NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)}}) - if err != nil { - t.Fatalf("unable to construct checkpoint, error: %v", err) - } - err = store.Save(cpt) - if err != nil { - t.Fatalf("unable to save checkpoint, error: %v", err) - } - cptAfter, err := store.Load(uid) - if err != nil { - t.Fatalf("unable to load checkpoint, error: %v", err) - } - if !checkpoint.EqualCheckpoints(cpt, cptAfter) { - t.Errorf("expect %q but got %q", spew.Sdump(cpt), spew.Sdump(cptAfter)) + t.Run(c.desc, func(t *testing.T) { + source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ + ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: c.uid}}) + if err != nil { + t.Fatalf("error constructing remote config source: %v", err) + } + loaded, err := store.Load(source) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + if !reflect.DeepEqual(expect, loaded) { + t.Errorf("expect %#v, but got %#v", expect, loaded) + } + }) } } func TestFsStoreCurrentModified(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } // create an empty current file, this is good enough for testing - saveTestSourceFile(t, store.fs, curFile, nil) + saveTestSourceFile(t, store, currentFile, nil) - // set the timestamps to the current time, so we can compare to result of store.SetCurrentModified + // set the timestamps to the current time, so we can compare to result of store.CurrentModified now := time.Now() - err = store.fs.Chtimes(filepath.Join(testCheckpointsDir, curFile), now, now) + err = store.fs.Chtimes(store.metaPath(currentFile), now, now) if err != nil { t.Fatalf("could not change timestamps, error: %v", err) } @@ -229,7 +287,7 @@ func TestFsStoreCurrentModified(t *testing.T) { func TestFsStoreCurrent(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ @@ -247,24 +305,27 @@ func TestFsStoreCurrent(t *testing.T) { {"non-default source", source, ""}, } for _, c := range cases { - // save the last known good source - saveTestSourceFile(t, store.fs, curFile, c.expect) + t.Run(c.desc, func(t *testing.T) { + // save the last known good source + saveTestSourceFile(t, store, currentFile, c.expect) - // load last-known-good and compare to expected result - source, err := store.Current() - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - if !checkpoint.EqualRemoteConfigSources(c.expect, source) { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) - } + // load last-known-good and compare to expected result + source, err := store.Current() + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + if !checkpoint.EqualRemoteConfigSources(c.expect, source) { + t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) + } + }) } } func TestFsStoreLastKnownGood(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ @@ -282,28 +343,37 @@ func TestFsStoreLastKnownGood(t *testing.T) { {"non-default source", source, ""}, } for _, c := range cases { - // save the last known good source - saveTestSourceFile(t, store.fs, lkgFile, c.expect) + t.Run(c.desc, func(t *testing.T) { + // save the last known good source + saveTestSourceFile(t, store, lastKnownGoodFile, c.expect) - // load last-known-good and compare to expected result - source, err := store.LastKnownGood() - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - if !checkpoint.EqualRemoteConfigSources(c.expect, source) { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) - } + // load last-known-good and compare to expected result + source, err := store.LastKnownGood() + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + if !checkpoint.EqualRemoteConfigSources(c.expect, source) { + t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) + } + }) } } func TestFsStoreSetCurrent(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } const uid = "uid" - expect := fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"name","uid":"%s"}}%s`, uid, "\n") + expect := fmt.Sprintf(`apiVersion: v1 +configMapRef: + name: name + namespace: namespace + uid: %s +kind: NodeConfigSource +`, uid) source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{ Name: "name", Namespace: "namespace", UID: types.UID(uid)}}) if err != nil { @@ -316,97 +386,26 @@ func TestFsStoreSetCurrent(t *testing.T) { } // check that the source saved as we would expect - data := readTestSourceFile(t, store.fs, curFile) + data := readTestSourceFile(t, store, currentFile) if expect != string(data) { t.Errorf("expect current source file to contain %q, but got %q", expect, string(data)) } } -func TestFsStoreSetCurrentUpdated(t *testing.T) { - store, err := newInitializedFakeFsStore() - if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) - } - - cases := []struct { - current string - newCurrent string - expectUpdated bool - err string - }{ - {"", "", false, ""}, - {"uid", "", true, ""}, - {"", "uid", true, ""}, - {"uid", "uid", false, ""}, - {"uid", "other-uid", true, ""}, - {"other-uid", "uid", true, ""}, - {"other-uid", "other-uid", false, ""}, - } - - for _, c := range cases { - // construct current source - var source checkpoint.RemoteConfigSource - expectSource := "" - if len(c.current) > 0 { - expectSource = fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"name","uid":"%s"}}%s`, c.current, "\n") - source, _, err = checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{ - Name: "name", Namespace: "namespace", UID: types.UID(c.current)}}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } - - // construct new source - var newSource checkpoint.RemoteConfigSource - expectNewSource := "" - if len(c.newCurrent) > 0 { - expectNewSource = fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"new-name","uid":"%s"}}%s`, c.newCurrent, "\n") - newSource, _, err = checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{ - Name: "new-name", Namespace: "namespace", UID: types.UID(c.newCurrent)}}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } - - // set the initial current - if err := store.SetCurrent(source); err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // update to the new current - updated, err := store.SetCurrentUpdated(newSource) - if utiltest.SkipRest(t, fmt.Sprintf("%q -> %q", c.current, c.newCurrent), err, c.err) { - continue - } - - // check that SetCurrentUpdated correctly reports whether the current checkpoint changed - if c.expectUpdated != updated { - t.Errorf("case %q -> %q, expect %v but got %v", c.current, c.newCurrent, c.expectUpdated, updated) - } - - // check that curFile is saved by SetCurrentUpdated as we expect - data := readTestSourceFile(t, store.fs, curFile) - if c.current == c.newCurrent { - // same UID should leave file unchanged - if expectSource != string(data) { - t.Errorf("case %q -> %q, expect current source file to contain %q, but got %q", c.current, c.newCurrent, expectSource, string(data)) - } - } else if expectNewSource != string(data) { - // otherwise expect the file to change - t.Errorf("case %q -> %q, expect current source file to contain %q, but got %q", c.current, c.newCurrent, expectNewSource, string(data)) - } - } - -} - func TestFsStoreSetLastKnownGood(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } const uid = "uid" - expect := fmt.Sprintf(`{"kind":"NodeConfigSource","apiVersion":"v1","configMapRef":{"namespace":"namespace","name":"name","uid":"%s"}}%s`, uid, "\n") + expect := fmt.Sprintf(`apiVersion: v1 +configMapRef: + name: name + namespace: namespace + uid: %s +kind: NodeConfigSource +`, uid) source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{ Name: "name", Namespace: "namespace", UID: types.UID(uid)}}) if err != nil { @@ -419,7 +418,7 @@ func TestFsStoreSetLastKnownGood(t *testing.T) { } // check that the source saved as we would expect - data := readTestSourceFile(t, store.fs, lkgFile) + data := readTestSourceFile(t, store, lastKnownGoodFile) if expect != string(data) { t.Errorf("expect last-known-good source file to contain %q, but got %q", expect, string(data)) } @@ -428,7 +427,7 @@ func TestFsStoreSetLastKnownGood(t *testing.T) { func TestFsStoreReset(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}) @@ -453,47 +452,49 @@ func TestFsStoreReset(t *testing.T) { {"otherSource -> source", otherSource, source, true}, } for _, c := range cases { - // manually save the sources to their respective files - saveTestSourceFile(t, store.fs, curFile, c.current) - saveTestSourceFile(t, store.fs, lkgFile, c.lastKnownGood) + t.Run(c.desc, func(t *testing.T) { + // manually save the sources to their respective files + saveTestSourceFile(t, store, currentFile, c.current) + saveTestSourceFile(t, store, lastKnownGoodFile, c.lastKnownGood) - // reset - updated, err := store.Reset() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + // reset + updated, err := store.Reset() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } - // make sure the files were emptied - if size := testSourceFileSize(t, store.fs, curFile); size > 0 { - t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, curFile, size) - } - if size := testSourceFileSize(t, store.fs, lkgFile); size > 0 { - t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, lkgFile, size) - } + // make sure the files were emptied + if size := testSourceFileSize(t, store, currentFile); size > 0 { + t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, currentFile, size) + } + if size := testSourceFileSize(t, store, lastKnownGoodFile); size > 0 { + t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, lastKnownGoodFile, size) + } - // make sure Current() and LastKnownGood() both return nil - current, err := store.Current() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - lastKnownGood, err := store.LastKnownGood() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if current != nil || lastKnownGood != nil { - t.Errorf("case %q, expect nil for current and last-known-good checkpoints, but still have %q and %q, respectively", - c.desc, current, lastKnownGood) - } - if c.updated != updated { - t.Errorf("case %q, expect reset to return %t, but got %t", c.desc, c.updated, updated) - } + // make sure Current() and LastKnownGood() both return nil + current, err := store.Current() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + lastKnownGood, err := store.LastKnownGood() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if current != nil || lastKnownGood != nil { + t.Errorf("case %q, expect nil for current and last-known-good checkpoints, but still have %q and %q, respectively", + c.desc, current, lastKnownGood) + } + if c.updated != updated { + t.Errorf("case %q, expect reset to return %t, but got %t", c.desc, c.updated, updated) + } + }) } } -func TestFsStoreSourceFromFile(t *testing.T) { +func TestFsStoreReadRemoteConfigSource(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ @@ -513,21 +514,24 @@ func TestFsStoreSourceFromFile(t *testing.T) { const name = "some-source-file" for _, c := range cases { - saveTestSourceFile(t, store.fs, name, c.expect) - source, err := store.sourceFromFile(name) - if utiltest.SkipRest(t, c.desc, err, c.err) { - continue - } - if !checkpoint.EqualRemoteConfigSources(c.expect, source) { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) - } + t.Run(c.desc, func(t *testing.T) { + saveTestSourceFile(t, store, name, c.expect) + source, err := readRemoteConfigSource(store.fs, store.metaPath(name)) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + if !checkpoint.EqualRemoteConfigSources(c.expect, source) { + t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) + } + }) } } -func TestFsStoreSetSourceFile(t *testing.T) { +func TestFsStoreWriteRemoteConfigSource(t *testing.T) { store, err := newInitializedFakeFsStore() if err != nil { - t.Fatalf("failed to construct a store, error: %v", err) + t.Fatalf("error constructing store: %v", err) } source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}) @@ -536,93 +540,112 @@ func TestFsStoreSetSourceFile(t *testing.T) { } cases := []struct { + desc string source checkpoint.RemoteConfigSource }{ - {nil}, - {source}, + {"nil source", nil}, + {"non-nil source", source}, } const name = "some-source-file" for _, c := range cases { - // set the source file - err := store.setSourceFile(name, c.source) - if err != nil { - t.Fatalf("unable to set source file, error: %v", err) - } - // read back the file - data := readTestSourceFile(t, store.fs, name) - str := string(data) - - if c.source != nil { - // expect the contents to match the encoding of the source - data, err := c.source.Encode() - expect := string(data) + t.Run(c.desc, func(t *testing.T) { + // set the source file + err := writeRemoteConfigSource(store.fs, store.metaPath(name), c.source) if err != nil { - t.Fatalf("couldn't encode source, error: %v", err) + t.Fatalf("unable to set source file, error: %v", err) } - if expect != str { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str) - } - } else { - // expect empty file - expect := "" - if expect != str { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str) + // read back the file + data := readTestSourceFile(t, store, name) + str := string(data) + + if c.source != nil { + // expect the contents to match the encoding of the source + data, err := c.source.Encode() + expect := string(data) + if err != nil { + t.Fatalf("couldn't encode source, error: %v", err) + } + if expect != str { + t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str) + } + } else { + // expect empty file + expect := "" + if expect != str { + t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str) + } } + }) + } +} + +func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) { + files, err := store.fs.ReadDir(store.checkpointPath(uid)) + if err != nil { + return nil, err + } + m := map[string]string{} + for _, f := range files { + // expect no subdirs, only regular files + if !f.Mode().IsRegular() { + return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid) } + // read the file contents and build the map + data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid), f.Name())) + if err != nil { + return nil, err + } + m[f.Name()] = string(data) } + return m, nil } -func readTestCheckpointFile(t *testing.T, fs utilfs.Filesystem, uid string) []byte { - data, err := fs.ReadFile(filepath.Join(testCheckpointsDir, uid)) - if err != nil { - t.Fatalf("unable to read test checkpoint file, error: %v", err) - } - return data -} - -func saveTestCheckpointFile(t *testing.T, fs utilfs.Filesystem, cpt checkpoint.Checkpoint) { - data, err := cpt.Encode() - if err != nil { - t.Fatalf("unable to encode test checkpoint, error: %v", err) - } - fmt.Println(cpt.UID()) - err = utilfiles.ReplaceFile(fs, filepath.Join(testCheckpointsDir, cpt.UID()), data) - if err != nil { - t.Fatalf("unable to save test checkpoint file, error: %v", err) - } -} - -func readTestSourceFile(t *testing.T, fs utilfs.Filesystem, relPath string) []byte { - data, err := fs.ReadFile(filepath.Join(testCheckpointsDir, relPath)) +func readTestSourceFile(t *testing.T, store *fsStore, relPath string) []byte { + data, err := store.fs.ReadFile(store.metaPath(relPath)) if err != nil { t.Fatalf("unable to read test source file, error: %v", err) } return data } -func saveTestSourceFile(t *testing.T, fs utilfs.Filesystem, relPath string, source checkpoint.RemoteConfigSource) { +func saveTestSourceFile(t *testing.T, store *fsStore, relPath string, source checkpoint.RemoteConfigSource) { if source != nil { data, err := source.Encode() if err != nil { t.Fatalf("unable to save test source file, error: %v", err) } - err = utilfiles.ReplaceFile(fs, filepath.Join(testCheckpointsDir, relPath), data) + err = utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), data) if err != nil { t.Fatalf("unable to save test source file, error: %v", err) } } else { - err := utilfiles.ReplaceFile(fs, filepath.Join(testCheckpointsDir, relPath), []byte{}) + err := utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), []byte{}) if err != nil { t.Fatalf("unable to save test source file, error: %v", err) } } } -func testSourceFileSize(t *testing.T, fs utilfs.Filesystem, relPath string) int64 { - info, err := fs.Stat(filepath.Join(testCheckpointsDir, relPath)) +func testSourceFileSize(t *testing.T, store *fsStore, relPath string) int64 { + info, err := store.fs.Stat(store.metaPath(relPath)) if err != nil { t.Fatalf("unexpected error: %v", err) } return info.Size() } + +// newKubeletConfiguration will create a new KubeletConfiguration with default values set +func newKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) { + s, _, err := scheme.NewSchemeAndCodecs() + if err != nil { + return nil, err + } + versioned := &v1beta1.KubeletConfiguration{} + s.Default(versioned) + config := &kubeletconfig.KubeletConfiguration{} + if err := s.Convert(versioned, config, nil); err != nil { + return nil, err + } + return config, nil +} diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/store.go b/pkg/kubelet/kubeletconfig/checkpoint/store/store.go index a6bdb736b26..54fef4c6217 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/store.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/store.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint" ) @@ -27,22 +28,25 @@ import ( type Store interface { // Initialize sets up the storage layer Initialize() error - // Exists returns true if a checkpoint with `uid` exists in the store, false otherwise - Exists(uid string) (bool, error) - // Save saves the checkpoint to the storage layer - Save(c checkpoint.Checkpoint) error - // Load loads the checkpoint with UID `uid` from the storage layer, or returns an error if the checkpoint does not exist - Load(uid string) (checkpoint.Checkpoint, error) + + // Exists returns true if the object referenced by `source` has been checkpointed. + Exists(source checkpoint.RemoteConfigSource) (bool, error) + // Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration. + // The following payload types are supported: + // - k8s.io/api/core/v1.ConfigMap + Save(c checkpoint.Payload) error + // Load loads the KubeletConfiguration from the checkpoint referenced by `source`. + Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) + // CurrentModified returns the last time that the current UID was set CurrentModified() (time.Time, error) // Current returns the source that points to the current checkpoint, or nil if no current checkpoint is set Current() (checkpoint.RemoteConfigSource, error) // LastKnownGood returns the source that points to the last-known-good checkpoint, or nil if no last-known-good checkpoint is set LastKnownGood() (checkpoint.RemoteConfigSource, error) + // SetCurrent saves the source that points to the current checkpoint, set to nil to unset SetCurrent(source checkpoint.RemoteConfigSource) error - // SetCurrentUpdated is similar to SetCurrent, but also returns whether the current checkpoint changed as a result - SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) // SetLastKnownGood saves the source that points to the last-known-good checkpoint, set to nil to unset SetLastKnownGood(source checkpoint.RemoteConfigSource) error // Reset unsets the current and last-known-good UIDs and returns whether the current UID was unset as a result of the reset @@ -51,34 +55,15 @@ type Store interface { // reset is a helper for implementing Reset, which can be implemented in terms of Store methods func reset(s Store) (bool, error) { + current, err := s.Current() + if err != nil { + return false, err + } if err := s.SetLastKnownGood(nil); err != nil { return false, fmt.Errorf("failed to reset last-known-good UID in checkpoint store, error: %v", err) } - updated, err := s.SetCurrentUpdated(nil) - if err != nil { + if err := s.SetCurrent(nil); err != nil { return false, fmt.Errorf("failed to reset current UID in checkpoint store, error: %v", err) } - return updated, nil -} - -// setCurrentUpdated is a helper for implementing SetCurrentUpdated, which can be implemented in terms of Store methods -func setCurrentUpdated(s Store, source checkpoint.RemoteConfigSource) (bool, error) { - cur, err := s.Current() - if err != nil { - return false, err - } - - // if both are nil, no need to update - if cur == nil && source == nil { - return false, nil - } - // if UIDs match, no need to update - if (source != nil && cur != nil) && cur.UID() == source.UID() { - return false, nil - } - // update the source - if err := s.SetCurrent(source); err != nil { - return false, err - } - return true, nil + return current != nil, nil } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/store_test.go b/pkg/kubelet/kubeletconfig/checkpoint/store/store_test.go index c269d785890..4d4acb29349 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/store_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/store_test.go @@ -59,38 +59,3 @@ func TestReset(t *testing.T) { } } } - -func TestSetCurrentUpdated(t *testing.T) { - source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "other-name", Namespace: "namespace", UID: "other-uid"}}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - cases := []struct { - s *fakeStore - newCurrent checkpoint.RemoteConfigSource - updated bool - }{ - {&fakeStore{current: nil}, nil, false}, - {&fakeStore{current: nil}, source, true}, - {&fakeStore{current: source}, source, false}, - {&fakeStore{current: source}, nil, true}, - {&fakeStore{current: source}, otherSource, true}, - } - for _, c := range cases { - current := c.s.current - updated, err := setCurrentUpdated(c.s, c.newCurrent) - if err != nil { - t.Fatalf("case %q -> %q, unexpected error: %v", current, c.newCurrent, err) - } - if c.newCurrent != c.s.current { - t.Errorf("case %q -> %q, expect current UID to be %q, but got %q", current, c.newCurrent, c.newCurrent, c.s.current) - } - if c.updated != updated { - t.Errorf("case %q -> %q, expect setCurrentUpdated to return %t, but got %t", current, c.newCurrent, c.updated, updated) - } - } -} diff --git a/pkg/kubelet/kubeletconfig/configsync.go b/pkg/kubelet/kubeletconfig/configsync.go index 2584bc2efcc..c7e3b5f8d2f 100644 --- a/pkg/kubelet/kubeletconfig/configsync.go +++ b/pkg/kubelet/kubeletconfig/configsync.go @@ -135,44 +135,48 @@ func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *api // checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist, // if a failure occurs, returns a sanitized failure reason and an error func (cc *Controller) checkpointConfigSource(client clientset.Interface, source checkpoint.RemoteConfigSource) (string, error) { - uid := source.UID() - // if the checkpoint already exists, skip downloading - if ok, err := cc.checkpointStore.Exists(uid); err != nil { - reason := fmt.Sprintf(status.FailSyncReasonCheckpointExistenceFmt, source.APIPath(), uid) + if ok, err := cc.checkpointStore.Exists(source); err != nil { + reason := fmt.Sprintf(status.FailSyncReasonCheckpointExistenceFmt, source.APIPath(), source.UID()) return reason, fmt.Errorf("%s, error: %v", reason, err) } else if ok { - utillog.Infof("checkpoint already exists for object with UID %q, skipping download", uid) + utillog.Infof("checkpoint already exists for object %s with UID %s, skipping download", source.APIPath(), source.UID()) return "", nil } // download - checkpoint, reason, err := source.Download(client) + payload, reason, err := source.Download(client) if err != nil { return reason, fmt.Errorf("%s, error: %v", reason, err) } // save - err = cc.checkpointStore.Save(checkpoint) + err = cc.checkpointStore.Save(payload) if err != nil { - reason := fmt.Sprintf(status.FailSyncReasonSaveCheckpointFmt, source.APIPath(), checkpoint.UID()) + reason := fmt.Sprintf(status.FailSyncReasonSaveCheckpointFmt, source.APIPath(), payload.UID()) return reason, fmt.Errorf("%s, error: %v", reason, err) } return "", nil } -// setCurrentConfig updates UID of the current checkpoint in the checkpoint store to `uid` and returns whether the -// current UID changed as a result, or a sanitized failure reason and an error. +// setCurrentConfig the current checkpoint config in the store +// returns whether the current config changed as a result, or a sanitized failure reason and an error. func (cc *Controller) setCurrentConfig(source checkpoint.RemoteConfigSource) (bool, string, error) { - updated, err := cc.checkpointStore.SetCurrentUpdated(source) - if err != nil { + failReason := func(s checkpoint.RemoteConfigSource) string { if source == nil { - return false, status.FailSyncReasonSetCurrentLocal, err + return status.FailSyncReasonSetCurrentLocal } - return false, fmt.Sprintf(status.FailSyncReasonSetCurrentUIDFmt, source.APIPath(), source.UID()), err + return fmt.Sprintf(status.FailSyncReasonSetCurrentUIDFmt, source.APIPath(), source.UID()) } - return updated, "", nil + current, err := cc.checkpointStore.Current() + if err != nil { + return false, failReason(source), err + } + if err := cc.checkpointStore.SetCurrent(source); err != nil { + return false, failReason(source), err + } + return !checkpoint.EqualRemoteConfigSources(current, source), "", nil } // resetConfig resets the current and last-known-good checkpoints in the checkpoint store to their default values and diff --git a/pkg/kubelet/kubeletconfig/controller.go b/pkg/kubelet/kubeletconfig/controller.go index 352a310f45d..bd4d705e6ac 100644 --- a/pkg/kubelet/kubeletconfig/controller.go +++ b/pkg/kubelet/kubeletconfig/controller.go @@ -38,7 +38,7 @@ import ( ) const ( - checkpointsDir = "checkpoints" + storeDir = "store" // TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default, // because it is not especially clear where this should live in the API. configTrialDuration = 10 * time.Minute @@ -70,7 +70,7 @@ func NewController(defaultConfig *kubeletconfig.KubeletConfiguration, dynamicCon // channels must have capacity at least 1, since we signal with non-blocking writes pendingConfigSource: make(chan bool, 1), configOk: status.NewConfigOkCondition(), - checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, checkpointsDir)), + checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)), } } @@ -182,28 +182,23 @@ func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.E // a clean success or failure reason that can be reported in the status, and any error that occurs. // If the local config should be used, it will be returned. You should validate local before passing it to this function. func (cc *Controller) loadAssignedConfig(local *kubeletconfig.KubeletConfiguration) (*kubeletconfig.KubeletConfiguration, checkpoint.RemoteConfigSource, string, error) { - src, err := cc.checkpointStore.Current() + source, err := cc.checkpointStore.Current() if err != nil { return nil, nil, fmt.Sprintf(status.CurFailLoadReasonFmt, "unknown"), err } // nil source is the signal to use the local config - if src == nil { - return local, src, status.CurLocalOkayReason, nil + if source == nil { + return local, source, status.CurLocalOkayReason, nil } - curUID := src.UID() - // load from checkpoint - checkpoint, err := cc.checkpointStore.Load(curUID) + // load KubeletConfiguration from checkpoint + kc, err := cc.checkpointStore.Load(source) if err != nil { - return nil, src, fmt.Sprintf(status.CurFailLoadReasonFmt, src.APIPath()), err + return nil, source, fmt.Sprintf(status.CurFailLoadReasonFmt, source.APIPath()), err } - cur, err := checkpoint.Parse() - if err != nil { - return nil, src, fmt.Sprintf(status.CurFailParseReasonFmt, src.APIPath()), err + if err := validation.ValidateKubeletConfiguration(kc); err != nil { + return nil, source, fmt.Sprintf(status.CurFailValidateReasonFmt, source.APIPath()), err } - if err := validation.ValidateKubeletConfiguration(cur); err != nil { - return nil, src, fmt.Sprintf(status.CurFailValidateReasonFmt, src.APIPath()), err - } - return cur, src, status.CurRemoteOkayReason, nil + return kc, source, status.CurRemoteOkayReason, nil } // loadLastKnownGoodConfig loads the Kubelet's last-known-good config, @@ -212,28 +207,23 @@ func (cc *Controller) loadAssignedConfig(local *kubeletconfig.KubeletConfigurati // and any error that occurs. // If the local config should be used, it will be returned. You should validate local before passing it to this function. func (cc *Controller) loadLastKnownGoodConfig(local *kubeletconfig.KubeletConfiguration) (*kubeletconfig.KubeletConfiguration, checkpoint.RemoteConfigSource, error) { - src, err := cc.checkpointStore.LastKnownGood() + source, err := cc.checkpointStore.LastKnownGood() if err != nil { return nil, nil, fmt.Errorf("unable to determine last-known-good config, error: %v", err) } // nil source is the signal to use the local config - if src == nil { - return local, src, nil + if source == nil { + return local, source, nil } - lkgUID := src.UID() // load from checkpoint - checkpoint, err := cc.checkpointStore.Load(lkgUID) + kc, err := cc.checkpointStore.Load(source) if err != nil { - return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailLoadReasonFmt, src.APIPath()), err) + return nil, source, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailLoadReasonFmt, source.APIPath()), err) } - lkg, err := checkpoint.Parse() - if err != nil { - return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailParseReasonFmt, src.APIPath()), err) + if err := validation.ValidateKubeletConfiguration(kc); err != nil { + return nil, source, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailValidateReasonFmt, source.APIPath()), err) } - if err := validation.ValidateKubeletConfiguration(lkg); err != nil { - return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailValidateReasonFmt, src.APIPath()), err) - } - return lkg, src, nil + return kc, source, nil } // initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly @@ -269,14 +259,14 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) { return false, nil } -// graduateAssignedToLastKnownGood sets the last-known-good UID on the checkpointStore -// to the same value as the current UID maintained by the checkpointStore +// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore +// to the same value as the current config maintained by the checkpointStore func (cc *Controller) graduateAssignedToLastKnownGood() error { - curUID, err := cc.checkpointStore.Current() + current, err := cc.checkpointStore.Current() if err != nil { return err } - err = cc.checkpointStore.SetLastKnownGood(curUID) + err = cc.checkpointStore.SetLastKnownGood(current) if err != nil { return err } diff --git a/pkg/kubelet/kubeletconfig/status/status.go b/pkg/kubelet/kubeletconfig/status/status.go index 4cd5db1f7ae..184d916ca09 100644 --- a/pkg/kubelet/kubeletconfig/status/status.go +++ b/pkg/kubelet/kubeletconfig/status/status.go @@ -51,8 +51,6 @@ const ( // CurFailLoadReasonFmt indicates that the Kubelet failed to load the current config checkpoint for an API source CurFailLoadReasonFmt = "failed to load current: %s" - // CurFailParseReasonFmt indicates that the Kubelet failed to parse the current config checkpoint for an API source - CurFailParseReasonFmt = "failed to parse current: %s" // CurFailValidateReasonFmt indicates that the Kubelet failed to validate the current config checkpoint for an API source CurFailValidateReasonFmt = "failed to validate current: %s" @@ -60,8 +58,6 @@ const ( // LkgFailLoadReasonFmt indicates that the Kubelet failed to load the last-known-good config checkpoint for an API source LkgFailLoadReasonFmt = "failed to load last-known-good: %s" - // LkgFailParseReasonFmt indicates that the Kubelet failed to parse the last-known-good config checkpoint for an API source - LkgFailParseReasonFmt = "failed to parse last-known-good: %s" // LkgFailValidateReasonFmt indicates that the Kubelet failed to validate the last-known-good config checkpoint for an API source LkgFailValidateReasonFmt = "failed to validate last-known-good: %s" diff --git a/pkg/kubelet/kubeletconfig/util/codec/BUILD b/pkg/kubelet/kubeletconfig/util/codec/BUILD index 50b123e7a3a..43ea8c21453 100644 --- a/pkg/kubelet/kubeletconfig/util/codec/BUILD +++ b/pkg/kubelet/kubeletconfig/util/codec/BUILD @@ -13,7 +13,9 @@ go_library( "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/core/install:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", + "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", ], ) diff --git a/pkg/kubelet/kubeletconfig/util/codec/codec.go b/pkg/kubelet/kubeletconfig/util/codec/codec.go index 3316f955af2..73bc81a6074 100644 --- a/pkg/kubelet/kubeletconfig/util/codec/codec.go +++ b/pkg/kubelet/kubeletconfig/util/codec/codec.go @@ -23,17 +23,45 @@ import ( _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" ) -// TODO(mtaufen): allow an encoder to be injected into checkpoint objects at creation time? (then we could ultimately instantiate only one encoder) +// EncodeKubeletConfig encodes an internal KubeletConfiguration to an external YAML representation +func EncodeKubeletConfig(internal *kubeletconfig.KubeletConfiguration, targetVersion schema.GroupVersion) ([]byte, error) { + encoder, err := newKubeletConfigYAMLEncoder(targetVersion) + if err != nil { + return nil, err + } + // encoder will convert to external version + data, err := runtime.Encode(encoder, internal) + if err != nil { + return nil, err + } + return data, nil +} -// NewJSONEncoder generates a new runtime.Encoder that encodes objects to JSON -func NewJSONEncoder(groupName string) (runtime.Encoder, error) { - // encode to json - mediaType := "application/json" +// newKubeletConfigYAMLEncoder returns an encoder that can write a KubeletConfig to YAML +func newKubeletConfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Encoder, error) { + _, codecs, err := scheme.NewSchemeAndCodecs() + if err != nil { + return nil, err + } + mediaType := "application/yaml" + info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType) + if !ok { + return nil, fmt.Errorf("unsupported media type %q", mediaType) + } + return codecs.EncoderForVersion(info.Serializer, targetVersion), nil +} + +// NewYAMLEncoder generates a new runtime.Encoder that encodes objects to YAML +func NewYAMLEncoder(groupName string) (runtime.Encoder, error) { + // encode to YAML + mediaType := "application/yaml" info, ok := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), mediaType) if !ok { return nil, fmt.Errorf("unsupported media type %q", mediaType) diff --git a/pkg/kubelet/kubeletconfig/util/files/BUILD b/pkg/kubelet/kubeletconfig/util/files/BUILD index ae93b6f99e3..5f6e0c1e0f9 100644 --- a/pkg/kubelet/kubeletconfig/util/files/BUILD +++ b/pkg/kubelet/kubeletconfig/util/files/BUILD @@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -24,3 +25,13 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["files_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/kubeletconfig/util/test:go_default_library", + "//pkg/util/filesystem:go_default_library", + ], +) diff --git a/pkg/kubelet/kubeletconfig/util/files/files.go b/pkg/kubelet/kubeletconfig/util/files/files.go index aa76e151d23..8fd19ce860b 100644 --- a/pkg/kubelet/kubeletconfig/util/files/files.go +++ b/pkg/kubelet/kubeletconfig/util/files/files.go @@ -24,7 +24,10 @@ import ( utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) -const defaultPerm = 0666 +const ( + defaultPerm = 0755 + tmptag = "tmp_" // additional prefix to prevent accidental collisions +) // FileExists returns true if a regular file exists at `path`, false if `path` does not exist, otherwise an error func FileExists(fs utilfs.Filesystem, path string) (bool, error) { @@ -66,7 +69,7 @@ func EnsureFile(fs utilfs.Filesystem, path string) error { // WriteTmpFile creates a temporary file at `path`, writes `data` into it, and fsyncs the file func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath string, retErr error) { dir := filepath.Dir(path) - prefix := filepath.Base(path) + prefix := tmptag + filepath.Base(path) // create the tmp file tmpFile, err := fs.TempFile(dir, prefix) @@ -81,7 +84,7 @@ func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath strin // if there was an error writing, syncing, or closing, delete the temporary file and return the error if retErr != nil { if err := fs.Remove(tmpPath); err != nil { - retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", path, retErr, err) + retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", tmpPath, retErr, err) } tmpPath = "" } @@ -137,3 +140,88 @@ func EnsureDir(fs utilfs.Filesystem, path string) error { // create the dir return fs.MkdirAll(path, defaultPerm) } + +// WriteTempDir creates a temporary dir at `path`, writes `files` into it, and fsyncs all the files +// The keys of `files` represent file names. These names must not: +// - be empty +// - be a path that contains more than the base name of a file (e.g. foo/bar is invalid, as is /bar) +// - match `.` or `..` exactly +// - be longer than 255 characters +// The above validation rules are based on atomic_writer.go, though in this case are more restrictive +// because we only allow a flat hierarchy. +func WriteTempDir(fs utilfs.Filesystem, path string, files map[string]string) (tmpPath string, retErr error) { + // validate the filename keys; for now we only allow a flat keyset + for name := range files { + // invalidate empty names + if name == "" { + return "", fmt.Errorf("invalid file key: must not be empty: %q", name) + } + // invalidate: foo/bar and /bar + if name != filepath.Base(name) { + return "", fmt.Errorf("invalid file key %q, only base names are allowed", name) + } + // invalidate `.` and `..` + if name == "." || name == ".." { + return "", fmt.Errorf("invalid file key, may not be '.' or '..'") + } + // invalidate length > 255 characters + if len(name) > 255 { + return "", fmt.Errorf("invalid file key %q, must be less than 255 characters", name) + } + } + + // write the temp directory in parent dir and return path to the tmp directory + dir := filepath.Dir(path) + prefix := tmptag + filepath.Base(path) + + // create the tmp dir + var err error + tmpPath, err = fs.TempDir(dir, prefix) + if err != nil { + return "", err + } + // be sure to clean up if there was an error + defer func() { + if retErr != nil { + if err := fs.RemoveAll(tmpPath); err != nil { + retErr = fmt.Errorf("attempted to remove temporary directory %q after error %v, but failed due to error: %v", tmpPath, retErr, err) + } + } + }() + // write data + for name, data := range files { + // create the file + file, err := fs.Create(filepath.Join(tmpPath, name)) + if err != nil { + return tmpPath, err + } + // be sure to close the file when we're done + defer func() { + // close the file when we're done, don't overwrite primary retErr if close fails + if err := file.Close(); retErr == nil { + retErr = err + } + }() + // write the file + if _, err := file.Write([]byte(data)); err != nil { + return tmpPath, err + } + // sync the file, to ensure it's written in case a hard reset happens + if err := file.Sync(); err != nil { + return tmpPath, err + } + } + return tmpPath, nil +} + +// ReplaceDir replaces the contents of the dir at `path` with `files` by writing to a tmp dir in the same +// dir as `path` and renaming the tmp dir over `path`. The dir does not have to exist to use ReplaceDir. +func ReplaceDir(fs utilfs.Filesystem, path string, files map[string]string) error { + // write data to a temporary directory + tmpPath, err := WriteTempDir(fs, path, files) + if err != nil { + return err + } + // rename over target directory + return fs.Rename(tmpPath, path) +} diff --git a/pkg/kubelet/kubeletconfig/util/files/files_test.go b/pkg/kubelet/kubeletconfig/util/files/files_test.go new file mode 100644 index 00000000000..ba81b82ec3e --- /dev/null +++ b/pkg/kubelet/kubeletconfig/util/files/files_test.go @@ -0,0 +1,293 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package files + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test" + utilfs "k8s.io/kubernetes/pkg/util/filesystem" +) + +const ( + prefix = "test-util-files" +) + +type file struct { + name string + // mode distinguishes file type, + // we only check for regular vs. directory in these tests, + // specify regular as 0, directory as os.ModeDir + mode os.FileMode + data string // ignored if mode == os.ModeDir +} + +func (f *file) write(fs utilfs.Filesystem, dir string) error { + path := filepath.Join(dir, f.name) + if f.mode.IsDir() { + if err := fs.MkdirAll(path, defaultPerm); err != nil { + return err + } + } else if f.mode.IsRegular() { + // create parent directories, if necessary + parents := filepath.Dir(path) + if err := fs.MkdirAll(parents, defaultPerm); err != nil { + return err + } + // create the file + handle, err := fs.Create(path) + if err != nil { + return err + } + _, err = handle.Write([]byte(f.data)) + if err != nil { + if cerr := handle.Close(); cerr != nil { + return fmt.Errorf("error %v closing file after error: %v", cerr, err) + } + return err + } + } else { + return fmt.Errorf("mode not implemented for testing %s", f.mode.String()) + } + return nil +} + +func (f *file) expect(fs utilfs.Filesystem, dir string) error { + path := filepath.Join(dir, f.name) + if f.mode.IsDir() { + info, err := fs.Stat(path) + if err != nil { + return err + } + if !info.IsDir() { + return fmt.Errorf("expected directory, got mode %s", info.Mode().String()) + } + } else if f.mode.IsRegular() { + info, err := fs.Stat(path) + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return fmt.Errorf("expected regular file, got mode %s", info.Mode().String()) + } + data, err := fs.ReadFile(path) + if err != nil { + return err + } + if f.data != string(data) { + return fmt.Errorf("expected file data %q, got %q", f.data, string(data)) + } + } else { + return fmt.Errorf("mode not implemented for testing %s", f.mode.String()) + } + return nil +} + +// write files, perform some function, then attempt to read files back +// if err is non-empty, expects an error from the function performed in the test +// and skips reading back the expected files +type test struct { + desc string + writes []file + expects []file + fn func(fs utilfs.Filesystem, dir string, c *test) []error + err string +} + +func (c *test) write(t *testing.T, fs utilfs.Filesystem, dir string) { + for _, f := range c.writes { + if err := f.write(fs, dir); err != nil { + t.Fatalf("error pre-writing file: %v", err) + } + } +} + +// you can optionally skip calling t.Errorf by passing a nil t, and process the +// returned errors instead +func (c *test) expect(t *testing.T, fs utilfs.Filesystem, dir string) []error { + errs := []error{} + for _, f := range c.expects { + if err := f.expect(fs, dir); err != nil { + msg := fmt.Errorf("expect %#v, got error: %v", f, err) + errs = append(errs, msg) + if t != nil { + t.Errorf("%s", msg) + } + } + } + return errs +} + +// run a test case, with an arbitrary function to execute between write and expect +// if c.fn is nil, errors from c.expect are checked against c.err, instead of errors +// from fn being checked against c.err +func (c *test) run(t *testing.T, fs utilfs.Filesystem) { + // isolate each test case in a new temporary directory + dir, err := fs.TempDir("", prefix) + if err != nil { + t.Fatalf("error creating temporary directory for test: %v", err) + } + c.write(t, fs, dir) + // if fn exists, check errors from fn, then check expected files + if c.fn != nil { + errs := c.fn(fs, dir, c) + if len(errs) > 0 { + for _, err := range errs { + utiltest.ExpectError(t, err, c.err) + } + // skip checking expected files if we expected errors + // (usually means we didn't create file) + return + } + c.expect(t, fs, dir) + return + } + // just check expected files, and compare errors from c.expect to c.err + // (this lets us test the helper functions above) + errs := c.expect(nil, fs, dir) + for _, err := range errs { + utiltest.ExpectError(t, err, c.err) + } +} + +// simple test of the above helper functions +func TestHelpers(t *testing.T) { + // omitting the test.fn means test.err is compared to errors from test.expect + cases := []test{ + { + desc: "regular file", + writes: []file{{name: "foo", data: "bar"}}, + expects: []file{{name: "foo", data: "bar"}}, + }, + { + desc: "directory", + writes: []file{{name: "foo", mode: os.ModeDir}}, + expects: []file{{name: "foo", mode: os.ModeDir}}, + }, + { + desc: "deep regular file", + writes: []file{{name: "foo/bar", data: "baz"}}, + expects: []file{{name: "foo/bar", data: "baz"}}, + }, + { + desc: "deep directory", + writes: []file{{name: "foo/bar", mode: os.ModeDir}}, + expects: []file{{name: "foo/bar", mode: os.ModeDir}}, + }, + { + desc: "missing file", + expects: []file{{name: "foo", data: "bar"}}, + err: "no such file or directory", + }, + { + desc: "missing directory", + expects: []file{{name: "foo/bar", mode: os.ModeDir}}, + err: "no such file or directory", + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + c.run(t, utilfs.DefaultFs{}) + }) + } +} + +func TestWriteTempDir(t *testing.T) { + // writing a tmp dir is covered by TestReplaceDir, but we additionally test filename validation here + c := test{ + desc: "invalid file key", + err: "invalid file key", + fn: func(fs utilfs.Filesystem, dir string, c *test) []error { + if _, err := WriteTempDir(fs, filepath.Join(dir, "tmpdir"), map[string]string{"foo/bar": ""}); err != nil { + return []error{err} + } + return nil + }, + } + c.run(t, utilfs.DefaultFs{}) +} + +func TestReplaceDir(t *testing.T) { + fn := func(fs utilfs.Filesystem, dir string, c *test) []error { + errs := []error{} + + // compute filesets from expected files and call ReplaceDir for each + // we don't nest dirs in test cases, order of ReplaceDir call is not guaranteed + dirs := map[string]map[string]string{} + + // allocate dirs + for _, f := range c.expects { + if f.mode.IsDir() { + path := filepath.Join(dir, f.name) + if _, ok := dirs[path]; !ok { + dirs[path] = map[string]string{} + } + } else if f.mode.IsRegular() { + path := filepath.Join(dir, filepath.Dir(f.name)) + if _, ok := dirs[path]; !ok { + // require an expectation for the parent directory if there is an expectation for the file + errs = append(errs, fmt.Errorf("no prior parent directory in c.expects for file %s", f.name)) + continue + } + dirs[path][filepath.Base(f.name)] = f.data + } + } + + // short-circuit test case validation errors + if len(errs) > 0 { + return errs + } + + // call ReplaceDir for each desired dir + for path, files := range dirs { + if err := ReplaceDir(fs, path, files); err != nil { + errs = append(errs, err) + } + } + return errs + } + cases := []test{ + { + fn: fn, + desc: "fn catches invalid test case", + expects: []file{{name: "foo/bar"}}, + err: "no prior parent directory", + }, + { + fn: fn, + desc: "empty dir", + expects: []file{{name: "foo", mode: os.ModeDir}}, + }, + { + fn: fn, + desc: "dir with files", + expects: []file{ + {name: "foo", mode: os.ModeDir}, + {name: "foo/bar", data: "baz"}, + {name: "foo/baz", data: "bar"}, + }, + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + c.run(t, utilfs.DefaultFs{}) + }) + } +} diff --git a/pkg/kubelet/kubeletconfig/util/test/test.go b/pkg/kubelet/kubeletconfig/util/test/test.go index 1c18e8344e6..789246ecd02 100644 --- a/pkg/kubelet/kubeletconfig/util/test/test.go +++ b/pkg/kubelet/kubeletconfig/util/test/test.go @@ -21,6 +21,21 @@ import ( "testing" ) +// ExpectError calls t.Fatalf if the error does not contain a substr match. +// If substr is empty, a nil error is expected. +// It is useful to call ExpectError from subtests. +func ExpectError(t *testing.T, err error, substr string) { + if err != nil { + if len(substr) == 0 { + t.Fatalf("expect nil error but got %q", err.Error()) + } else if !strings.Contains(err.Error(), substr) { + t.Fatalf("expect error to contain %q but got %q", substr, err.Error()) + } + } else if len(substr) > 0 { + t.Fatalf("expect error to contain %q but got nil error", substr) + } +} + // SkipRest returns true if there was a non-nil error or if we expected an error that didn't happen, // and logs the appropriate error on the test object. // The return value indicates whether we should skip the rest of the test case due to the error result. diff --git a/pkg/util/filesystem/defaultfs.go b/pkg/util/filesystem/defaultfs.go index f87fe3d6dff..f621be83e6b 100644 --- a/pkg/util/filesystem/defaultfs.go +++ b/pkg/util/filesystem/defaultfs.go @@ -72,6 +72,11 @@ func (DefaultFs) ReadFile(filename string) ([]byte, error) { return ioutil.ReadFile(filename) } +// TempDir via ioutil.TempDir +func (DefaultFs) TempDir(dir, prefix string) (string, error) { + return ioutil.TempDir(dir, prefix) +} + // TempFile via ioutil.TempFile func (DefaultFs) TempFile(dir, prefix string) (File, error) { file, err := ioutil.TempFile(dir, prefix) diff --git a/pkg/util/filesystem/fakefs.go b/pkg/util/filesystem/fakefs.go index b103007a557..d86b31b6a9a 100644 --- a/pkg/util/filesystem/fakefs.go +++ b/pkg/util/filesystem/fakefs.go @@ -68,6 +68,11 @@ func (fs *fakeFs) ReadFile(filename string) ([]byte, error) { return fs.a.ReadFile(filename) } +// TempDir via afero.TempDir +func (fs *fakeFs) TempDir(dir, prefix string) (string, error) { + return fs.a.TempDir(dir, prefix) +} + // TempFile via afero.TempFile func (fs *fakeFs) TempFile(dir, prefix string) (File, error) { file, err := fs.a.TempFile(dir, prefix) diff --git a/pkg/util/filesystem/filesystem.go b/pkg/util/filesystem/filesystem.go index 16e329b30f1..9b25e14b9f5 100644 --- a/pkg/util/filesystem/filesystem.go +++ b/pkg/util/filesystem/filesystem.go @@ -35,6 +35,7 @@ type Filesystem interface { // from "io/ioutil" ReadFile(filename string) ([]byte, error) + TempDir(dir, prefix string) (string, error) TempFile(dir, prefix string) (File, error) ReadDir(dirname string) ([]os.FileInfo, error) Walk(root string, walkFn filepath.WalkFunc) error diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index 6af1f05f8bb..892d09ee9d1 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -30,11 +30,11 @@ go_library( "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", - "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", "//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library", "//pkg/kubelet/apis/stats/v1alpha1:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm/devicemanager:go_default_library", + "//pkg/kubelet/kubeletconfig/util/codec:go_default_library", "//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/remote:go_default_library", "//test/e2e/common:go_default_library", @@ -54,7 +54,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", diff --git a/test/e2e_node/dynamic_kubelet_config_test.go b/test/e2e_node/dynamic_kubelet_config_test.go index cdc419d10f5..b3af81a665a 100644 --- a/test/e2e_node/dynamic_kubelet_config_test.go +++ b/test/e2e_node/dynamic_kubelet_config_test.go @@ -180,7 +180,7 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube Name: failParseConfigMap.Name}}, expectConfigOk: &apiv1.NodeCondition{Type: apiv1.NodeKubeletConfigOk, Status: apiv1.ConditionFalse, Message: status.LkgLocalMessage, - Reason: fmt.Sprintf(status.CurFailParseReasonFmt, configMapAPIPath(failParseConfigMap))}, + Reason: fmt.Sprintf(status.CurFailLoadReasonFmt, configMapAPIPath(failParseConfigMap))}, expectConfig: nil, event: true, }, @@ -248,7 +248,7 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube Name: badConfigMap.Name}}, expectConfigOk: &apiv1.NodeCondition{Type: apiv1.NodeKubeletConfigOk, Status: apiv1.ConditionFalse, Message: fmt.Sprintf(status.LkgRemoteMessageFmt, configMapAPIPath(lkgConfigMap)), - Reason: fmt.Sprintf(status.CurFailParseReasonFmt, configMapAPIPath(badConfigMap))}, + Reason: fmt.Sprintf(status.CurFailLoadReasonFmt, configMapAPIPath(badConfigMap))}, expectConfig: lkgKC, event: true, }, diff --git a/test/e2e_node/services/BUILD b/test/e2e_node/services/BUILD index 4614e13e4fe..8fd71b48828 100644 --- a/test/e2e_node/services/BUILD +++ b/test/e2e_node/services/BUILD @@ -27,8 +27,8 @@ go_library( "//pkg/controller/namespace:go_default_library", "//pkg/features:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", - "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", "//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library", + "//pkg/kubelet/kubeletconfig/util/codec:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e_node/builder:go_default_library", "//test/e2e_node/remote:go_default_library", @@ -41,7 +41,6 @@ go_library( "//vendor/github.com/spf13/pflag:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", diff --git a/test/e2e_node/services/kubelet.go b/test/e2e_node/services/kubelet.go index 4194ffb32c9..337e52d724d 100644 --- a/test/e2e_node/services/kubelet.go +++ b/test/e2e_node/services/kubelet.go @@ -30,14 +30,13 @@ import ( "github.com/spf13/pflag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" - "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" - "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1" + kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1" + kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e_node/builder" "k8s.io/kubernetes/test/e2e_node/remote" @@ -354,21 +353,7 @@ func addKubeletConfigFlags(cmdArgs *[]string, kc *kubeletconfig.KubeletConfigura // writeKubeletConfigFile writes the kubelet config file based on the args and returns the filename func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error { - // extract the KubeletConfiguration and convert to versioned - versioned := &v1beta1.KubeletConfiguration{} - scheme, _, err := scheme.NewSchemeAndCodecs() - if err != nil { - return err - } - if err := scheme.Convert(internal, versioned, nil); err != nil { - return err - } - // encode - encoder, err := newKubeletConfigJSONEncoder() - if err != nil { - return err - } - data, err := runtime.Encode(encoder, versioned) + data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion) if err != nil { return err } @@ -384,20 +369,6 @@ func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path s return nil } -func newKubeletConfigJSONEncoder() (runtime.Encoder, error) { - _, kubeletCodecs, err := scheme.NewSchemeAndCodecs() - if err != nil { - return nil, err - } - - mediaType := "application/json" - info, ok := runtime.SerializerInfoForMediaType(kubeletCodecs.SupportedMediaTypes(), mediaType) - if !ok { - return nil, fmt.Errorf("unsupported media type %q", mediaType) - } - return kubeletCodecs.EncoderForVersion(info.Serializer, v1beta1.SchemeGroupVersion), nil -} - // createPodDirectory creates pod directory. func createPodDirectory() (string, error) { cwd, err := os.Getwd() diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index 8006fae5504..1faf7233795 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -32,16 +32,15 @@ import ( apiv1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" "k8s.io/kubernetes/pkg/features" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" - kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cm" + kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/remote" "k8s.io/kubernetes/test/e2e/framework" @@ -299,17 +298,7 @@ func createConfigMap(f *framework.Framework, internalKC *kubeletconfig.KubeletCo // constructs a ConfigMap, populating one of its keys with the KubeletConfiguration. Always uses GenerateName to generate a suffix. func newKubeletConfigMap(name string, internalKC *kubeletconfig.KubeletConfiguration) *apiv1.ConfigMap { - scheme, _, err := kubeletscheme.NewSchemeAndCodecs() - framework.ExpectNoError(err) - - versioned := &kubeletconfigv1beta1.KubeletConfiguration{} - err = scheme.Convert(internalKC, versioned, nil) - framework.ExpectNoError(err) - - encoder, err := newKubeletConfigJSONEncoder() - framework.ExpectNoError(err) - - data, err := runtime.Encode(encoder, versioned) + data, err := kubeletconfigcodec.EncodeKubeletConfig(internalKC, kubeletconfigv1beta1.SchemeGroupVersion) framework.ExpectNoError(err) cmap := &apiv1.ConfigMap{ @@ -353,20 +342,6 @@ func logKubeletMetrics(metricKeys ...string) { } } -func newKubeletConfigJSONEncoder() (runtime.Encoder, error) { - _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - return nil, err - } - - mediaType := "application/json" - info, ok := runtime.SerializerInfoForMediaType(kubeletCodecs.SupportedMediaTypes(), mediaType) - if !ok { - return nil, fmt.Errorf("unsupported media type %q", mediaType) - } - return kubeletCodecs.EncoderForVersion(info.Serializer, kubeletconfigv1beta1.SchemeGroupVersion), nil -} - // runCommand runs the cmd and returns the combined stdout and stderr, or an // error if the command failed. func runCommand(cmd ...string) (string, error) {