Implement kubeadm init

This commit is contained in:
HirazawaUi 2024-10-10 00:08:13 +08:00
parent fc5b3e2dfb
commit 16e767d915
10 changed files with 198 additions and 32 deletions

View File

@ -23,9 +23,11 @@ import (
"k8s.io/klog/v2"
kubeletconfig "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet"
)
@ -76,6 +78,16 @@ func runKubeletStart(c workflow.RunData) error {
return errors.Wrap(err, "error writing a dynamic environment file for the kubelet")
}
// Write the instance kubelet configuration file to disk.
if features.Enabled(data.Cfg().FeatureGates, features.NodeLocalCRISocket) {
kubeletConfig := &kubeletconfig.KubeletConfiguration{
ContainerRuntimeEndpoint: data.Cfg().NodeRegistration.CRISocket,
}
if err := kubeletphase.WriteInstanceConfigToDisk(kubeletConfig, data.KubeletDir()); err != nil {
return errors.Wrap(err, "error writing instance kubelet configuration to disk")
}
}
// Write the kubelet configuration file to disk.
if err := kubeletphase.WriteConfigToDisk(&data.Cfg().ClusterConfiguration, data.KubeletDir(), data.PatchesDir(), data.OutputWriter()); err != nil {
return errors.Wrap(err, "error writing kubelet configuration to disk")

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
kubeletphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubelet"
patchnodephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/patchnode"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
@ -127,9 +128,11 @@ func runUploadKubeletConfig(c workflow.RunData) error {
return errors.Wrap(err, "error creating kubelet configuration ConfigMap")
}
klog.V(1).Infoln("[upload-config] Preserving the CRISocket information for the control-plane node")
if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil {
return errors.Wrap(err, "Error writing Crisocket information for the control-plane node")
if !features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) {
klog.V(1).Infoln("[upload-config] Preserving the CRISocket information for the control-plane node")
if err := patchnodephase.AnnotateCRISocket(client, cfg.NodeRegistration.Name, cfg.NodeRegistration.CRISocket); err != nil {
return errors.Wrap(err, "error writing CRISocket for this node")
}
}
return nil
}

View File

