Merge pull request #121193 from sohankunkerkar/kubelet-config-dir

Retarget drop-in kubelet configuration dir feature to Alpha
This commit is contained in:
Kubernetes Prow Robot
2023-11-03 23:59:29 +01:00
committed by GitHub
10 changed files with 265 additions and 54 deletions

View File

@@ -20,6 +20,7 @@ package app
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
@@ -34,7 +35,7 @@ import (
"time"
"github.com/coreos/go-systemd/v22/daemon"
"github.com/imdario/mergo"
jsonpatch "github.com/evanphx/json-patch"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc/codes"
@@ -312,30 +313,34 @@ is checked every 20 seconds (also configurable with a flag).`,
// potentially overriding the previous values.
func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConfiguration, kubeletDropInConfigDir string) error {
const dropinFileExtension = ".conf"
baseKubeletConfigJSON, err := json.Marshal(kubeletConfig)
if err != nil {
return fmt.Errorf("failed to marshal base config: %w", err)
}
// Walk through the drop-in directory and update the configuration for each file
err := filepath.WalkDir(kubeletDropInConfigDir, func(path string, info fs.DirEntry, err error) error {
if err := filepath.WalkDir(kubeletDropInConfigDir, func(path string, info fs.DirEntry, err error) error {
if err != nil {
return err
}
if !info.IsDir() && filepath.Ext(info.Name()) == dropinFileExtension {
dropinConfig, err := loadConfigFile(path)
dropinConfigJSON, err := loadDropinConfigFileIntoJSON(path)
if err != nil {
return fmt.Errorf("failed to load kubelet dropin file, path: %s, error: %w", path, err)
}
// Merge dropinConfig with kubeletConfig
if err := mergo.Merge(kubeletConfig, dropinConfig, mergo.WithOverride); err != nil {
return fmt.Errorf("failed to merge kubelet drop-in config, path: %s, error: %w", path, err)
mergedConfigJSON, err := jsonpatch.MergePatch(baseKubeletConfigJSON, dropinConfigJSON)
if err != nil {
return fmt.Errorf("failed to merge drop-in and current config: %w", err)
}
baseKubeletConfigJSON = mergedConfigJSON
}
return nil
})
if err != nil {
}); err != nil {
return fmt.Errorf("failed to walk through kubelet dropin directory %q: %w", kubeletDropInConfigDir, err)
}
if err := json.Unmarshal(baseKubeletConfigJSON, kubeletConfig); err != nil {
return fmt.Errorf("failed to unmarshal merged JSON into kubelet configuration: %w", err)
}
return nil
}
@@ -415,6 +420,20 @@ func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, e
return kc, err
}
func loadDropinConfigFileIntoJSON(name string) ([]byte, error) {
const errFmt = "failed to load drop-in kubelet config file %s, error %v"
// compute absolute path based on current working dir
kubeletConfigFile, err := filepath.Abs(name)
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
}
loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
}
return loader.LoadIntoJSON()
}
// UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
// is not valid. It will not start any background processes, and does not include authentication/authorization
func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {

View File

@@ -21,8 +21,11 @@ import (
"path/filepath"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
@@ -71,7 +74,7 @@ func TestValueOfAllocatableResources(t *testing.T) {
func TestMergeKubeletConfigurations(t *testing.T) {
testCases := []struct {
kubeletConfig string
kubeletConfig *kubeletconfiginternal.KubeletConfiguration
dropin1 string
dropin2 string
overwrittenConfigFields map[string]interface{}
@@ -79,12 +82,14 @@ func TestMergeKubeletConfigurations(t *testing.T) {
name string
}{
{
kubeletConfig: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
port: 9080
readOnlyPort: 10257
`,
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "KubeletConfiguration",
APIVersion: "kubelet.config.k8s.io/v1beta1",
},
Port: int32(9090),
ReadOnlyPort: int32(10257),
},
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
@@ -103,13 +108,15 @@ readOnlyPort: 10255
name: "kubelet.conf.d overrides kubelet.conf",
},
{
kubeletConfig: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
readOnlyPort: 10256
kubeReserved:
memory: 70Mi
`,
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "KubeletConfiguration",
APIVersion: "kubelet.config.k8s.io/v1beta1",
},
ReadOnlyPort: int32(10256),
KubeReserved: map[string]string{"memory": "100Mi"},
SyncFrequency: metav1.Duration{Duration: 5 * time.Minute},
},
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
@@ -131,18 +138,19 @@ kubeReserved:
"cpu": "200m",
"memory": "100Mi",
},
"SyncFrequency": metav1.Duration{Duration: 5 * time.Minute},
},
name: "kubelet.conf.d overrides kubelet.conf with subfield override",
},
{
kubeletConfig: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
port: 9090
clusterDNS:
- 192.168.1.3
- 192.168.1.4
`,
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "KubeletConfiguration",
APIVersion: "kubelet.config.k8s.io/v1beta1",
},
Port: int32(9090),
ClusterDNS: []string{"192.168.1.3", "192.168.1.4"},
},
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
@@ -173,6 +181,7 @@ clusterDNS:
name: "kubelet.conf.d overrides kubelet.conf with slices/lists",
},
{
kubeletConfig: nil,
dropin1: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
@@ -195,13 +204,14 @@ readOnlyPort: 10255
name: "cli args override kubelet.conf.d",
},
{
kubeletConfig: `
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
port: 9090
clusterDNS:
- 192.168.1.3
`,
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "KubeletConfiguration",
APIVersion: "kubelet.config.k8s.io/v1beta1",
},
Port: int32(9090),
ClusterDNS: []string{"192.168.1.3"},
},
overwrittenConfigFields: map[string]interface{}{
"Port": int32(9090),
"ClusterDNS": []string{"192.168.1.2"},
@@ -222,12 +232,15 @@ clusterDNS:
kubeletConfig := &kubeletconfiginternal.KubeletConfiguration{}
kubeletFlags := &options.KubeletFlags{}
if len(test.kubeletConfig) > 0 {
if test.kubeletConfig != nil {
// Create the Kubeletconfig
kubeletConfFile := filepath.Join(tempDir, "kubelet.conf")
err := os.WriteFile(kubeletConfFile, []byte(test.kubeletConfig), 0644)
require.NoError(t, err, "failed to create config from a yaml file")
yamlData, err := yaml.Marshal(test.kubeletConfig) // Convert struct to YAML
require.NoError(t, err, "failed to convert kubelet config to YAML")
err = os.WriteFile(kubeletConfFile, yamlData, 0644)
require.NoError(t, err, "failed to create config from YAML data")
kubeletFlags.KubeletConfigFile = kubeletConfFile
kubeletConfig = test.kubeletConfig
}
if len(test.dropin1) > 0 || len(test.dropin2) > 0 {
// Create kubelet.conf.d directory and drop-in configuration files

2
go.mod
View File

@@ -45,7 +45,6 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.6
github.com/ishidawataru/sctp v0.0.0-20230406120618-7ff4192f6ff2
github.com/libopenstorage/openstorage v1.0.0
github.com/lithammer/dedent v1.1.0
@@ -186,6 +185,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/josharian/intern v1.0.0 // indirect

View File

@@ -31,6 +31,9 @@ import (
type Loader interface {
// Load loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be loaded
Load() (*kubeletconfig.KubeletConfiguration, error)
// LoadIntoJSON loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be
// loaded. It returns the configuration as a JSON byte slice
LoadIntoJSON() ([]byte, error)
}
// fsLoader loads configuration from `configDir`
@@ -78,6 +81,20 @@ func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
return kc, nil
}
func (loader *fsLoader) LoadIntoJSON() ([]byte, error) {
data, err := loader.fs.ReadFile(loader.kubeletFile)
if err != nil {
return nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %v", loader.kubeletFile, err)
}
// no configuration is an error, some parameters are required
if len(data) == 0 {
return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
}
return utilcodec.DecodeKubeletConfigurationIntoJSON(loader.kubeletCodecs, data)
}
// resolveRelativePaths makes relative paths absolute by resolving them against `root`
func resolveRelativePaths(paths []*string, root string) {
for _, path := range paths {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package codec
import (
"encoding/json"
"fmt"
"k8s.io/klog/v2"
@@ -24,6 +25,7 @@ import (
// ensure the core apis are installed
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -105,3 +107,16 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
return internalKC, nil
}
// DecodeKubeletConfigurationIntoJSON decodes a serialized KubeletConfiguration to the internal type.
func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, error) {
// The UniversalDecoder runs defaulting and returns the internal type by default.
obj, _, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{})
if err != nil {
return nil, err
}
objT := obj.(*unstructured.Unstructured)
return json.Marshal(objT.Object)
}

View File

@@ -99,13 +99,14 @@ var (
// Test suite authors can use framework/viper to make all command line
// parameters also configurable via a configuration file.
type TestContextType struct {
KubeConfig string
KubeContext string
KubeAPIContentType string
KubeletRootDir string
CertDir string
Host string
BearerToken string `datapolicy:"token"`
KubeConfig string
KubeContext string
KubeAPIContentType string
KubeletRootDir string
KubeletConfigDropinDir string
CertDir string
Host string
BearerToken string `datapolicy:"token"`
// TODO: Deprecating this over time... instead just use gobindata_util.go , see #23987.
RepoRoot string
// ListImages will list off all images that are used then quit

View File

@@ -37,6 +37,7 @@ var (
GracefulNodeShutdownBasedOnPodPriority = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("GracefulNodeShutdownBasedOnPodPriority"))
HostAccess = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("HostAccess"))
ImageID = framework.WithNodeFeature(framework.ValidNodeFeatures.Add(" ImageID"))
KubeletConfigDropInDir = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("KubeletConfigDropInDir"))
LSCIQuotaMonitoring = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("LSCIQuotaMonitoring"))
NodeAllocatable = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("NodeAllocatable"))
NodeProblemDetector = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("NodeProblemDetector"))

View File

@@ -90,6 +90,7 @@ func registerNodeFlags(flags *flag.FlagSet) {
framework.TestContext.NodeE2E = true
flags.StringVar(&framework.TestContext.BearerToken, "bearer-token", "", "The bearer token to authenticate with. If not specified, it would be a random token. Currently this token is only used in node e2e tests.")
flags.StringVar(&framework.TestContext.NodeName, "node-name", "", "Name of the node to run tests on.")
flags.StringVar(&framework.TestContext.KubeletConfigDropinDir, "config-dir", "", "Path to a directory containing drop-in configurations for the kubelet.")
// TODO(random-liu): Move kubelet start logic out of the test.
// TODO(random-liu): Move log fetch logic out of the test.
// There are different ways to start kubelet (systemd, initd, docker, manually started etc.)
@@ -200,6 +201,14 @@ func TestE2eNode(t *testing.T) {
// We're not running in a special mode so lets run tests.
gomega.RegisterFailHandler(ginkgo.Fail)
// Initialize the KubeletConfigDropinDir again if the test doesn't run in run-kubelet-mode.
if framework.TestContext.KubeletConfigDropinDir == "" {
var err error
framework.TestContext.KubeletConfigDropinDir, err = services.KubeletConfigDirCWDDir()
if err != nil {
klog.Errorf("failed to create kubelet config directory: %v", err)
}
}
reportDir := framework.TestContext.ReportDir
if reportDir != "" {
// Create the directory if it doesn't already exist

View File

@@ -0,0 +1,113 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"os"
"path/filepath"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/nodefeature"
)
var _ = SIGDescribe("Kubelet Config", framework.WithSlow(), framework.WithSerial(), framework.WithDisruptive(), nodefeature.KubeletConfigDropInDir, func() {
f := framework.NewDefaultFramework("kubelet-config-drop-in-dir-test")
ginkgo.Context("when merging drop-in configs", func() {
var oldcfg *kubeletconfig.KubeletConfiguration
ginkgo.BeforeEach(func(ctx context.Context) {
var err error
oldcfg, err = getCurrentKubeletConfig(ctx)
framework.ExpectNoError(err)
})
ginkgo.AfterEach(func(ctx context.Context) {
files, err := filepath.Glob(filepath.Join(framework.TestContext.KubeletConfigDropinDir, "*"+".conf"))
framework.ExpectNoError(err)
for _, file := range files {
err := os.Remove(file)
framework.ExpectNoError(err)
}
updateKubeletConfig(ctx, f, oldcfg, true)
})
ginkgo.It("should merge kubelet configs correctly", func(ctx context.Context) {
// Get the initial kubelet configuration
initialConfig, err := getCurrentKubeletConfig(ctx)
framework.ExpectNoError(err)
ginkgo.By("Stopping the kubelet")
restartKubelet := stopKubelet()
// wait until the kubelet health check will fail
gomega.Eventually(ctx, func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse())
configDir := framework.TestContext.KubeletConfigDropinDir
contents := []byte(`apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
port: 10255
readOnlyPort: 10257
clusterDNS:
- 192.168.1.10
systemReserved:
memory: 1Gi`)
framework.ExpectNoError(os.WriteFile(filepath.Join(configDir, "10-kubelet.conf"), contents, 0755))
contents = []byte(`apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
clusterDNS:
- 192.168.1.1
- 192.168.1.5
- 192.168.1.8
port: 8080
cpuManagerReconcilePeriod: 0s
systemReserved:
memory: 2Gi`)
framework.ExpectNoError(os.WriteFile(filepath.Join(configDir, "20-kubelet.conf"), contents, 0755))
ginkgo.By("Restarting the kubelet")
restartKubelet()
// wait until the kubelet health check will succeed
gomega.Eventually(ctx, func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue())
mergedConfig, err := getCurrentKubeletConfig(ctx)
framework.ExpectNoError(err)
// Replace specific fields in the initial configuration with expectedConfig values
initialConfig.Port = int32(8080) // not overridden by second file, should be retained.
initialConfig.ReadOnlyPort = int32(10257) // overridden by second file.
initialConfig.SystemReserved = map[string]string{ // overridden by map in second file.
"memory": "2Gi",
}
initialConfig.ClusterDNS = []string{"192.168.1.1", "192.168.1.5", "192.168.1.8"} // overridden by slice in second file.
// This value was explicitly set in the drop-in, make sure it is retained
initialConfig.CPUManagerReconcilePeriod = metav1.Duration{Duration: time.Duration(0)}
// Meanwhile, this value was not explicitly set, but could have been overridden by a "default" of 0 for the type.
// Ensure the true default persists.
initialConfig.CPUCFSQuotaPeriod = metav1.Duration{Duration: time.Duration(100000000)}
// Compare the expected config with the merged config
gomega.Expect(initialConfig).To(gomega.BeComparableTo(mergedConfig), "Merged kubelet config does not match the expected configuration.")
})
})
})

View File

@@ -174,6 +174,12 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
return nil, err
}
// KubeletDropInConfiguration directory path
framework.TestContext.KubeletConfigDropinDir, err = KubeletConfigDirCWDDir()
if err != nil {
return nil, err
}
// Create pod directory
podPath, err := createPodDirectory()
if err != nil {
@@ -243,6 +249,8 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
unitName = fmt.Sprintf("kubelet-%s.service", unitTimestamp)
cmdArgs = append(cmdArgs,
systemdRun,
// Set the environment variable to enable kubelet config drop-in directory.
"-E", "KUBELET_CONFIG_DROPIN_DIR_ALPHA=yes",
"-p", "Delegate=true",
"-p", logLocation+framework.TestContext.ReportDir+"/kubelet.log",
"--unit="+unitName,
@@ -282,6 +290,9 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
kc.FeatureGates = featureGates
}
// Add the KubeletDropinConfigDirectory flag if set.
cmdArgs = append(cmdArgs, "--config-dir", framework.TestContext.KubeletConfigDropinDir)
// Keep hostname override for convenience.
if framework.TestContext.NodeName != "" { // If node name is specified, set hostname override.
cmdArgs = append(cmdArgs, "--hostname-override", framework.TestContext.NodeName)
@@ -295,7 +306,7 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
cmdArgs = append(cmdArgs, "--image-service-endpoint", framework.TestContext.ImageServiceEndpoint)
}
if err := writeKubeletConfigFile(kc, kubeletConfigPath); err != nil {
if err := WriteKubeletConfigFile(kc, kubeletConfigPath); err != nil {
return nil, err
}
// add the flag to load config from a file
@@ -324,8 +335,8 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
return server, server.start()
}
// writeKubeletConfigFile writes the kubelet config file based on the args and returns the filename
func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
// WriteKubeletConfigFile writes the kubelet config file based on the args and returns the filename
func WriteKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion)
if err != nil {
return err
@@ -408,6 +419,18 @@ func kubeletConfigCWDPath() (string, error) {
return filepath.Join(cwd, "kubelet-config"), nil
}
func KubeletConfigDirCWDDir() (string, error) {
cwd, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("failed to get current working directory: %w", err)
}
dir := filepath.Join(cwd, "kubelet.conf.d")
if err := os.MkdirAll(dir, 0755); err != nil {
return "", err
}
return dir, nil
}
// like createKubeconfig, but creates kubeconfig at current-working-directory/kubeconfig
// returns a fully-qualified path to the kubeconfig file
func createKubeconfigCWD() (string, error) {