diff --git a/cmd/kubeadm/app/cmd/phases/init/kubelet.go b/cmd/kubeadm/app/cmd/phases/init/kubelet.go index 39666345803..54f55abff64 100644 --- a/cmd/kubeadm/app/cmd/phases/init/kubelet.go +++ b/cmd/kubeadm/app/cmd/phases/init/kubelet.go @@ -23,9 +23,11 @@ import ( "k8s.io/klog/v2" + kubeletconfig "k8s.io/kubelet/config/v1beta1" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/options" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" + "k8s.io/kubernetes/cmd/kubeadm/app/features" kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet" ) @@ -76,6 +78,16 @@ func runKubeletStart(c workflow.RunData) error { return errors.Wrap(err, "error writing a dynamic environment file for the kubelet") } + // Write the instance kubelet configuration file to disk. + if features.Enabled(data.Cfg().FeatureGates, features.NodeLocalCRISocket) { + kubeletConfig := &kubeletconfig.KubeletConfiguration{ + ContainerRuntimeEndpoint: data.Cfg().NodeRegistration.CRISocket, + } + if err := kubeletphase.WriteInstanceConfigToDisk(kubeletConfig, data.KubeletDir()); err != nil { + return errors.Wrap(err, "error writing instance kubelet configuration to disk") + } + } + // Write the kubelet configuration file to disk. if err := kubeletphase.WriteConfigToDisk(&data.Cfg().ClusterConfiguration, data.KubeletDir(), data.PatchesDir(), data.OutputWriter()); err != nil { return errors.Wrap(err, "error writing kubelet configuration to disk") diff --git a/cmd/kubeadm/app/cmd/phases/init/uploadconfig.go b/cmd/kubeadm/app/cmd/phases/init/uploadconfig.go index 93719d98df7..053a06f39e4 100644 --- a/cmd/kubeadm/app/cmd/phases/init/uploadconfig.go +++ b/cmd/kubeadm/app/cmd/phases/init/uploadconfig.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet" patchnodephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/patchnode" "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig" @@ -127,9 +128,11 @@ func runUploadKubeletConfig(c workflow.RunData) error { return errors.Wrap(err, "error creating kubelet configuration ConfigMap") } - klog.V(1).Infoln("[upload-config] Preserving the CRISocket information for the control-plane node") - if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil { - return errors.Wrap(err, "Error writing Crisocket information for the control-plane node") + if !features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) { + klog.V(1).Infoln("[upload-config] Preserving the CRISocket information for the control-plane node") + if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil { + return errors.Wrap(err, "error writing CRISocket for this node") + } } return nil } diff --git a/cmd/kubeadm/app/cmd/phases/join/kubelet.go b/cmd/kubeadm/app/cmd/phases/join/kubelet.go index 4e657e2f948..5ef76f79993 100644 --- a/cmd/kubeadm/app/cmd/phases/join/kubelet.go +++ b/cmd/kubeadm/app/cmd/phases/join/kubelet.go @@ -232,6 +232,16 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { fmt.Println("[kubelet-start] Would stop the kubelet") } + // Write the instance kubelet configuration file to disk. + if features.Enabled(initCfg.FeatureGates, features.NodeLocalCRISocket) { + kubeletConfig := &kubeletconfig.KubeletConfiguration{ + ContainerRuntimeEndpoint: data.Cfg().NodeRegistration.CRISocket, + } + if err := kubeletphase.WriteInstanceConfigToDisk(kubeletConfig, data.KubeletDir()); err != nil { + return errors.Wrap(err, "error writing instance kubelet configuration to disk") + } + } + // Write the configuration for the kubelet (using the bootstrap token credentials) to disk so the kubelet can start if err := kubeletphase.WriteConfigToDisk(&initCfg.ClusterConfiguration, data.KubeletDir(), data.PatchesDir(), data.OutputWriter()); err != nil { return err @@ -323,9 +333,11 @@ func runKubeletWaitBootstrapPhase(c workflow.RunData) (returnErr error) { return err } - klog.V(1).Infoln("[kubelet-start] preserving the crisocket information for the node") - if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil { - return errors.Wrap(err, "error uploading crisocket") + if !features.Enabled(initCfg.ClusterConfiguration.FeatureGates, features.NodeLocalCRISocket) { + klog.V(1).Infoln("[kubelet-start] preserving the crisocket information for the node") + if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil { + return errors.Wrap(err, "error writing CRISocket for this node") + } } return nil diff --git a/cmd/kubeadm/app/cmd/phases/upgrade/apply/uploadconfig.go b/cmd/kubeadm/app/cmd/phases/upgrade/apply/uploadconfig.go index 3844d157cef..4842b16719d 100644 --- a/cmd/kubeadm/app/cmd/phases/upgrade/apply/uploadconfig.go +++ b/cmd/kubeadm/app/cmd/phases/upgrade/apply/uploadconfig.go @@ -28,6 +28,7 @@ import ( kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/options" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" + "k8s.io/kubernetes/cmd/kubeadm/app/features" kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet" patchnodephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/patchnode" "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig" @@ -107,9 +108,11 @@ func runUploadKubeletConfig(c workflow.RunData) error { return errors.Wrap(err, "error creating kubelet configuration ConfigMap") } - klog.V(1).Infoln("[upgrade/upload-config] Preserving the CRISocket information for this control-plane node") - if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil { - return errors.Wrap(err, "error writing Crisocket information for the control-plane node") + if !features.Enabled(cfg.ClusterConfiguration.FeatureGates, features.NodeLocalCRISocket) { + klog.V(1).Infoln("[upgrade/upload-config] Preserving the CRISocket information for this control-plane node") + if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil { + return errors.Wrap(err, "error writing CRISocket for this node") + } } return nil diff --git a/cmd/kubeadm/app/componentconfigs/kubelet.go b/cmd/kubeadm/app/componentconfigs/kubelet.go index 6a01c4c398c..436bf1bc87c 100644 --- a/cmd/kubeadm/app/componentconfigs/kubelet.go +++ b/cmd/kubeadm/app/componentconfigs/kubelet.go @@ -20,6 +20,7 @@ import ( "path/filepath" "github.com/pkg/errors" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" kubeletconfig "k8s.io/kubelet/config/v1beta1" diff --git a/cmd/kubeadm/app/constants/constants.go b/cmd/kubeadm/app/constants/constants.go index 6337c87f160..3cff2113fa0 100644 --- a/cmd/kubeadm/app/constants/constants.go +++ b/cmd/kubeadm/app/constants/constants.go @@ -306,6 +306,10 @@ const ( // This file should exist under KubeletRunDirectory KubeletConfigurationFileName = "config.yaml" + // KubeletInstanceConfigurationFileName is the name of the kubelet instance configuration file written + // to all nodes. This file should exist under KubeletRunDirectory. + KubeletInstanceConfigurationFileName = "instance-config.yaml" + // KubeletEnvFileName is a file "kubeadm init" writes at runtime. Using that interface, kubeadm can customize certain // kubelet flags conditionally based on the environment at runtime. Also, parameters given to the configuration file // might be passed through this file. "kubeadm init" writes one variable, with the name ${KubeletEnvFileVariableName}. diff --git a/cmd/kubeadm/app/features/features.go b/cmd/kubeadm/app/features/features.go index efbd78cdddc..8defd6036f2 100644 --- a/cmd/kubeadm/app/features/features.go +++ b/cmd/kubeadm/app/features/features.go @@ -40,6 +40,8 @@ const ( WaitForAllControlPlaneComponents = "WaitForAllControlPlaneComponents" // ControlPlaneKubeletLocalMode is expected to be in alpha in v1.31, beta in v1.32 ControlPlaneKubeletLocalMode = "ControlPlaneKubeletLocalMode" + // NodeLocalCRISocket is expected to be in alpha in v1.32, beta in v1.33, ga in v1.35 + NodeLocalCRISocket = "NodeLocalCRISocket" ) // InitFeatureGates are the default feature gates for the init command @@ -56,6 +58,7 @@ var InitFeatureGates = FeatureList{ EtcdLearnerMode: {FeatureSpec: featuregate.FeatureSpec{Default: true, PreRelease: featuregate.GA, LockToDefault: true}}, WaitForAllControlPlaneComponents: {FeatureSpec: featuregate.FeatureSpec{Default: false, PreRelease: featuregate.Alpha}}, ControlPlaneKubeletLocalMode: {FeatureSpec: featuregate.FeatureSpec{Default: false, PreRelease: featuregate.Alpha}}, + NodeLocalCRISocket: {FeatureSpec: featuregate.FeatureSpec{Default: false, PreRelease: featuregate.Alpha}}, } // Feature represents a feature being gated diff --git a/cmd/kubeadm/app/phases/kubelet/config.go b/cmd/kubeadm/app/phases/kubelet/config.go index 43406ca7c50..865acafcda4 100644 --- a/cmd/kubeadm/app/phases/kubelet/config.go +++ b/cmd/kubeadm/app/phases/kubelet/config.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" kubeletconfig "k8s.io/kubelet/config/v1beta1" "sigs.k8s.io/yaml" @@ -34,6 +35,7 @@ import ( kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" "k8s.io/kubernetes/cmd/kubeadm/app/componentconfigs" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" "k8s.io/kubernetes/cmd/kubeadm/app/util/patches" @@ -66,7 +68,21 @@ func WriteConfigToDisk(cfg *kubeadmapi.ClusterConfiguration, kubeletDir, patches } } - return writeConfigBytesToDisk(kubeletBytes, kubeletDir) + if features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) { + file := filepath.Join(kubeletDir, kubeadmconstants.KubeletInstanceConfigurationFileName) + kubeletBytes, err = applyKubeletConfigPatchFromFile(kubeletBytes, file, output) + if err != nil { + return errors.Wrapf(err, "could not apply kubelet instance configuration as a patch from %q", file) + } + } + return writeConfigBytesToDisk(kubeletBytes, kubeletDir, kubeadmconstants.KubeletConfigurationFileName) +} + +// WriteInstanceConfigToDisk writes the container runtime endpoint configuration +// to the instance configuration file in the specified kubelet directory. +func WriteInstanceConfigToDisk(cfg *kubeletconfig.KubeletConfiguration, kubeletDir string) error { + instanceFileContent := fmt.Sprintf("containerRuntimeEndpoint: %q\n", cfg.ContainerRuntimeEndpoint) + return writeConfigBytesToDisk([]byte(instanceFileContent), kubeletDir, kubeadmconstants.KubeletInstanceConfigurationFileName) } // ApplyPatchesToConfig applies the patches located in patchesDir to the KubeletConfiguration stored @@ -188,8 +204,8 @@ func createConfigMapRBACRules(client clientset.Interface) error { } // writeConfigBytesToDisk writes a byte slice down to disk at the specific location of the kubelet config file -func writeConfigBytesToDisk(b []byte, kubeletDir string) error { - configFile := filepath.Join(kubeletDir, kubeadmconstants.KubeletConfigurationFileName) +func writeConfigBytesToDisk(b []byte, kubeletDir, fileName string) error { + configFile := filepath.Join(kubeletDir, fileName) fmt.Printf("[kubelet-start] Writing kubelet configuration to file %q\n", configFile) // creates target folder if not already exists @@ -198,7 +214,7 @@ func writeConfigBytesToDisk(b []byte, kubeletDir string) error { } if err := os.WriteFile(configFile, b, 0644); err != nil { - return errors.Wrapf(err, "failed to write kubelet configuration to the file %q", configFile) + return errors.Wrapf(err, "failed to write kubelet configuration file %q", configFile) } return nil } @@ -225,3 +241,45 @@ func applyKubeletConfigPatches(kubeletBytes []byte, patchesDir string, output io } return kubeletBytes, nil } + +// applyKubeletConfigPatchFromFile applies a single patch file to the kubelet configuration bytes. +func applyKubeletConfigPatchFromFile(kubeletConfigBytes []byte, patchFilePath string, output io.Writer) ([]byte, error) { + // Get the patch data from the file. + data, err := os.ReadFile(patchFilePath) + if err != nil { + return nil, errors.Wrapf(err, "could not read patch file %q", patchFilePath) + } + + patchSet, err := patches.CreatePatchSet(patches.KubeletConfiguration, types.StrategicMergePatchType, string(data)) + if err != nil { + return nil, err + } + + patchManager := patches.NewPatchManager([]*patches.PatchSet{patchSet}, []string{patches.KubeletConfiguration}, output) + + // Always convert the target data to JSON. + patchData, err := yaml.YAMLToJSON(kubeletConfigBytes) + if err != nil { + return nil, err + } + + // Define the patch target. + patchTarget := &patches.PatchTarget{ + Name: patches.KubeletConfiguration, + StrategicMergePatchObject: kubeletconfig.KubeletConfiguration{}, + Data: patchData, + } + + err = patchManager.ApplyPatchesToTarget(patchTarget) + if err != nil { + return nil, err + } + + // Convert the patched data back to YAML and return it. + kubeletConfigBytes, err = yaml.JSONToYAML(patchTarget.Data) + if err != nil { + return nil, errors.Wrap(err, "failed to convert patched data to YAML") + } + + return kubeletConfigBytes, nil +} diff --git a/cmd/kubeadm/app/phases/kubelet/config_test.go b/cmd/kubeadm/app/phases/kubelet/config_test.go index c95499e090d..6d2a6af4e4f 100644 --- a/cmd/kubeadm/app/phases/kubelet/config_test.go +++ b/cmd/kubeadm/app/phases/kubelet/config_test.go @@ -24,6 +24,8 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -109,6 +111,74 @@ func TestApplyKubeletConfigPatches(t *testing.T) { } } +func TestApplyKubeletConfigPatchFromFile(t *testing.T) { + const kubeletConfigGVK = "apiVersion: kubelet.config.k8s.io/v1beta1\nkind: KubeletConfiguration\n" + + tests := []struct { + name string + kubeletConfig []byte + patchContent []byte + expectError bool + expectedResult []byte + }{ + { + name: "apply new field", + kubeletConfig: []byte(kubeletConfigGVK), + patchContent: []byte("containerRuntimeEndpoint: unix:///run/containerd/containerd.sock"), + expectError: false, + expectedResult: []byte("apiVersion: kubelet.config.k8s.io/v1beta1\ncontainerRuntimeEndpoint: unix:///run/containerd/containerd.sock\nkind: KubeletConfiguration\n"), + }, + { + name: "overwrite existing field", + kubeletConfig: []byte(kubeletConfigGVK + "containerRuntimeEndpoint: unix:///run/crio/crio.sock\n"), + patchContent: []byte("containerRuntimeEndpoint: unix:///run/containerd/containerd.sock"), + expectError: false, + expectedResult: []byte("apiVersion: kubelet.config.k8s.io/v1beta1\ncontainerRuntimeEndpoint: unix:///run/containerd/containerd.sock\nkind: KubeletConfiguration\n"), + }, + { + name: "invalid patch contents", + kubeletConfig: []byte(kubeletConfigGVK), + patchContent: []byte("invalid-patch-content"), + expectError: true, + }, + { + name: "empty patch file", + kubeletConfig: []byte(kubeletConfigGVK), + patchContent: []byte(""), + expectError: false, + expectedResult: []byte(kubeletConfigGVK), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + output := io.Discard + + // Create a temporary file to store the patch content. + patchFile, err := os.CreateTemp("", "instance-config-*.yml") + if err != nil { + t.Errorf("Error creating temporary file: %v", err) + } + defer func() { + _ = patchFile.Close() + _ = os.Remove(patchFile.Name()) + }() + + _, err = patchFile.Write(tt.patchContent) + if err != nil { + t.Errorf("Error writing instance config to file: %v", err) + } + + // Apply the patch. + result, err := applyKubeletConfigPatchFromFile(tt.kubeletConfig, patchFile.Name(), output) + if !tt.expectError && err != nil { + t.Errorf("Unexpected error: %v", err) + } + assert.Equal(t, tt.expectedResult, result) + }) + } +} + func TestApplyPatchesToConfig(t *testing.T) { const ( expectedAddress = "barfoo" diff --git a/cmd/kubeadm/app/phases/kubelet/flags.go b/cmd/kubeadm/app/phases/kubelet/flags.go index 79e2dc564fb..c48369a55fd 100644 --- a/cmd/kubeadm/app/phases/kubelet/flags.go +++ b/cmd/kubeadm/app/phases/kubelet/flags.go @@ -29,6 +29,7 @@ import ( nodeutil "k8s.io/component-helpers/node/util" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" "k8s.io/kubernetes/cmd/kubeadm/app/images" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" ) @@ -37,6 +38,8 @@ type kubeletFlagsOpts struct { nodeRegOpts *kubeadmapi.NodeRegistrationOptions pauseImage string registerTaintsUsingFlags bool + // TODO: remove this field once the feature NodeLocalCRISocket is GA. + criSocket string } // GetNodeNameAndHostname obtains the name for this Node using the following precedence @@ -64,6 +67,11 @@ func WriteKubeletDynamicEnvFile(cfg *kubeadmapi.ClusterConfiguration, nodeReg *k nodeRegOpts: nodeReg, pauseImage: images.GetPauseImage(cfg), registerTaintsUsingFlags: registerTaintsUsingFlags, + criSocket: nodeReg.CRISocket, + } + + if features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) { + flagOpts.criSocket = "" } stringMap := buildKubeletArgs(flagOpts) argList := kubeadmutil.ArgumentsToCommand(stringMap, nodeReg.KubeletExtraArgs) @@ -76,7 +84,9 @@ func WriteKubeletDynamicEnvFile(cfg *kubeadmapi.ClusterConfiguration, nodeReg *k // that are common to both Linux and Windows func buildKubeletArgsCommon(opts kubeletFlagsOpts) []kubeadmapi.Arg { kubeletFlags := []kubeadmapi.Arg{} - kubeletFlags = append(kubeletFlags, kubeadmapi.Arg{Name: "container-runtime-endpoint", Value: opts.nodeRegOpts.CRISocket}) + if opts.criSocket != "" { + kubeletFlags = append(kubeletFlags, kubeadmapi.Arg{Name: "container-runtime-endpoint", Value: opts.criSocket}) + } // This flag passes the pod infra container image (e.g. "pause" image) to the kubelet // and prevents its garbage collection @@ -125,3 +135,47 @@ func writeKubeletFlagBytesToDisk(b []byte, kubeletDir string) error { func buildKubeletArgs(opts kubeletFlagsOpts) []kubeadmapi.Arg { return buildKubeletArgsCommon(opts) } + +// ReadKubeletDynamicEnvFile reads the kubelet dynamic environment flags file a slice of strings. +func ReadKubeletDynamicEnvFile(kubeletEnvFilePath string) ([]string, error) { + data, err := os.ReadFile(kubeletEnvFilePath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + // Trim any surrounding whitespace. + content := strings.TrimSpace(string(data)) + + // Check if the content starts with the expected kubelet environment variable prefix. + const prefix = constants.KubeletEnvFileVariableName + "=" + if !strings.HasPrefix(content, prefix) { + return nil, errors.Errorf("the file %q does not contain the expected prefix %q", kubeletEnvFilePath, prefix) + } + + // Trim the prefix and the surrounding double quotes. + flags := strings.TrimPrefix(content, prefix) + flags = strings.Trim(flags, `"`) + + // Split the flags string by whitespace to get individual arguments. + trimmedFlags := strings.Fields(flags) + if len(trimmedFlags) == 0 { + return nil, errors.Errorf("no flags found in file %q", kubeletEnvFilePath) + } + + var updatedFlags []string + for i := 0; i < len(trimmedFlags); i++ { + flag := trimmedFlags[i] + if strings.Contains(flag, "=") { + // If the flag contains '=', add it directly. + updatedFlags = append(updatedFlags, flag) + } else if i+1 < len(trimmedFlags) { + // If no '=' is found, combine the flag with the next item as its value. + combinedFlag := flag + "=" + trimmedFlags[i+1] + updatedFlags = append(updatedFlags, combinedFlag) + // Skip the next item as it has been used as the value. + i++ + } + } + + return updatedFlags, nil +} diff --git a/cmd/kubeadm/app/phases/kubelet/flags_test.go b/cmd/kubeadm/app/phases/kubelet/flags_test.go index 5c68b33cada..b159b35021d 100644 --- a/cmd/kubeadm/app/phases/kubelet/flags_test.go +++ b/cmd/kubeadm/app/phases/kubelet/flags_test.go @@ -22,6 +22,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" @@ -37,11 +39,11 @@ func TestBuildKubeletArgs(t *testing.T) { name: "hostname override", opts: kubeletFlagsOpts{ nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{ - CRISocket: "unix:///var/run/containerd/containerd.sock", KubeletExtraArgs: []kubeadmapi.Arg{ {Name: "hostname-override", Value: "override-name"}, }, }, + criSocket: "unix:///var/run/containerd/containerd.sock", }, expected: []kubeadmapi.Arg{ {Name: "container-runtime-endpoint", Value: "unix:///var/run/containerd/containerd.sock"}, @@ -52,7 +54,6 @@ func TestBuildKubeletArgs(t *testing.T) { name: "register with taints", opts: kubeletFlagsOpts{ nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{ - CRISocket: "unix:///var/run/containerd/containerd.sock", Taints: []v1.Taint{ { Key: "foo", @@ -66,6 +67,7 @@ func TestBuildKubeletArgs(t *testing.T) { }, }, }, + criSocket: "unix:///var/run/containerd/containerd.sock", registerTaintsUsingFlags: true, }, expected: []kubeadmapi.Arg{ @@ -76,10 +78,9 @@ func TestBuildKubeletArgs(t *testing.T) { { name: "pause image is set", opts: kubeletFlagsOpts{ - nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{ - CRISocket: "unix:///var/run/containerd/containerd.sock", - }, - pauseImage: "registry.k8s.io/pause:ver", + nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{}, + criSocket: "unix:///var/run/containerd/containerd.sock", + pauseImage: "registry.k8s.io/pause:ver", }, expected: []kubeadmapi.Arg{ {Name: "container-runtime-endpoint", Value: "unix:///var/run/containerd/containerd.sock"}, @@ -189,3 +190,81 @@ func TestGetNodeNameAndHostname(t *testing.T) { }) } } + +func TestReadKubeadmFlags(t *testing.T) { + tests := []struct { + name string + fileContent string + expectedValue []string + expectError bool + }{ + { + name: "valid kubeadm flags with container-runtime-endpoint", + fileContent: `KUBELET_KUBEADM_ARGS="--container-runtime-endpoint=unix:///var/run/containerd/containerd.sock --pod-infra-container-image=registry.k8s.io/pause:1.0"`, + expectedValue: []string{"--container-runtime-endpoint=unix:///var/run/containerd/containerd.sock", "--pod-infra-container-image=registry.k8s.io/pause:1.0"}, + expectError: false, + }, + { + name: "no container-runtime-endpoint found", + fileContent: `KUBELET_KUBEADM_ARGS="--pod-infra-container-image=registry.k8s.io/pause:1.0"`, + expectedValue: []string{"--pod-infra-container-image=registry.k8s.io/pause:1.0"}, + expectError: true, + }, + { + name: "no KUBELET_KUBEADM_ARGS line", + fileContent: `# This is a comment, no args here`, + expectedValue: nil, + expectError: true, + }, + { + name: "invalid file format", + fileContent: "", + expectedValue: nil, + expectError: true, + }, + { + name: "multiple flags with mixed equals and no equals", + fileContent: `KUBELET_KUBEADM_ARGS="--container-runtime-endpoint=unix:///var/run/containerd/containerd.sock --foo bar --baz=qux"`, + expectedValue: []string{"--container-runtime-endpoint=unix:///var/run/containerd/containerd.sock", "--foo=bar", "--baz=qux"}, + expectError: false, + }, + { + name: "multiple flags with no equals", + fileContent: `KUBELET_KUBEADM_ARGS="--container-runtime-endpoint unix:///var/run/containerd/containerd.sock --foo bar --baz qux"`, + expectedValue: []string{"--container-runtime-endpoint=unix:///var/run/containerd/containerd.sock", "--foo=bar", "--baz=qux"}, + expectError: false, + }, + { + name: "invalid prefix", + fileContent: `"--foo bar"`, + expectedValue: nil, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tmpFile, err := os.CreateTemp("", "kubeadm-flags-*.env") + if err != nil { + t.Errorf("Error creating temporary file: %v", err) + } + + defer func() { + _ = os.Remove(tmpFile.Name()) + _ = tmpFile.Close() + }() + + _, err = tmpFile.WriteString(tt.fileContent) + if err != nil { + t.Errorf("Error writing args to file: %v", err) + } + + value, err := ReadKubeletDynamicEnvFile(tmpFile.Name()) + if !tt.expectError && err != nil { + t.Errorf("Unexpected error: %v", err) + } + + assert.Equal(t, tt.expectedValue, value) + }) + } +} diff --git a/cmd/kubeadm/app/phases/upgrade/postupgrade.go b/cmd/kubeadm/app/phases/upgrade/postupgrade.go index 2a4c456b832..4153dc49f14 100644 --- a/cmd/kubeadm/app/phases/upgrade/postupgrade.go +++ b/cmd/kubeadm/app/phases/upgrade/postupgrade.go @@ -27,14 +27,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - errorsutil "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + kubeletconfig "k8s.io/kubelet/config/v1beta1" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" @@ -124,19 +125,57 @@ func WriteKubeletConfigFiles(cfg *kubeadmapi.InitConfiguration, patchesDir strin fmt.Printf("[dryrun] Would back up kubelet config file to %s\n", dest) } - errs := []error{} + if features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) { + // If instance-config.yaml exist on disk, we don't need to create it. + _, err := os.Stat(filepath.Join(kubeletDir, kubeadmconstants.KubeletInstanceConfigurationFileName)) + if os.IsNotExist(err) { + var containerRuntimeEndpoint string + dynamicFlags, err := kubeletphase.ReadKubeletDynamicEnvFile(filepath.Join(kubeletDir, kubeadmconstants.KubeletEnvFileName)) + if err == nil { + args := kubeadmutil.ArgumentsFromCommand(dynamicFlags) + for _, arg := range args { + if arg.Name == "container-runtime-endpoint" { + containerRuntimeEndpoint = arg.Value + break + } + } + } else if dryRun { + fmt.Fprintf(os.Stdout, "[dryrun] would read the flag --container-runtime-endpoint value from %q, which is missing. "+ + "Using default socket %q instead", kubeadmconstants.KubeletEnvFileName, kubeadmconstants.DefaultCRISocket) + containerRuntimeEndpoint = kubeadmconstants.DefaultCRISocket + } else { + return errors.Wrap(err, "error reading kubeadm flags file") + } + + kubeletConfig := &kubeletconfig.KubeletConfiguration{ + ContainerRuntimeEndpoint: containerRuntimeEndpoint, + } + + if err := kubeletphase.WriteInstanceConfigToDisk(kubeletConfig, kubeletDir); err != nil { + return errors.Wrap(err, "error writing kubelet instance configuration") + } + + if dryRun { // Print what contents would be written + err = dryrunutil.PrintDryRunFile(kubeadmconstants.KubeletInstanceConfigurationFileName, kubeletDir, kubeadmconstants.KubeletRunDirectory, os.Stdout) + if err != nil { + return errors.Wrap(err, "error printing kubelet instance configuration file on dryrun") + } + } + } + } + // Write the configuration for the kubelet down to disk so the upgraded kubelet can start with fresh config if err := kubeletphase.WriteConfigToDisk(&cfg.ClusterConfiguration, kubeletDir, patchesDir, out); err != nil { - errs = append(errs, errors.Wrap(err, "error writing kubelet configuration to file")) + return errors.Wrap(err, "error writing kubelet configuration to file") } if dryRun { // Print what contents would be written err := dryrunutil.PrintDryRunFile(kubeadmconstants.KubeletConfigurationFileName, kubeletDir, kubeadmconstants.KubeletRunDirectory, os.Stdout) if err != nil { - errs = append(errs, errors.Wrap(err, "error printing files on dryrun")) + return errors.Wrap(err, "error printing kubelet configuration file on dryrun") } } - return errorsutil.NewAggregate(errs) + return nil } // GetKubeletDir gets the kubelet directory based on whether the user is dry-running this command or not. diff --git a/cmd/kubeadm/app/util/config/cluster.go b/cmd/kubeadm/app/util/config/cluster.go index 3071a11ca8b..1c228b6a6a9 100644 --- a/cmd/kubeadm/app/util/config/cluster.go +++ b/cmd/kubeadm/app/util/config/cluster.go @@ -20,6 +20,8 @@ import ( "context" "crypto/x509" "fmt" + "os" + "path" "path/filepath" "strings" "time" @@ -36,12 +38,15 @@ import ( certutil "k8s.io/client-go/util/cert" nodeutil "k8s.io/component-helpers/node/util" "k8s.io/klog/v2" + kubeletconfig "k8s.io/kubelet/config/v1beta1" + "sigs.k8s.io/yaml" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme" kubeadmapiv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta4" "k8s.io/kubernetes/cmd/kubeadm/app/componentconfigs" "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" "k8s.io/kubernetes/cmd/kubeadm/app/util/config/strict" "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig" @@ -115,7 +120,7 @@ func getInitConfigurationFromCluster(kubeconfigDir string, client clientset.Inte if !newControlPlane { // gets the nodeRegistration for the current from the node object kubeconfigFile := filepath.Join(kubeconfigDir, constants.KubeletKubeConfigFileName) - if err := GetNodeRegistration(kubeconfigFile, client, &initcfg.NodeRegistration); err != nil { + if err := GetNodeRegistration(kubeconfigFile, client, &initcfg.NodeRegistration, &initcfg.ClusterConfiguration); err != nil { return nil, errors.Wrap(err, "failed to get node registration") } // gets the APIEndpoint for the current node @@ -158,7 +163,7 @@ func GetNodeName(kubeconfigFile string) (string, error) { } // GetNodeRegistration returns the nodeRegistration for the current node -func GetNodeRegistration(kubeconfigFile string, client clientset.Interface, nodeRegistration *kubeadmapi.NodeRegistrationOptions) error { +func GetNodeRegistration(kubeconfigFile string, client clientset.Interface, nodeRegistration *kubeadmapi.NodeRegistrationOptions, clusterCfg *kubeadmapi.ClusterConfiguration) error { // gets the name of the current node nodeName, err := GetNodeName(kubeconfigFile) if err != nil { @@ -171,9 +176,30 @@ func GetNodeRegistration(kubeconfigFile string, client clientset.Interface, node return errors.Wrap(err, "failed to get corresponding node") } - criSocket, ok := node.ObjectMeta.Annotations[constants.AnnotationKubeadmCRISocket] - if !ok { - return errors.Errorf("node %s doesn't have %s annotation", nodeName, constants.AnnotationKubeadmCRISocket) + var ( + criSocket string + ok bool + missingAnnotationError = errors.Errorf("node %s doesn't have %s annotation", nodeName, constants.AnnotationKubeadmCRISocket) + ) + if features.Enabled(clusterCfg.FeatureGates, features.NodeLocalCRISocket) { + _, err = os.Stat(filepath.Join(constants.KubeletRunDirectory, constants.KubeletInstanceConfigurationFileName)) + if os.IsNotExist(err) { + criSocket, ok = node.ObjectMeta.Annotations[constants.AnnotationKubeadmCRISocket] + if !ok { + return missingAnnotationError + } + } else { + kubeletConfig, err := readKubeletConfig(constants.KubeletRunDirectory, constants.KubeletInstanceConfigurationFileName) + if err != nil { + return errors.Wrapf(err, "node %q does not have a kubelet instance configuration", nodeName) + } + criSocket = kubeletConfig.ContainerRuntimeEndpoint + } + } else { + criSocket, ok = node.ObjectMeta.Annotations[constants.AnnotationKubeadmCRISocket] + if !ok { + return missingAnnotationError + } } // returns the nodeRegistration attributes @@ -304,3 +330,20 @@ func getRawAPIEndpointFromPodAnnotationWithoutRetry(ctx context.Context, client } return "", errors.Errorf("API server pod for node name %q hasn't got a %q annotation, cannot retrieve API endpoint", nodeName, constants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey) } + +// readKubeletConfig reads a KubeletConfiguration from the specified file. +func readKubeletConfig(kubeletDir, fileName string) (*kubeletconfig.KubeletConfiguration, error) { + kubeletFile := path.Join(kubeletDir, fileName) + + data, err := os.ReadFile(kubeletFile) + if err != nil { + return nil, errors.Wrapf(err, "could not read kubelet configuration file %q", kubeletFile) + } + + var config kubeletconfig.KubeletConfiguration + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, errors.Wrapf(err, "could not parse kubelet configuration file %q", kubeletFile) + } + + return &config, nil +} diff --git a/cmd/kubeadm/app/util/config/cluster_test.go b/cmd/kubeadm/app/util/config/cluster_test.go index 9ea6ce2adef..1ed8786ef5c 100644 --- a/cmd/kubeadm/app/util/config/cluster_test.go +++ b/cmd/kubeadm/app/util/config/cluster_test.go @@ -336,7 +336,7 @@ func TestGetNodeRegistration(t *testing.T) { } cfg := &kubeadmapi.InitConfiguration{} - err = GetNodeRegistration(cfgPath, client, &cfg.NodeRegistration) + err = GetNodeRegistration(cfgPath, client, &cfg.NodeRegistration, &cfg.ClusterConfiguration) if rt.expectedError != (err != nil) { t.Errorf("unexpected return err from getNodeRegistration: %v", err) return diff --git a/cmd/kubeadm/app/util/patches/patches.go b/cmd/kubeadm/app/util/patches/patches.go index c856840004c..7142ce5dee1 100644 --- a/cmd/kubeadm/app/util/patches/patches.go +++ b/cmd/kubeadm/app/util/patches/patches.go @@ -53,20 +53,20 @@ type PatchTarget struct { // PatchManager defines an object that can apply patches. type PatchManager struct { - patchSets []*patchSet + patchSets []*PatchSet knownTargets []string output io.Writer } -// patchSet defines a set of patches of a certain type that can patch a PatchTarget. -type patchSet struct { +// PatchSet defines a set of patches of a certain type that can patch a PatchTarget. +type PatchSet struct { targetName string patchType types.PatchType patches []string } // String() is used for unit-testing. -func (ps *patchSet) String() string { +func (ps *PatchSet) String() string { return fmt.Sprintf( "{%q, %q, %#v}", ps.targetName, @@ -113,6 +113,15 @@ func KnownTargets() []string { return knownTargets } +// NewPatchManager creates a patch manager that can be used to apply patches to "knownTargets". +func NewPatchManager(patchSets []*PatchSet, knownTargets []string, output io.Writer) *PatchManager { + return &PatchManager{ + patchSets: patchSets, + knownTargets: knownTargets, + output: output, + } +} + // GetPatchManagerForPath creates a patch manager that can be used to apply patches to "knownTargets". // "path" should contain patches that can be used to patch the "knownTargets". // If "output" is non-nil, messages about actions performed by the manager would go on this io.Writer. @@ -257,8 +266,8 @@ func parseFilename(fileName string, knownTargets []string) (string, types.PatchT return targetName, patchType, nil, nil } -// createPatchSet creates a patchSet object, by splitting the given "data" by "\n---". -func createPatchSet(targetName string, patchType types.PatchType, data string) (*patchSet, error) { +// CreatePatchSet creates a patchSet object, by splitting the given "data" by "\n---". +func CreatePatchSet(targetName string, patchType types.PatchType, data string) (*PatchSet, error) { var patches []string // Split the patches and convert them to JSON. @@ -285,7 +294,7 @@ func createPatchSet(targetName string, patchType types.PatchType, data string) ( patches = append(patches, string(patchJSON)) } - return &patchSet{ + return &PatchSet{ targetName: targetName, patchType: patchType, patches: patches, @@ -294,10 +303,10 @@ func createPatchSet(targetName string, patchType types.PatchType, data string) ( // getPatchSetsFromPath walks a path, ignores sub-directories and non-patch files, and // returns a list of patchFile objects. -func getPatchSetsFromPath(targetPath string, knownTargets []string, output io.Writer) ([]*patchSet, []string, []string, error) { +func getPatchSetsFromPath(targetPath string, knownTargets []string, output io.Writer) ([]*PatchSet, []string, []string, error) { patchFiles := []string{} ignoredFiles := []string{} - patchSets := []*patchSet{} + patchSets := []*PatchSet{} // Check if targetPath is a directory. info, err := os.Lstat(targetPath) @@ -349,7 +358,7 @@ func getPatchSetsFromPath(targetPath string, knownTargets []string, output io.Wr } // Create a patchSet object. - patchSet, err := createPatchSet(targetName, patchType, string(data)) + patchSet, err := CreatePatchSet(targetName, patchType, string(data)) if err != nil { return err } diff --git a/cmd/kubeadm/app/util/patches/patches_test.go b/cmd/kubeadm/app/util/patches/patches_test.go index 5213c5f7f70..49084fb6424 100644 --- a/cmd/kubeadm/app/util/patches/patches_test.go +++ b/cmd/kubeadm/app/util/patches/patches_test.go @@ -120,7 +120,7 @@ func TestCreatePatchSet(t *testing.T) { name string targetName string patchType types.PatchType - expectedPatchSet *patchSet + expectedPatchSet *PatchSet data string }{ { @@ -129,7 +129,7 @@ func TestCreatePatchSet(t *testing.T) { targetName: "etcd", patchType: types.StrategicMergePatchType, data: "foo: bar\n---\nfoo: baz\n", - expectedPatchSet: &patchSet{ + expectedPatchSet: &PatchSet{ targetName: "etcd", patchType: types.StrategicMergePatchType, patches: []string{`{"foo":"bar"}`, `{"foo":"baz"}`}, @@ -140,7 +140,7 @@ func TestCreatePatchSet(t *testing.T) { targetName: "etcd", patchType: types.StrategicMergePatchType, data: `{"foo":"bar"}` + "\n---\n" + `{"foo":"baz"}`, - expectedPatchSet: &patchSet{ + expectedPatchSet: &PatchSet{ targetName: "etcd", patchType: types.StrategicMergePatchType, patches: []string{`{"foo":"bar"}`, `{"foo":"baz"}`}, @@ -151,7 +151,7 @@ func TestCreatePatchSet(t *testing.T) { targetName: "etcd", patchType: types.StrategicMergePatchType, data: `{"foo":"bar"}` + "\n---\n ---\n" + `{"foo":"baz"}`, - expectedPatchSet: &patchSet{ + expectedPatchSet: &PatchSet{ targetName: "etcd", patchType: types.StrategicMergePatchType, patches: []string{`{"foo":"bar"}`, `{"foo":"baz"}`}, @@ -161,7 +161,7 @@ func TestCreatePatchSet(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ps, _ := createPatchSet(tc.targetName, tc.patchType, tc.data) + ps, _ := CreatePatchSet(tc.targetName, tc.patchType, tc.data) if !reflect.DeepEqual(ps, tc.expectedPatchSet) { t.Fatalf("expected patch set:\n%+v\ngot:\n%+v\n", tc.expectedPatchSet, ps) } @@ -189,7 +189,7 @@ func TestGetPatchSetsForPath(t *testing.T) { tests := []struct { name string filesToWrite []string - expectedPatchSets []*patchSet + expectedPatchSets []*PatchSet expectedPatchFiles []string expectedIgnoredFiles []string expectedError bool @@ -199,7 +199,7 @@ func TestGetPatchSetsForPath(t *testing.T) { name: "valid: patch files are sorted and non-patch files are ignored", filesToWrite: []string{"kube-scheduler+merge.json", "kube-apiserver+json.yaml", "etcd.yaml", "foo", "bar.json"}, patchData: patchData, - expectedPatchSets: []*patchSet{ + expectedPatchSets: []*PatchSet{ { targetName: "etcd", patchType: types.StrategicMergePatchType, @@ -225,7 +225,7 @@ func TestGetPatchSetsForPath(t *testing.T) { filesToWrite: []string{"kube-scheduler.json"}, expectedPatchFiles: []string{}, expectedIgnoredFiles: []string{"kube-scheduler.json"}, - expectedPatchSets: []*patchSet{}, + expectedPatchSets: []*PatchSet{}, }, { name: "invalid: bad patch type in filename returns and error",