diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 90ea87b1dda..5e2300ddbb8 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -42,6 +42,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" jsonpatch "gopkg.in/evanphx/json-patch.v4" + "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -51,9 +52,11 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.12.0" oteltrace "go.opentelemetry.io/otel/trace" noopoteltrace "go.opentelemetry.io/otel/trace/noop" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -94,6 +97,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" + kubeletconfigv1beta1conversion "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1" kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" @@ -316,7 +320,15 @@ is checked every 20 seconds (also configurable with a flag).`, // potentially overriding the previous values. func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConfiguration, kubeletDropInConfigDir string) error { const dropinFileExtension = ".conf" - baseKubeletConfigJSON, err := json.Marshal(kubeletConfig) + + // TODO: move the "internal --> versioned --> merge --> default --> internal" operation inside the loop, + // and use the version of each drop-in file as the versioned target once config files can be in versions other than just v1beta1 + versionedConfig := &kubeletconfigv1beta1.KubeletConfiguration{} + if err := kubeletconfigv1beta1conversion.Convert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(kubeletConfig, versionedConfig, nil); err != nil { + return err + } + + baseKubeletConfigJSON, err := json.Marshal(versionedConfig) if err != nil { return fmt.Errorf("failed to marshal base config: %w", err) } @@ -326,10 +338,17 @@ func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConf return err } if !info.IsDir() && filepath.Ext(info.Name()) == dropinFileExtension { - dropinConfigJSON, err := loadDropinConfigFileIntoJSON(path) + dropinConfigJSON, gvk, err := loadDropinConfigFileIntoJSON(path) if err != nil { return fmt.Errorf("failed to load kubelet dropin file, path: %s, error: %w", path, err) } + if gvk == nil || gvk.Empty() { + return fmt.Errorf("failed to load kubelet dropin file, path: %s, no apiVersion/kind", path) + } + // TODO: expand this once more than a single kubelet config version exists + if *gvk != kubeletconfigv1beta1.SchemeGroupVersion.WithKind("KubeletConfiguration") { + return fmt.Errorf("failed to load kubelet dropin file, path: %s, unknown apiVersion/kind: %v", path, gvk.String()) + } mergedConfigJSON, err := jsonpatch.MergePatch(baseKubeletConfigJSON, dropinConfigJSON) if err != nil { return fmt.Errorf("failed to merge drop-in and current config: %w", err) @@ -341,9 +360,18 @@ func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConf return fmt.Errorf("failed to walk through kubelet dropin directory %q: %w", kubeletDropInConfigDir, err) } - if err := json.Unmarshal(baseKubeletConfigJSON, kubeletConfig); err != nil { + // reset versioned config and decode + versionedConfig = &kubeletconfigv1beta1.KubeletConfiguration{} + if err := json.Unmarshal(baseKubeletConfigJSON, versionedConfig); err != nil { return fmt.Errorf("failed to unmarshal merged JSON into kubelet configuration: %w", err) } + // apply defaulting after decoding + kubeletconfigv1beta1conversion.SetDefaults_KubeletConfiguration(versionedConfig) + // convert back to internal config + if err := kubeletconfigv1beta1conversion.Convert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(versionedConfig, kubeletConfig, nil); err != nil { + return fmt.Errorf("failed to convert merged config to internal kubelet configuration: %w", err) + } + return nil } @@ -423,16 +451,16 @@ func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, e return kc, err } -func loadDropinConfigFileIntoJSON(name string) ([]byte, error) { +func loadDropinConfigFileIntoJSON(name string) ([]byte, *schema.GroupVersionKind, error) { const errFmt = "failed to load drop-in kubelet config file %s, error %v" // compute absolute path based on current working dir kubeletConfigFile, err := filepath.Abs(name) if err != nil { - return nil, fmt.Errorf(errFmt, name, err) + return nil, nil, fmt.Errorf(errFmt, name, err) } loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile) if err != nil { - return nil, fmt.Errorf(errFmt, name, err) + return nil, nil, fmt.Errorf(errFmt, name, err) } return loader.LoadIntoJSON() } diff --git a/cmd/kubelet/app/server_test.go b/cmd/kubelet/app/server_test.go index 3513c5fc7ed..c3a0d24f161 100644 --- a/cmd/kubelet/app/server_test.go +++ b/cmd/kubelet/app/server_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/cmd/kubelet/app/options" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -80,6 +81,7 @@ func TestMergeKubeletConfigurations(t *testing.T) { overwrittenConfigFields map[string]interface{} cliArgs []string name string + expectMergeError string }{ { kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{ @@ -222,6 +224,55 @@ readOnlyPort: 10255 }, name: "cli args override kubelet.conf", }, + { + kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{ + ResolverConfig: "original", + Port: 123, + ReadOnlyPort: 234, + }, + dropin1: ` +apiVersion: kubelet.config.k8s.io/v1beta1 +kind: KubeletConfiguration +resolvConf: dropin1 +`, + dropin2: ` +apiVersion: kubelet.config.k8s.io/v1beta1 +kind: KubeletConfiguration +port: 0 +`, + overwrittenConfigFields: map[string]interface{}{ + "ResolverConfig": "dropin1", // overridden by dropin1 + "Port": int32(10250), // reset to 0 by dropin2, then re-defaulted + "ReadOnlyPort": int32(234), // preserved from original config + }, + name: "json conversion is correct", + }, + { + kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{}, + dropin1: ` +apiVersion: kubelet.config.k8s.io/v1beta2 +kind: KubeletConfiguration +`, + name: "invalid drop-in apiVersion", + expectMergeError: `unknown apiVersion/kind: kubelet.config.k8s.io/v1beta2, Kind=KubeletConfiguration`, + }, + { + kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{}, + dropin1: ` +apiVersion: kubelet.config.k8s.io/v1beta1 +kind: KubeletConfiguration2 +`, + name: "invalid drop-in kind", + expectMergeError: `unknown apiVersion/kind: kubelet.config.k8s.io/v1beta1, Kind=KubeletConfiguration2`, + }, + { + kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{}, + dropin1: ` +port: 123 +`, + name: "empty drop-in apiVersion/kind", + expectMergeError: `'Kind' is missing`, + }, } for _, test := range testCases { @@ -248,15 +299,24 @@ readOnlyPort: 10255 err := os.Mkdir(kubeletConfDir, 0755) require.NoError(t, err, "Failed to create kubelet.conf.d directory") - err = os.WriteFile(filepath.Join(kubeletConfDir, "10-kubelet.conf"), []byte(test.dropin1), 0644) - require.NoError(t, err, "failed to create config from a yaml file") + if len(test.dropin1) > 0 { + err = os.WriteFile(filepath.Join(kubeletConfDir, "10-kubelet.conf"), []byte(test.dropin1), 0644) + require.NoError(t, err, "failed to create config from a yaml file") + } - err = os.WriteFile(filepath.Join(kubeletConfDir, "20-kubelet.conf"), []byte(test.dropin2), 0644) - require.NoError(t, err, "failed to create config from a yaml file") + if len(test.dropin2) > 0 { + err = os.WriteFile(filepath.Join(kubeletConfDir, "20-kubelet.conf"), []byte(test.dropin2), 0644) + require.NoError(t, err, "failed to create config from a yaml file") + } // Merge the kubelet configurations err = mergeKubeletConfigurations(kubeletConfig, kubeletConfDir) - require.NoError(t, err, "failed to merge kubelet drop-in configs") + if test.expectMergeError == "" { + require.NoError(t, err, "failed to merge kubelet drop-in configs") + } else { + require.Error(t, err) + require.Contains(t, err.Error(), test.expectMergeError) + } } // Use kubelet config flag precedence diff --git a/pkg/kubelet/kubeletconfig/configfiles/configfiles.go b/pkg/kubelet/kubeletconfig/configfiles/configfiles.go index 4e8fd8ea021..38d446b23cd 100644 --- a/pkg/kubelet/kubeletconfig/configfiles/configfiles.go +++ b/pkg/kubelet/kubeletconfig/configfiles/configfiles.go @@ -20,6 +20,7 @@ import ( "fmt" "path/filepath" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" @@ -33,7 +34,7 @@ type Loader interface { Load() (*kubeletconfig.KubeletConfiguration, error) // LoadIntoJSON loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be // loaded. It returns the configuration as a JSON byte slice - LoadIntoJSON() ([]byte, error) + LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error) } // fsLoader loads configuration from `configDir` @@ -81,15 +82,15 @@ func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) { return kc, nil } -func (loader *fsLoader) LoadIntoJSON() ([]byte, error) { +func (loader *fsLoader) LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error) { data, err := loader.fs.ReadFile(loader.kubeletFile) if err != nil { - return nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %v", loader.kubeletFile, err) + return nil, nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %w", loader.kubeletFile, err) } // no configuration is an error, some parameters are required if len(data) == 0 { - return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile) + return nil, nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile) } return utilcodec.DecodeKubeletConfigurationIntoJSON(loader.kubeletCodecs, data) diff --git a/pkg/kubelet/kubeletconfig/util/codec/codec.go b/pkg/kubelet/kubeletconfig/util/codec/codec.go index ba4b41cfd85..7bced679cd6 100644 --- a/pkg/kubelet/kubeletconfig/util/codec/codec.go +++ b/pkg/kubelet/kubeletconfig/util/codec/codec.go @@ -109,14 +109,15 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b } // DecodeKubeletConfigurationIntoJSON decodes a serialized KubeletConfiguration to the internal type. -func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, error) { +func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, *schema.GroupVersionKind, error) { // The UniversalDecoder runs defaulting and returns the internal type by default. - obj, _, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{}) + obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{}) if err != nil { - return nil, err + return nil, nil, err } objT := obj.(*unstructured.Unstructured) - return json.Marshal(objT.Object) + jsonData, err := json.Marshal(objT.Object) + return jsonData, gvk, err }