mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Implement Alpha Dynamic Kubelet Configuration
See Issue #27980 and Proposal PR #29459
This commit is contained in:
parent
5d25bffffe
commit
35a63d956b
@ -40,7 +40,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/resource"
|
"k8s.io/kubernetes/pkg/api/resource"
|
||||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||||
kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
|
v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
|
||||||
"k8s.io/kubernetes/pkg/capabilities"
|
"k8s.io/kubernetes/pkg/capabilities"
|
||||||
"k8s.io/kubernetes/pkg/client/chaosclient"
|
"k8s.io/kubernetes/pkg/client/chaosclient"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
@ -64,6 +64,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/server"
|
"k8s.io/kubernetes/pkg/kubelet/server"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
utilconfig "k8s.io/kubernetes/pkg/util/config"
|
utilconfig "k8s.io/kubernetes/pkg/util/config"
|
||||||
"k8s.io/kubernetes/pkg/util/configz"
|
"k8s.io/kubernetes/pkg/util/configz"
|
||||||
"k8s.io/kubernetes/pkg/util/crypto"
|
"k8s.io/kubernetes/pkg/util/crypto"
|
||||||
@ -73,7 +74,7 @@ import (
|
|||||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||||
"k8s.io/kubernetes/pkg/util/oom"
|
"k8s.io/kubernetes/pkg/util/oom"
|
||||||
"k8s.io/kubernetes/pkg/util/rlimit"
|
"k8s.io/kubernetes/pkg/util/rlimit"
|
||||||
"k8s.io/kubernetes/pkg/util/runtime"
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/version"
|
"k8s.io/kubernetes/pkg/version"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
@ -306,6 +307,117 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
|
||||||
|
clientConfig, err := CreateAPIServerClientConfig(s)
|
||||||
|
if err == nil {
|
||||||
|
kubeClient, err := clientset.NewForConfig(clientConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return kubeClient, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
|
||||||
|
func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) {
|
||||||
|
// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
|
||||||
|
kubeClient, err := getKubeClient(s)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
configmap, err := func() (*api.ConfigMap, error) {
|
||||||
|
var nodename string
|
||||||
|
hostname := nodeutil.GetHostname(s.HostnameOverride)
|
||||||
|
|
||||||
|
if kcfg != nil && kcfg.Cloud != nil {
|
||||||
|
instances, ok := kcfg.Cloud.Instances()
|
||||||
|
if !ok {
|
||||||
|
err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nodename, err = instances.CurrentNodeName(hostname)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// look for kubelet-<node-name> configmap from "kube-system"
|
||||||
|
configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return configmap, nil
|
||||||
|
}
|
||||||
|
// No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname
|
||||||
|
configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
|
||||||
|
}
|
||||||
|
return configmap, nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// When we create the KubeletConfiguration configmap, we put a json string
|
||||||
|
// representation of the config in a `kubelet.config` key.
|
||||||
|
jsonstr, ok := configmap.Data["kubelet.config"]
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonstr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
|
||||||
|
glog.Infof("Starting Kubelet configuration sync loop")
|
||||||
|
go func() {
|
||||||
|
wait.PollInfinite(30*time.Second, func() (bool, error) {
|
||||||
|
glog.Infof("Checking API server for new Kubelet configuration.")
|
||||||
|
remoteKC, err := getRemoteKubeletConfig(s, nil)
|
||||||
|
if err == nil {
|
||||||
|
// Detect new config by comparing with the last JSON string we extracted.
|
||||||
|
if remoteKC != currentKC {
|
||||||
|
glog.Info("Found new Kubelet configuration via API server, restarting!")
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
|
||||||
|
}
|
||||||
|
return false, nil // Always return (false, nil) so we poll forever.
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to check for config on the API server, return that config if we get it, and start
|
||||||
|
// a background thread that checks for updates to configs.
|
||||||
|
func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
|
||||||
|
jsonstr, err := getRemoteKubeletConfig(s, nil)
|
||||||
|
if err == nil {
|
||||||
|
// We will compare future API server config against the config we just got (jsonstr):
|
||||||
|
startKubeletConfigSyncLoop(s, jsonstr)
|
||||||
|
|
||||||
|
// Convert json from API server to external type struct, and convert that to internal type struct
|
||||||
|
extKC := v1alpha1.KubeletConfiguration{}
|
||||||
|
err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kc := componentconfig.KubeletConfiguration{}
|
||||||
|
err = api.Scheme.Convert(&extKC, &kc, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &kc, nil
|
||||||
|
} else {
|
||||||
|
// Couldn't get a configuration from the API server yet.
|
||||||
|
// Restart as soon as anything comes back from the API server.
|
||||||
|
startKubeletConfigSyncLoop(s, "")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run runs the specified KubeletServer for the given KubeletConfig. This should never exit.
|
// Run runs the specified KubeletServer for the given KubeletConfig. This should never exit.
|
||||||
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
|
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
|
||||||
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
|
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
|
||||||
@ -326,6 +438,22 @@ func checkPermissions() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
|
||||||
|
tmp := v1alpha1.KubeletConfiguration{}
|
||||||
|
api.Scheme.Convert(kc, &tmp, nil)
|
||||||
|
cz.Set(tmp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
|
||||||
|
cz, err := configz.New("componentconfig")
|
||||||
|
if err == nil {
|
||||||
|
setConfigz(cz, kc)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("unable to register configz: %s", err)
|
||||||
|
}
|
||||||
|
return cz, err
|
||||||
|
}
|
||||||
|
|
||||||
func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
|
func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
|
||||||
if s.ExitOnLockContention && s.LockFilePath == "" {
|
if s.ExitOnLockContention && s.LockFilePath == "" {
|
||||||
return errors.New("cannot exit on lock file contention: no lock file specified")
|
return errors.New("cannot exit on lock file contention: no lock file specified")
|
||||||
@ -344,18 +472,38 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c, err := configz.New("componentconfig"); err == nil {
|
|
||||||
c.Set(s.KubeletConfiguration)
|
// Register current configuration with /configz endpoint
|
||||||
} else {
|
cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
|
||||||
glog.Errorf("unable to register configz: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if kcfg == nil {
|
if kcfg == nil {
|
||||||
|
if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
|
||||||
|
// Look for config on the API server. If it exists, replace s.KubeletConfiguration
|
||||||
|
// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
|
||||||
|
|
||||||
|
// Don't do dynamic Kubelet configuration in runonce mode
|
||||||
|
if s.RunOnce == false {
|
||||||
|
// For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb
|
||||||
|
// any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig).
|
||||||
|
remoteKC, err := initKubeletConfigSync(s)
|
||||||
|
if err == nil {
|
||||||
|
// Update s (KubeletServer) with new config from API server
|
||||||
|
s.KubeletConfiguration = *remoteKC
|
||||||
|
// Ensure that /configz is up to date with the new config
|
||||||
|
if cfgzErr != nil {
|
||||||
|
glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
|
||||||
|
} else {
|
||||||
|
setConfigz(cfgz, &s.KubeletConfiguration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var kubeClient, eventClient *clientset.Clientset
|
var kubeClient, eventClient *clientset.Clientset
|
||||||
var autoDetectCloudProvider bool
|
var autoDetectCloudProvider bool
|
||||||
var cloud cloudprovider.Interface
|
var cloud cloudprovider.Interface
|
||||||
|
|
||||||
if s.CloudProvider == kubeExternal.AutoDetectCloudProvider {
|
if s.CloudProvider == v1alpha1.AutoDetectCloudProvider {
|
||||||
autoDetectCloudProvider = true
|
autoDetectCloudProvider = true
|
||||||
} else {
|
} else {
|
||||||
cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
||||||
@ -440,7 +588,8 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
|
|||||||
glog.Error(err)
|
glog.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runtime.ReallyCrash = s.ReallyCrashForTesting
|
utilruntime.ReallyCrash = s.ReallyCrashForTesting
|
||||||
|
|
||||||
rand.Seed(time.Now().UTC().UnixNano())
|
rand.Seed(time.Now().UTC().UnixNano())
|
||||||
|
|
||||||
// TODO(vmarmol): Do this through container config.
|
// TODO(vmarmol): Do this through container config.
|
||||||
|
@ -38,14 +38,16 @@ const (
|
|||||||
// specification of gates. Examples:
|
// specification of gates. Examples:
|
||||||
// AllAlpha=false,NewFeature=true will result in newFeature=true
|
// AllAlpha=false,NewFeature=true will result in newFeature=true
|
||||||
// AllAlpha=true,NewFeature=false will result in newFeature=false
|
// AllAlpha=true,NewFeature=false will result in newFeature=false
|
||||||
allAlphaGate = "AllAlpha"
|
allAlphaGate = "AllAlpha"
|
||||||
|
dynamicKubeletConfig = "DynamicKubeletConfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// Default values for recorded features. Every new feature gate should be
|
// Default values for recorded features. Every new feature gate should be
|
||||||
// represented here.
|
// represented here.
|
||||||
knownFeatures = map[string]featureSpec{
|
knownFeatures = map[string]featureSpec{
|
||||||
allAlphaGate: {false, alpha},
|
allAlphaGate: {false, alpha},
|
||||||
|
dynamicKubeletConfig: {false, alpha},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Special handling for a few gates.
|
// Special handling for a few gates.
|
||||||
@ -86,6 +88,7 @@ type FeatureGate interface {
|
|||||||
// MyFeature() bool
|
// MyFeature() bool
|
||||||
|
|
||||||
// TODO: Define accessors for each non-API alpha feature.
|
// TODO: Define accessors for each non-API alpha feature.
|
||||||
|
DynamicKubeletConfig() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// featureGate implements FeatureGate as well as pflag.Value for flag parsing.
|
// featureGate implements FeatureGate as well as pflag.Value for flag parsing.
|
||||||
@ -154,6 +157,11 @@ func (f *featureGate) Type() string {
|
|||||||
return "mapStringBool"
|
return "mapStringBool"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DynamicKubeletConfig returns value for dynamicKubeletConfig
|
||||||
|
func (f *featureGate) DynamicKubeletConfig() bool {
|
||||||
|
return f.lookup(dynamicKubeletConfig)
|
||||||
|
}
|
||||||
|
|
||||||
func (f *featureGate) lookup(key string) bool {
|
func (f *featureGate) lookup(key string) bool {
|
||||||
defaultValue := f.known[key].enabled
|
defaultValue := f.known[key].enabled
|
||||||
if f.enabled != nil {
|
if f.enabled != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user