@ -20,6 +20,7 @@ import (
"path/filepath"
"github.com/pkg/errors"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
kubeletconfig "k8s.io/kubelet/config/v1beta1"

View File

@ -306,6 +306,10 @@ const (
// This file should exist under KubeletRunDirectory
KubeletConfigurationFileName = "config.yaml"
// KubeletInstanceConfigurationFileName is the name of the kubelet instance configuration file written
// to all nodes. This file should exist under KubeletRunDirectory.
KubeletInstanceConfigurationFileName = "instance-config.yaml"
// KubeletEnvFileName is a file "kubeadm init" writes at runtime. Using that interface, kubeadm can customize certain
// kubelet flags conditionally based on the environment at runtime. Also, parameters given to the configuration file
// might be passed through this file. "kubeadm init" writes one variable, with the name ${KubeletEnvFileVariableName}.

View File

@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
kubeletconfig "k8s.io/kubelet/config/v1beta1"
"sigs.k8s.io/yaml"
@ -34,6 +35,7 @@ import (
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/componentconfigs"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
"k8s.io/kubernetes/cmd/kubeadm/app/util/patches"
@ -66,7 +68,21 @@ func WriteConfigToDisk(cfg *kubeadmapi.ClusterConfiguration, kubeletDir, patches
}
}
return writeConfigBytesToDisk(kubeletBytes, kubeletDir)
if features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) {
file := filepath.Join(kubeletDir, kubeadmconstants.KubeletInstanceConfigurationFileName)
kubeletBytes, err = applyKubeletConfigPatchFromFile(kubeletBytes, file, output)
if err != nil {
return errors.Wrapf(err, "could not apply kubelet instance configuration as a patch from %q", file)
}
}
return writeConfigBytesToDisk(kubeletBytes, kubeletDir, kubeadmconstants.KubeletConfigurationFileName)
}
// WriteInstanceConfigToDisk writes the container runtime endpoint configuration
// to the instance configuration file in the specified kubelet directory.
func WriteInstanceConfigToDisk(cfg *kubeletconfig.KubeletConfiguration, kubeletDir string) error {
instanceFileContent := fmt.Sprintf("containerRuntimeEndpoint: %q\n", cfg.ContainerRuntimeEndpoint)
return writeConfigBytesToDisk([]byte(instanceFileContent), kubeletDir, kubeadmconstants.KubeletInstanceConfigurationFileName)
}
// ApplyPatchesToConfig applies the patches located in patchesDir to the KubeletConfiguration stored
@ -188,8 +204,8 @@ func createConfigMapRBACRules(client clientset.Interface) error {
}
// writeConfigBytesToDisk writes a byte slice down to disk at the specific location of the kubelet config file
func writeConfigBytesToDisk(b []byte, kubeletDir string) error {
configFile := filepath.Join(kubeletDir, kubeadmconstants.KubeletConfigurationFileName)
func writeConfigBytesToDisk(b []byte, kubeletDir, fileName string) error {
configFile := filepath.Join(kubeletDir, fileName)
fmt.Printf("[kubelet-start] Writing kubelet configuration to file %q\n", configFile)
// creates target folder if not already exists
@ -198,7 +214,7 @@ func writeConfigBytesToDisk(b []byte, kubeletDir string) error {
}
if err := os.WriteFile(configFile, b, 0644); err != nil {
return errors.Wrapf(err, "failed to write kubelet configuration to the file %q", configFile)
return errors.Wrapf(err, "failed to write kubelet configuration file %q", configFile)
}
return nil
}
@ -225,3 +241,45 @@ func applyKubeletConfigPatches(kubeletBytes []byte, patchesDir string, output io
}
return kubeletBytes, nil
}
// applyKubeletConfigPatchFromFile applies a single patch file to the kubelet configuration bytes.
func applyKubeletConfigPatchFromFile(kubeletConfigBytes []byte, patchFilePath string, output io.Writer) ([]byte, error) {
// Get the patch data from the file.
data, err := os.ReadFile(patchFilePath)
if err != nil {
return nil, errors.Wrapf(err, "could not read patch file %q", patchFilePath)
}
patchSet, err := patches.CreatePatchSet(patches.KubeletConfiguration, types.StrategicMergePatchType, string(data))
if err != nil {
return nil, err
}
patchManager := patches.NewPatchManager([]*patches.PatchSet{patchSet}, []string{patches.KubeletConfiguration}, output)
// Always convert the target data to JSON.
patchData, err := yaml.YAMLToJSON(kubeletConfigBytes)
if err != nil {
return nil, err
}
// Define the patch target.
patchTarget := &patches.PatchTarget{
Name: patches.KubeletConfiguration,
StrategicMergePatchObject: kubeletconfig.KubeletConfiguration{},
Data: patchData,
}
err = patchManager.ApplyPatchesToTarget(patchTarget)
if err != nil {
return nil, err
}
// Convert the patched data back to YAML and return it.
kubeletConfigBytes, err = yaml.JSONToYAML(patchTarget.Data)
if err != nil {
return nil, errors.Wrap(err, "failed to convert patched data to YAML")
}
return kubeletConfigBytes, nil
}

View File

