Dynamic Kubelet Configuration

Alpha implementation of the Dynamic Kubelet Configuration feature.
See the proposal doc in #29459.
This commit is contained in:
Michael Taufen
2017-04-28 15:08:57 -07:00
parent d37f2f1a5d
commit 443d58e40a
85 changed files with 6042 additions and 370 deletions

View File

@@ -28,6 +28,7 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"time"
@@ -37,8 +38,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@@ -73,6 +72,7 @@ import (
dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
"k8s.io/kubernetes/pkg/kubelet/eviction"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -93,7 +93,8 @@ const (
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
s := options.NewKubeletServer()
// ignore the error, as this is just for generating docs and the like
s, _ := options.NewKubeletServer()
s.AddFlags(pflag.CommandLine)
cmd := &cobra.Command{
Use: componentKubelet,
@@ -167,119 +168,7 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err
}, nil
}
func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
clientConfig, err := CreateAPIServerClientConfig(s)
if err == nil {
kubeClient, err := clientset.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
return kubeClient, nil
}
return nil, err
}
// Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
func getRemoteKubeletConfig(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (string, error) {
// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
kubeClient, err := getKubeClient(s)
if err != nil {
return "", err
}
configmap, err := func() (*v1.ConfigMap, error) {
var nodename types.NodeName
hostname := nodeutil.GetHostname(s.HostnameOverride)
if kubeDeps != nil && kubeDeps.Cloud != nil {
instances, ok := kubeDeps.Cloud.Instances()
if !ok {
err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename")
return nil, err
}
nodename, err = instances.CurrentNodeName(hostname)
if err != nil {
err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
return nil, err
}
// look for kubelet-<node-name> configmap from "kube-system"
configmap, err := kubeClient.CoreV1().ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename), metav1.GetOptions{})
if err != nil {
return nil, err
}
return configmap, nil
}
// No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname
configmap, err := kubeClient.CoreV1().ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
}
return configmap, nil
}()
if err != nil {
return "", err
}
// When we create the KubeletConfiguration configmap, we put a json string
// representation of the config in a `kubelet.config` key.
jsonstr, ok := configmap.Data["kubelet.config"]
if !ok {
return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
}
return jsonstr, nil
}
func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
glog.Infof("Starting Kubelet configuration sync loop")
go func() {
wait.PollInfinite(30*time.Second, func() (bool, error) {
glog.Infof("Checking API server for new Kubelet configuration.")
remoteKC, err := getRemoteKubeletConfig(s, nil)
if err == nil {
// Detect new config by comparing with the last JSON string we extracted.
if remoteKC != currentKC {
glog.Info("Found new Kubelet configuration via API server, restarting!")
os.Exit(0)
}
} else {
glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
}
return false, nil // Always return (false, nil) so we poll forever.
})
}()
}
// Try to check for config on the API server, return that config if we get it, and start
// a background thread that checks for updates to configs.
func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
jsonstr, err := getRemoteKubeletConfig(s, nil)
if err == nil {
// We will compare future API server config against the config we just got (jsonstr):
startKubeletConfigSyncLoop(s, jsonstr)
// Convert json from API server to external type struct, and convert that to internal type struct
extKC := componentconfigv1alpha1.KubeletConfiguration{}
err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
if err != nil {
return nil, err
}
api.Scheme.Default(&extKC)
kc := componentconfig.KubeletConfiguration{}
err = api.Scheme.Convert(&extKC, &kc, nil)
if err != nil {
return nil, err
}
return &kc, nil
} else {
// Couldn't get a configuration from the API server yet.
// Restart as soon as anything comes back from the API server.
startKubeletConfigSyncLoop(s, "")
return nil, err
}
}
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
@@ -315,27 +204,6 @@ func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, err
return cz, err
}
// validateConfig validates configuration of Kubelet and returns an error if the input configuration is invalid.
func validateConfig(s *options.KubeletServer) error {
if !s.CgroupsPerQOS && len(s.EnforceNodeAllocatable) > 0 {
return fmt.Errorf("Node Allocatable enforcement is not supported unless Cgroups Per QOS feature is turned on")
}
if s.SystemCgroups != "" && s.CgroupRoot == "" {
return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
}
for _, val := range s.EnforceNodeAllocatable {
switch val {
case cm.NodeAllocatableEnforcementKey:
case cm.SystemReservedEnforcementKey:
case cm.KubeReservedEnforcementKey:
continue
default:
return fmt.Errorf("invalid option %q specified for EnforceNodeAllocatable setting. Valid options are %q, %q or %q", val, cm.NodeAllocatableEnforcementKey, cm.SystemReservedEnforcementKey, cm.KubeReservedEnforcementKey)
}
}
return nil
}
// makeEventRecorder sets up kubeDeps.Recorder if its nil. Its a no-op otherwise.
func makeEventRecorder(s *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
if kubeDeps.Recorder != nil {
@@ -353,20 +221,20 @@ func makeEventRecorder(s *componentconfig.KubeletConfiguration, kubeDeps *kubele
}
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
standaloneMode := true
switch {
case s.RequireKubeConfig == true:
standaloneMode = false
glog.Warningf("--require-kubeconfig is deprecated. Set --kubeconfig without using --require-kubeconfig.")
case s.KubeConfig.Provided():
standaloneMode = false
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
if err := options.ValidateKubeletServer(s); err != nil {
return err
}
// Obtain Kubelet Lock File
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
glog.Infof("acquiring file lock on %q", s.LockFilePath)
@@ -381,44 +249,20 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
}
}
// Set feature gates based on the value in KubeletConfiguration
err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// Register current configuration with /configz endpoint
cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
// Look for config on the API server. If it exists, replace s.KubeletConfiguration
// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
// Don't do dynamic Kubelet configuration in runonce mode
if s.RunOnce == false {
remoteKC, err := initKubeletConfigSync(s)
if err == nil {
// Update s (KubeletServer) with new config from API server
s.KubeletConfiguration = *remoteKC
// Ensure that /configz is up to date with the new config
if cfgzErr != nil {
glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
} else {
setConfigz(cfgz, &s.KubeletConfiguration)
}
// Update feature gates from the new config
err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
} else {
glog.Errorf("failed to init dynamic Kubelet configuration sync: %v", err)
}
}
_, err = initConfigz(&s.KubeletConfiguration)
if err != nil {
glog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}
// Validate configuration.
if err := validateConfig(s); err != nil {
return err
// About to get clients and such, detect standaloneMode
standaloneMode := true
switch {
case s.RequireKubeConfig == true:
standaloneMode = false
glog.Warningf("--require-kubeconfig is deprecated. Set --kubeconfig without using --require-kubeconfig.")
case s.KubeConfig.Provided():
standaloneMode = false
}
if kubeDeps == nil {
@@ -518,6 +362,12 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
}
}
// Alpha Dynamic Configuration Implementation;
// if the kubelet config controller is available, inject the latest to start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, string(nodeName))
}
if kubeDeps.Auth == nil {
auth, err := BuildAuth(nodeName, kubeDeps.ExternalKubeClient, s.KubeletConfiguration)
if err != nil {
@@ -653,8 +503,8 @@ func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(kf *options.KubeletFlags, kc *componentconfig.KubeletConfiguration) (*server.TLSOptions, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
kc.TLSCertFile = path.Join(kc.CertDirectory, "kubelet.crt")
kc.TLSPrivateKeyFile = path.Join(kc.CertDirectory, "kubelet.key")
kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
if err != nil {
@@ -795,8 +645,8 @@ func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.Kubele
}
capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)
credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)
credentialprovider.SetPreferredDockercfgPath(kubeFlags.RootDirectory)
glog.V(2).Infof("Using root directory: %v", kubeFlags.RootDirectory)
builder := kubeDeps.Builder
if builder == nil {
@@ -805,7 +655,8 @@ func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.Kubele
if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}
k, err := builder(kubeCfg, kubeDeps, &kubeFlags.ContainerRuntimeOptions, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.ProviderID)
k, err := builder(kubeCfg, kubeDeps, &kubeFlags.ContainerRuntimeOptions, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.ProviderID, kubeFlags.CloudProvider, kubeFlags.CertDirectory, kubeFlags.RootDirectory)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
@@ -849,11 +700,20 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *compon
}
}
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies, crOptions *options.ContainerRuntimeOptions, hostnameOverride, nodeIP, providerID string) (k kubelet.Bootstrap, err error) {
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration,
kubeDeps *kubelet.Dependencies,
crOptions *options.ContainerRuntimeOptions,
hostnameOverride,
nodeIP,
providerID,
cloudProvider,
certDirectory,
rootDirectory string) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, hostnameOverride, nodeIP, providerID)
k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, hostnameOverride, nodeIP, providerID, cloudProvider, certDirectory, rootDirectory)
if err != nil {
return nil, err
}
@@ -896,6 +756,36 @@ func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, err
return rl, nil
}
// BootstrapKubeletConfigController constructs and bootstrap a configuration controller
func BootstrapKubeletConfigController(flags *options.KubeletFlags,
defaultConfig *componentconfig.KubeletConfiguration) (*componentconfig.KubeletConfiguration, *kubeletconfig.Controller, error) {
var err error
// Alpha Dynamic Configuration Implementation; this section only loads config from disk, it does not contact the API server
// compute absolute paths based on current working dir
initConfigDir := ""
if flags.InitConfigDir.Provided() {
initConfigDir, err = filepath.Abs(flags.InitConfigDir.Value())
if err != nil {
return nil, nil, fmt.Errorf("failed to get absolute path for --init-config-dir")
}
}
dynamicConfigDir := ""
if flags.DynamicConfigDir.Provided() {
dynamicConfigDir, err = filepath.Abs(flags.DynamicConfigDir.Value())
if err != nil {
return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir")
}
}
// get the latest KubeletConfiguration checkpoint from disk, or load the init or default config if no valid checkpoints exist
kubeletConfigController := kubeletconfig.NewController(initConfigDir, dynamicConfigDir, defaultConfig)
kubeletConfig, err := kubeletConfigController.Bootstrap()
if err != nil {
return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
}
return kubeletConfig, kubeletConfigController, nil
}
// RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
// TODO(random-liu): Move this to a separate binary.
func RunDockershim(c *componentconfig.KubeletConfiguration, r *options.ContainerRuntimeOptions) error {