Merge pull request #127421 from liggitt/kubelet-merge

Merge kubelet configuration files using versioned serialized data
This commit is contained in:
Kubernetes Prow Robot 2024-09-17 19:04:17 +01:00 committed by GitHub
commit dd820a13a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 109 additions and 19 deletions

View File

@ -42,6 +42,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
jsonpatch "gopkg.in/evanphx/json-patch.v4"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
@ -51,9 +52,11 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
oteltrace "go.opentelemetry.io/otel/trace"
noopoteltrace "go.opentelemetry.io/otel/trace/noop"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -94,6 +97,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
kubeletconfigv1beta1conversion "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
@ -316,7 +320,15 @@ is checked every 20 seconds (also configurable with a flag).`,
// potentially overriding the previous values.
func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConfiguration, kubeletDropInConfigDir string) error {
const dropinFileExtension = ".conf"
baseKubeletConfigJSON, err := json.Marshal(kubeletConfig)
// TODO: move the "internal --> versioned --> merge --> default --> internal" operation inside the loop,
// and use the version of each drop-in file as the versioned target once config files can be in versions other than just v1beta1
versionedConfig := &kubeletconfigv1beta1.KubeletConfiguration{}
if err := kubeletconfigv1beta1conversion.Convert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(kubeletConfig, versionedConfig, nil); err != nil {
return err
}
baseKubeletConfigJSON, err := json.Marshal(versionedConfig)
if err != nil {
return fmt.Errorf("failed to marshal base config: %w", err)
}
@ -326,10 +338,17 @@ func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConf
return err
}
if !info.IsDir() && filepath.Ext(info.Name()) == dropinFileExtension {
dropinConfigJSON, err := loadDropinConfigFileIntoJSON(path)
dropinConfigJSON, gvk, err := loadDropinConfigFileIntoJSON(path)
if err != nil {
return fmt.Errorf("failed to load kubelet dropin file, path: %s, error: %w", path, err)
}
if gvk == nil || gvk.Empty() {
return fmt.Errorf("failed to load kubelet dropin file, path: %s, no apiVersion/kind", path)
}
// TODO: expand this once more than a single kubelet config version exists
if *gvk != kubeletconfigv1beta1.SchemeGroupVersion.WithKind("KubeletConfiguration") {
return fmt.Errorf("failed to load kubelet dropin file, path: %s, unknown apiVersion/kind: %v", path, gvk.String())
}
mergedConfigJSON, err := jsonpatch.MergePatch(baseKubeletConfigJSON, dropinConfigJSON)
if err != nil {
return fmt.Errorf("failed to merge drop-in and current config: %w", err)
@ -341,9 +360,18 @@ func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConf
return fmt.Errorf("failed to walk through kubelet dropin directory %q: %w", kubeletDropInConfigDir, err)
}
if err := json.Unmarshal(baseKubeletConfigJSON, kubeletConfig); err != nil {
// reset versioned config and decode
versionedConfig = &kubeletconfigv1beta1.KubeletConfiguration{}
if err := json.Unmarshal(baseKubeletConfigJSON, versionedConfig); err != nil {
return fmt.Errorf("failed to unmarshal merged JSON into kubelet configuration: %w", err)
}
// apply defaulting after decoding
kubeletconfigv1beta1conversion.SetDefaults_KubeletConfiguration(versionedConfig)
// convert back to internal config
if err := kubeletconfigv1beta1conversion.Convert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(versionedConfig, kubeletConfig, nil); err != nil {
return fmt.Errorf("failed to convert merged config to internal kubelet configuration: %w", err)
}
return nil
}
@ -423,16 +451,16 @@ func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, e
return kc, err
}
func loadDropinConfigFileIntoJSON(name string) ([]byte, error) {
func loadDropinConfigFileIntoJSON(name string) ([]byte, *schema.GroupVersionKind, error) {
const errFmt = "failed to load drop-in kubelet config file %s, error %v"
// compute absolute path based on current working dir
kubeletConfigFile, err := filepath.Abs(name)
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
return nil, nil, fmt.Errorf(errFmt, name, err)
}
loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
return nil, nil, fmt.Errorf(errFmt, name, err)
}
return loader.LoadIntoJSON()
}

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
@ -80,6 +81,7 @@ func TestMergeKubeletConfigurations(t *testing.T) {
overwrittenConfigFields map[string]interface{}
cliArgs []string
name string
expectMergeError string
}{
{
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
@ -222,6 +224,55 @@ readOnlyPort: 10255
},
name: "cli args override kubelet.conf",
},
{
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
ResolverConfig: "original",
Port: 123,
ReadOnlyPort: 234,
},
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
resolvConf: dropin1
`,
dropin2: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
port: 0
`,
overwrittenConfigFields: map[string]interface{}{
"ResolverConfig": "dropin1", // overridden by dropin1
"Port": int32(10250), // reset to 0 by dropin2, then re-defaulted
"ReadOnlyPort": int32(234), // preserved from original config
},
name: "json conversion is correct",
},
{
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{},
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta2
kind: KubeletConfiguration
`,
name: "invalid drop-in apiVersion",
expectMergeError: `unknown apiVersion/kind: kubelet.config.k8s.io/v1beta2, Kind=KubeletConfiguration`,
},
{
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{},
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration2
`,
name: "invalid drop-in kind",
expectMergeError: `unknown apiVersion/kind: kubelet.config.k8s.io/v1beta1, Kind=KubeletConfiguration2`,
},
{
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{},
dropin1: `
port: 123
`,
name: "empty drop-in apiVersion/kind",
expectMergeError: `'Kind' is missing`,
},
}
for _, test := range testCases {
@ -248,15 +299,24 @@ readOnlyPort: 10255
err := os.Mkdir(kubeletConfDir, 0755)
require.NoError(t, err, "Failed to create kubelet.conf.d directory")
if len(test.dropin1) > 0 {
err = os.WriteFile(filepath.Join(kubeletConfDir, "10-kubelet.conf"), []byte(test.dropin1), 0644)
require.NoError(t, err, "failed to create config from a yaml file")
}
if len(test.dropin2) > 0 {
err = os.WriteFile(filepath.Join(kubeletConfDir, "20-kubelet.conf"), []byte(test.dropin2), 0644)
require.NoError(t, err, "failed to create config from a yaml file")
}
// Merge the kubelet configurations
err = mergeKubeletConfigurations(kubeletConfig, kubeletConfDir)
if test.expectMergeError == "" {
require.NoError(t, err, "failed to merge kubelet drop-in configs")
} else {
require.Error(t, err)
require.Contains(t, err.Error(), test.expectMergeError)
}
}
// Use kubelet config flag precedence

View File

@ -20,6 +20,7 @@ import (
"fmt"
"path/filepath"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
@ -33,7 +34,7 @@ type Loader interface {
Load() (*kubeletconfig.KubeletConfiguration, error)
// LoadIntoJSON loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be
// loaded. It returns the configuration as a JSON byte slice
LoadIntoJSON() ([]byte, error)
LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error)
}
// fsLoader loads configuration from `configDir`
@ -81,15 +82,15 @@ func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
return kc, nil
}
func (loader *fsLoader) LoadIntoJSON() ([]byte, error) {
func (loader *fsLoader) LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error) {
data, err := loader.fs.ReadFile(loader.kubeletFile)
if err != nil {
return nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %v", loader.kubeletFile, err)
return nil, nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %w", loader.kubeletFile, err)
}
// no configuration is an error, some parameters are required
if len(data) == 0 {
return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
return nil, nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
}
return utilcodec.DecodeKubeletConfigurationIntoJSON(loader.kubeletCodecs, data)

View File

@ -109,14 +109,15 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
}
// DecodeKubeletConfigurationIntoJSON decodes a serialized KubeletConfiguration to the internal type.
func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, error) {
func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, *schema.GroupVersionKind, error) {
// The UniversalDecoder runs defaulting and returns the internal type by default.
obj, _, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{})
obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{})
if err != nil {
return nil, err
return nil, nil, err
}
objT := obj.(*unstructured.Unstructured)
return json.Marshal(objT.Object)
jsonData, err := json.Marshal(objT.Object)
return jsonData, gvk, err
}