@ -24,6 +24,8 @@ import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -109,6 +111,74 @@ func TestApplyKubeletConfigPatches(t *testing.T) {
}
}
func TestApplyKubeletConfigPatchFromFile(t *testing.T) {
const kubeletConfigGVK = "apiVersion: kubelet.config.k8s.io/v1beta1\nkind: KubeletConfiguration\n"
tests := []struct {
name string
kubeletConfig []byte
patchContent []byte
expectError bool
expectedResult []byte
}{
{
name: "apply new field",
kubeletConfig: []byte(kubeletConfigGVK),
patchContent: []byte("containerRuntimeEndpoint: unix:///run/containerd/containerd.sock"),
expectError: false,
expectedResult: []byte("apiVersion: kubelet.config.k8s.io/v1beta1\ncontainerRuntimeEndpoint: unix:///run/containerd/containerd.sock\nkind: KubeletConfiguration\n"),
},
{
name: "overwrite existing field",
kubeletConfig: []byte(kubeletConfigGVK + "containerRuntimeEndpoint: unix:///run/crio/crio.sock\n"),
patchContent: []byte("containerRuntimeEndpoint: unix:///run/containerd/containerd.sock"),
expectError: false,
expectedResult: []byte("apiVersion: kubelet.config.k8s.io/v1beta1\ncontainerRuntimeEndpoint: unix:///run/containerd/containerd.sock\nkind: KubeletConfiguration\n"),
},
{
name: "invalid patch contents",
kubeletConfig: []byte(kubeletConfigGVK),
patchContent: []byte("invalid-patch-content"),
expectError: true,
},
{
name: "empty patch file",
kubeletConfig: []byte(kubeletConfigGVK),
patchContent: []byte(""),
expectError: false,
expectedResult: []byte(kubeletConfigGVK),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
output := io.Discard
// Create a temporary file to store the patch content.
patchFile, err := os.CreateTemp("", "instance-config-*.yml")
if err != nil {
t.Errorf("Error creating temporary file: %v", err)
}
defer func() {
_ = patchFile.Close()
_ = os.Remove(patchFile.Name())
}()
_, err = patchFile.Write(tt.patchContent)
if err != nil {
t.Errorf("Error writing instance config to file: %v", err)
}
// Apply the patch.
result, err := applyKubeletConfigPatchFromFile(tt.kubeletConfig, patchFile.Name(), output)
if !tt.expectError && err != nil {
t.Errorf("Unexpected error: %v", err)
}
assert.Equal(t, tt.expectedResult, result)
})
}
}
func TestApplyPatchesToConfig(t *testing.T) {
const (
expectedAddress = "barfoo"

View File

@ -29,6 +29,7 @@ import (
nodeutil "k8s.io/component-helpers/node/util"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
"k8s.io/kubernetes/cmd/kubeadm/app/images"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
)
@ -37,6 +38,8 @@ type kubeletFlagsOpts struct {
nodeRegOpts *kubeadmapi.NodeRegistrationOptions
pauseImage string
registerTaintsUsingFlags bool
// TODO: remove this field once the feature NodeLocalCRISocket is GA.
criSocket string
}
// GetNodeNameAndHostname obtains the name for this Node using the following precedence
@ -64,6 +67,11 @@ func WriteKubeletDynamicEnvFile(cfg *kubeadmapi.ClusterConfiguration, nodeReg *k
nodeRegOpts: nodeReg,
pauseImage: images.GetPauseImage(cfg),
registerTaintsUsingFlags: registerTaintsUsingFlags,
criSocket: nodeReg.CRISocket,
}
if features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) {
flagOpts.criSocket = ""
}
stringMap := buildKubeletArgs(flagOpts)
argList := kubeadmutil.ArgumentsToCommand(stringMap, nodeReg.KubeletExtraArgs)
@ -76,7 +84,9 @@ func WriteKubeletDynamicEnvFile(cfg *kubeadmapi.ClusterConfiguration, nodeReg *k
// that are common to both Linux and Windows
func buildKubeletArgsCommon(opts kubeletFlagsOpts) []kubeadmapi.Arg {
kubeletFlags := []kubeadmapi.Arg{}
kubeletFlags = append(kubeletFlags, kubeadmapi.Arg{Name: "container-runtime-endpoint", Value: opts.nodeRegOpts.CRISocket})
if opts.criSocket != "" {
kubeletFlags = append(kubeletFlags, kubeadmapi.Arg{Name: "container-runtime-endpoint", Value: opts.criSocket})
}
// This flag passes the pod infra container image (e.g. "pause" image) to the kubelet
// and prevents its garbage collection

View File

@ -37,11 +37,11 @@ func TestBuildKubeletArgs(t *testing.T) {
name: "hostname override",
opts: kubeletFlagsOpts{
nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{
CRISocket: "unix:///var/run/containerd/containerd.sock",
KubeletExtraArgs: []kubeadmapi.Arg{
{Name: "hostname-override", Value: "override-name"},
},
},
criSocket: "unix:///var/run/containerd/containerd.sock",
},
expected: []kubeadmapi.Arg{
{Name: "container-runtime-endpoint", Value: "unix:///var/run/containerd/containerd.sock"},
@ -52,7 +52,6 @@ func TestBuildKubeletArgs(t *testing.T) {
name: "register with taints",
opts: kubeletFlagsOpts{
nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{
CRISocket: "unix:///var/run/containerd/containerd.sock",
Taints: []v1.Taint{
{
Key: "foo",
@ -66,6 +65,7 @@ func TestBuildKubeletArgs(t *testing.T) {
},
},
},
criSocket: "unix:///var/run/containerd/containerd.sock",
registerTaintsUsingFlags: true,
},
expected: []kubeadmapi.Arg{
@ -76,10 +76,9 @@ func TestBuildKubeletArgs(t *testing.T) {
{
name: "pause image is set",
opts: kubeletFlagsOpts{
nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{
CRISocket: "unix:///var/run/containerd/containerd.sock",
},
pauseImage: "registry.k8s.io/pause:ver",
nodeRegOpts: &kubeadmapi.NodeRegistrationOptions{},
criSocket: "unix:///var/run/containerd/containerd.sock",
pauseImage: "registry.k8s.io/pause:ver",
},
expected: []kubeadmapi.Arg{
{Name: "container-runtime-endpoint", Value: "unix:///var/run/containerd/containerd.sock"},

View File

@ -53,20 +53,20 @@ type PatchTarget struct {
// PatchManager defines an object that can apply patches.
type PatchManager struct {
patchSets []*patchSet
patchSets []*PatchSet
knownTargets []string
output io.Writer
}
// patchSet defines a set of patches of a certain type that can patch a PatchTarget.
type patchSet struct {
// PatchSet defines a set of patches of a certain type that can patch a PatchTarget.
type PatchSet struct {
targetName string
patchType types.PatchType
patches []string
}
// String() is used for unit-testing.
func (ps *patchSet) String() string {
func (ps *PatchSet) String() string {
return fmt.Sprintf(
"{%q, %q, %#v}",
ps.targetName,
@ -113,6 +113,15 @@ func KnownTargets() []string {
return knownTargets
}
// NewPatchManager creates a patch manager that can be used to apply patches to "knownTargets".
func NewPatchManager(patchSets []*PatchSet, knownTargets []string, output io.Writer) *PatchManager {
return &PatchManager{
patchSets: patchSets,
knownTargets: knownTargets,
output: output,
}
}
// GetPatchManagerForPath creates a patch manager that can be used to apply patches to "knownTargets".
// "path" should contain patches that can be used to patch the "knownTargets".
// If "output" is non-nil, messages about actions performed by the manager would go on this io.Writer.
@ -257,8 +266,8 @@ func parseFilename(fileName string, knownTargets []string) (string, types.PatchT
return targetName, patchType, nil, nil
}
// createPatchSet creates a patchSet object, by splitting the given "data" by "\n---".
func createPatchSet(targetName string, patchType types.PatchType, data string) (*patchSet, error) {
// CreatePatchSet creates a patchSet object, by splitting the given "data" by "\n---".
func CreatePatchSet(targetName string, patchType types.PatchType, data string) (*PatchSet, error) {
var patches []string
// Split the patches and convert them to JSON.
@ -285,7 +294,7 @@ func createPatchSet(targetName string, patchType types.PatchType, data string) (
patches = append(patches, string(patchJSON))
}
return &patchSet{
return &PatchSet{
targetName: targetName,
patchType: patchType,
patches: patches,
@ -294,10 +303,10 @@ func createPatchSet(targetName string, patchType types.PatchType, data string) (
// getPatchSetsFromPath walks a path, ignores sub-directories and non-patch files, and
// returns a list of patchFile objects.
func getPatchSetsFromPath(targetPath string, knownTargets []string, output io.Writer) ([]*patchSet, []string, []string, error) {
func getPatchSetsFromPath(targetPath string, knownTargets []string, output io.Writer) ([]*PatchSet, []string, []string, error) {
patchFiles := []string{}
ignoredFiles := []string{}
patchSets := []*patchSet{}
patchSets := []*PatchSet{}
// Check if targetPath is a directory.
info, err := os.Lstat(targetPath)
@ -349,7 +358,7 @@ func getPatchSetsFromPath(targetPath string, knownTargets []string, output io.Wr
}
// Create a patchSet object.
patchSet, err := createPatchSet(targetName, patchType, string(data))
patchSet, err := CreatePatchSet(targetName, patchType, string(data))
if err != nil {
return err
}

View File

@ -120,7 +120,7 @@ func TestCreatePatchSet(t *testing.T) {
name string
targetName string
patchType types.PatchType
expectedPatchSet *patchSet
expectedPatchSet *PatchSet
data string
}{
{
@ -129,7 +129,7 @@ func TestCreatePatchSet(t *testing.T) {
targetName: "etcd",
patchType: types.StrategicMergePatchType,
data: "foo: bar\n---\nfoo: baz\n",
expectedPatchSet: &patchSet{
expectedPatchSet: &PatchSet{
targetName: "etcd",
patchType: types.StrategicMergePatchType,
patches: []string{`{"foo":"bar"}`, `{"foo":"baz"}`},
@ -140,7 +140,7 @@ func TestCreatePatchSet(t *testing.T) {
targetName: "etcd",
patchType: types.StrategicMergePatchType,
data: `{"foo":"bar"}` + "\n---\n" + `{"foo":"baz"}`,
expectedPatchSet: &patchSet{
expectedPatchSet: &PatchSet{
targetName: "etcd",
patchType: types.StrategicMergePatchType,
patches: []string{`{"foo":"bar"}`, `{"foo":"baz"}`},
@ -151,7 +151,7 @@ func TestCreatePatchSet(t *testing.T) {
targetName: "etcd",
patchType: types.StrategicMergePatchType,
data: `{"foo":"bar"}` + "\n---\n ---\n" + `{"foo":"baz"}`,
expectedPatchSet: &patchSet{
expectedPatchSet: &PatchSet{
targetName: "etcd",
patchType: types.StrategicMergePatchType,
patches: []string{`{"foo":"bar"}`, `{"foo":"baz"}`},
@ -161,7 +161,7 @@ func TestCreatePatchSet(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ps, _ := createPatchSet(tc.targetName, tc.patchType, tc.data)
ps, _ := CreatePatchSet(tc.targetName, tc.patchType, tc.data)
if !reflect.DeepEqual(ps, tc.expectedPatchSet) {
t.Fatalf("expected patch set:\n%+v\ngot:\n%+v\n", tc.expectedPatchSet, ps)
}
@ -189,7 +189,7 @@ func TestGetPatchSetsForPath(t *testing.T) {
tests := []struct {
name string
filesToWrite []string
expectedPatchSets []*patchSet
expectedPatchSets []*PatchSet
expectedPatchFiles []string
expectedIgnoredFiles []string
expectedError bool
@ -199,7 +199,7 @@ func TestGetPatchSetsForPath(t *testing.T) {
name: "valid: patch files are sorted and non-patch files are ignored",
filesToWrite: []string{"kube-scheduler+merge.json", "kube-apiserver+json.yaml", "etcd.yaml", "foo", "bar.json"},
patchData: patchData,
expectedPatchSets: []*patchSet{
expectedPatchSets: []*PatchSet{
{
targetName: "etcd",
patchType: types.StrategicMergePatchType,
@ -225,7 +225,7 @@ func TestGetPatchSetsForPath(t *testing.T) {
filesToWrite: []string{"kube-scheduler.json"},
expectedPatchFiles: []string{},
expectedIgnoredFiles: []string{"kube-scheduler.json"},
expectedPatchSets: []*patchSet{},
expectedPatchSets: []*PatchSet{},
},
{
name: "invalid: bad patch type in filename returns and error",