diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 4435d506deb..2eae712ee42 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -49,12 +49,17 @@ type KubeletServer struct { AuthPath util.StringFlag // Deprecated -- use KubeConfig instead APIServerList []string // Deprecated -- use KubeConfig instead - RunOnce bool - // Insert a probability of random errors during calls to the master. ChaosChance float64 // Crash immediately, rather than eating panics. ReallyCrashForTesting bool + + // TODO(mtaufen): It is increasingly looking like nobody actually uses the + // Kubelet's runonce mode anymore, so it may be a candidate + // for deprecation and removal. + // If runOnce is true, the Kubelet will check the API server once for pods, + // run those in addition to the pods specified by the local manifest, and exit. + RunOnce bool } // NewKubeletServer will create a new KubeletServer with default values. diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 3d3787e66fa..d117cf97ad1 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/apis/componentconfig" v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" "k8s.io/kubernetes/pkg/capabilities" @@ -59,9 +58,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" - "k8s.io/kubernetes/pkg/kubelet/eviction" - "k8s.io/kubernetes/pkg/kubelet/images" - "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/server" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" @@ -69,7 +65,7 @@ import ( "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/crypto" "k8s.io/kubernetes/pkg/util/flock" - 
"k8s.io/kubernetes/pkg/util/io" + kubeio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" @@ -77,22 +73,8 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" - "k8s.io/kubernetes/pkg/volume" ) -// bootstrapping interface for kubelet, targets the initialization protocol -type KubeletBootstrap interface { - BirthCry() - StartGarbageCollection() - ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) - ListenAndServeReadOnly(address net.IP, port uint) - Run(<-chan kubetypes.PodUpdate) - RunOnce(<-chan kubetypes.PodUpdate) ([]kubelet.RunPodResult, error) -} - -// create and initialize a Kubelet instance -type KubeletBuilder func(kc *KubeletConfig) (KubeletBootstrap, *config.PodConfig, error) - // NewKubeletCommand creates a *cobra.Command object with default parameters func NewKubeletCommand() *cobra.Command { s := options.NewKubeletServer() @@ -124,188 +106,38 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API return cmd } -// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup +// UnsecuredKubeletDeps returns a KubeletDeps suitable for being run, or an error if the server setup // is not valid. 
It will not start any background processes, and does not include authentication/authorization -func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { - hostNetworkSources, err := kubetypes.GetValidatedSources(s.HostNetworkSources) - if err != nil { - return nil, err - } +func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) { - hostPIDSources, err := kubetypes.GetValidatedSources(s.HostPIDSources) - if err != nil { - return nil, err - } - - hostIPCSources, err := kubetypes.GetValidatedSources(s.HostIPCSources) + // Initialize the TLS Options + tlsOptions, err := InitializeTLS(&s.KubeletConfiguration) if err != nil { return nil, err } mounter := mount.New() - var writer io.Writer = &io.StdWriter{} + var writer kubeio.Writer = &kubeio.StdWriter{} if s.Containerized { glog.V(2).Info("Running kubelet in containerized mode (experimental)") mounter = mount.NewNsenterMounter() - writer = &io.NsenterWriter{} + writer = &kubeio.NsenterWriter{} } - tlsOptions, err := InitializeTLS(s) - if err != nil { - return nil, err - } - - var dockerExecHandler dockertools.ExecHandler - switch s.DockerExecHandlerName { - case "native": - dockerExecHandler = &dockertools.NativeExecHandler{} - case "nsenter": - dockerExecHandler = &dockertools.NsenterExecHandler{} - default: - glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) - dockerExecHandler = &dockertools.NativeExecHandler{} - } - - imageGCPolicy := images.ImageGCPolicy{ - MinAge: s.ImageMinimumGCAge.Duration, - HighThresholdPercent: int(s.ImageGCHighThresholdPercent), - LowThresholdPercent: int(s.ImageGCLowThresholdPercent), - } - - diskSpacePolicy := kubelet.DiskSpacePolicy{ - DockerFreeDiskMB: int(s.LowDiskSpaceThresholdMB), - RootFreeDiskMB: int(s.LowDiskSpaceThresholdMB), - } - - manifestURLHeader := make(http.Header) - if s.ManifestURLHeader != "" { - pieces := strings.Split(s.ManifestURLHeader, ":") - if len(pieces) != 2 { - return 
nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) - } - manifestURLHeader.Set(pieces[0], pieces[1]) - } - - reservation, err := parseReservation(s.KubeReserved, s.SystemReserved) - if err != nil { - return nil, err - } - - thresholds, err := eviction.ParseThresholdConfig(s.EvictionHard, s.EvictionSoft, s.EvictionSoftGracePeriod, s.EvictionMinimumReclaim) - if err != nil { - return nil, err - } - evictionConfig := eviction.Config{ - PressureTransitionPeriod: s.EvictionPressureTransitionPeriod.Duration, - MaxPodGracePeriodSeconds: int64(s.EvictionMaxPodGracePeriod), - Thresholds: thresholds, - } - - if s.MakeIPTablesUtilChains { - if s.IPTablesMasqueradeBit > 31 || s.IPTablesMasqueradeBit < 0 { - return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]") - } - if s.IPTablesDropBit > 31 || s.IPTablesDropBit < 0 { - return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]") - } - if s.IPTablesDropBit == s.IPTablesMasqueradeBit { - return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different") - } - } - - return &KubeletConfig{ - Address: net.ParseIP(s.Address), - AllowPrivileged: s.AllowPrivileged, - Auth: nil, // default does not enforce auth[nz] - CAdvisorInterface: nil, // launches background processes, not set here - VolumeStatsAggPeriod: s.VolumeStatsAggPeriod.Duration, - CgroupRoot: s.CgroupRoot, - Cloud: nil, // cloud provider might start background processes - ClusterDNS: net.ParseIP(s.ClusterDNS), - ClusterDomain: s.ClusterDomain, - ConfigFile: s.PodManifestPath, - ConfigureCBR0: s.ConfigureCBR0, - ContainerManager: nil, - ContainerRuntime: s.ContainerRuntime, - RemoteRuntimeEndpoint: s.RemoteRuntimeEndpoint, - RemoteImageEndpoint: s.RemoteImageEndpoint, - RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration, - CPUCFSQuota: s.CPUCFSQuota, - DiskSpacePolicy: diskSpacePolicy, - DockerClient: 
dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt. - RuntimeCgroups: s.RuntimeCgroups, - DockerExecHandler: dockerExecHandler, - EnableControllerAttachDetach: s.EnableControllerAttachDetach, - EnableCustomMetrics: s.EnableCustomMetrics, - EnableDebuggingHandlers: s.EnableDebuggingHandlers, - CgroupsPerQOS: s.CgroupsPerQOS, - EnableServer: s.EnableServer, - EventBurst: int(s.EventBurst), - EventRecordQPS: float32(s.EventRecordQPS), - FileCheckFrequency: s.FileCheckFrequency.Duration, - HostnameOverride: s.HostnameOverride, - HostNetworkSources: hostNetworkSources, - HostPIDSources: hostPIDSources, - HostIPCSources: hostIPCSources, - HTTPCheckFrequency: s.HTTPCheckFrequency.Duration, - ImageGCPolicy: imageGCPolicy, - KubeClient: nil, - ManifestURL: s.ManifestURL, - ManifestURLHeader: manifestURLHeader, - MasterServiceNamespace: s.MasterServiceNamespace, - MaxContainerCount: int(s.MaxContainerCount), - MaxOpenFiles: uint64(s.MaxOpenFiles), - MaxPerPodContainerCount: int(s.MaxPerPodContainerCount), - MaxPods: int(s.MaxPods), - NvidiaGPUs: int(s.NvidiaGPUs), - MinimumGCAge: s.MinimumGCAge.Duration, - Mounter: mounter, - NetworkPluginName: s.NetworkPluginName, - NetworkPluginMTU: int(s.NetworkPluginMTU), - NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir), - NodeLabels: s.NodeLabels, - NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency.Duration, - NonMasqueradeCIDR: s.NonMasqueradeCIDR, - OOMAdjuster: oom.NewOOMAdjuster(), - OSInterface: kubecontainer.RealOS{}, - PodCIDR: s.PodCIDR, - ReconcileCIDR: s.ReconcileCIDR, - PodInfraContainerImage: s.PodInfraContainerImage, - Port: uint(s.Port), - ReadOnlyPort: uint(s.ReadOnlyPort), - RegisterNode: s.RegisterNode, - RegisterSchedulable: s.RegisterSchedulable, - RegistryBurst: int(s.RegistryBurst), - RegistryPullQPS: float64(s.RegistryPullQPS), - ResolverConfig: s.ResolverConfig, - Reservation: *reservation, - KubeletCgroups: 
s.KubeletCgroups, - RktPath: s.RktPath, - RktAPIEndpoint: s.RktAPIEndpoint, - RktStage1Image: s.RktStage1Image, - RootDirectory: s.RootDirectory, - SeccompProfileRoot: s.SeccompProfileRoot, - Runonce: s.RunOnce, - SerializeImagePulls: s.SerializeImagePulls, - StandaloneMode: (len(s.APIServerList) == 0), - StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout.Duration, - SyncFrequency: s.SyncFrequency.Duration, - AllowedUnsafeSysctls: s.AllowedUnsafeSysctls, - SystemCgroups: s.SystemCgroups, - TLSOptions: tlsOptions, - Writer: writer, - VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir), - OutOfDiskTransitionFrequency: s.OutOfDiskTransitionFrequency.Duration, - HairpinMode: s.HairpinMode, - BabysitDaemons: s.BabysitDaemons, - ExperimentalFlannelOverlay: s.ExperimentalFlannelOverlay, - NodeIP: net.ParseIP(s.NodeIP), - EvictionConfig: evictionConfig, - PodsPerCore: int(s.PodsPerCore), - ProtectKernelDefaults: s.ProtectKernelDefaults, - MakeIPTablesUtilChains: s.MakeIPTablesUtilChains, - iptablesMasqueradeBit: int(s.IPTablesMasqueradeBit), - iptablesDropBit: int(s.IPTablesDropBit), + return &kubelet.KubeletDeps{ + Auth: nil, // default does not enforce auth[nz] + CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here + Cloud: nil, // cloud provider might start background processes + ContainerManager: nil, + DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt. 
+ KubeClient: nil, + Mounter: mounter, + NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir), + OOMAdjuster: oom.NewOOMAdjuster(), + OSInterface: kubecontainer.RealOS{}, + Writer: writer, + VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir), + TLSOptions: tlsOptions, }, nil } @@ -322,7 +154,7 @@ func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) { } // Tries to download the kubelet- configmap from "kube-system" namespace via the API server and returns a JSON string or error -func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) { +func getRemoteKubeletConfig(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (string, error) { // TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request kubeClient, err := getKubeClient(s) if err != nil { @@ -333,8 +165,8 @@ func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (stri var nodename string hostname := nodeutil.GetHostname(s.HostnameOverride) - if kcfg != nil && kcfg.Cloud != nil { - instances, ok := kcfg.Cloud.Instances() + if kubeDeps != nil && kubeDeps.Cloud != nil { + instances, ok := kubeDeps.Cloud.Instances() if !ok { err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.") return nil, err @@ -420,13 +252,14 @@ func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletCo } } -// Run runs the specified KubeletServer for the given KubeletConfig. This should never exit. -// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer. -// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults -// will be ignored. -func Run(s *options.KubeletServer, kcfg *KubeletConfig) error { - if err := run(s, kcfg); err != nil { +// Run runs the specified KubeletServer with the given KubeletDeps. This should never exit. 
+// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer. +// Otherwise, the caller is assumed to have set up the KubeletDeps object and a default one will +// not be generated. +func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error { + if err := run(s, kubeDeps); err != nil { return fmt.Errorf("failed to run Kubelet: %v", err) + } return nil } @@ -456,7 +289,10 @@ func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, err return cz, err } -func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { +func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) { + // TODO: this should be replaced by a --standalone flag + standaloneMode := (len(s.APIServerList) == 0) + if s.ExitOnLockContention && s.LockFilePath == "" { return errors.New("cannot exit on lock file contention: no lock file specified") } @@ -477,37 +313,31 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { // Register current configuration with /configz endpoint cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration) + if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() { + // Look for config on the API server. If it exists, replace s.KubeletConfiguration + // with it and continue. initKubeletConfigSync also starts the background thread that checks for new config. - if kcfg == nil { - if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() { - // Look for config on the API server. If it exists, replace s.KubeletConfiguration - // with it and continue. initKubeletConfigSync also starts the background thread that checks for new config. - - // Don't do dynamic Kubelet configuration in runonce mode - if s.RunOnce == false { - // For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb - // any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig). 
- remoteKC, err := initKubeletConfigSync(s) - if err == nil { - // Update s (KubeletServer) with new config from API server - s.KubeletConfiguration = *remoteKC - // Ensure that /configz is up to date with the new config - if cfgzErr != nil { - glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr) - } else { - setConfigz(cfgz, &s.KubeletConfiguration) - } + // Don't do dynamic Kubelet configuration in runonce mode + if s.RunOnce == false { + remoteKC, err := initKubeletConfigSync(s) + if err == nil { + // Update s (KubeletServer) with new config from API server + s.KubeletConfiguration = *remoteKC + // Ensure that /configz is up to date with the new config + if cfgzErr != nil { + glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr) + } else { + setConfigz(cfgz, &s.KubeletConfiguration) } } } + } + if kubeDeps == nil { var kubeClient, eventClient *clientset.Clientset - var autoDetectCloudProvider bool var cloud cloudprovider.Interface - if s.CloudProvider == v1alpha1.AutoDetectCloudProvider { - autoDetectCloudProvider = true - } else { + if s.CloudProvider != v1alpha1.AutoDetectCloudProvider { cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { return err @@ -532,7 +362,6 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { clientConfig, err := CreateAPIServerClientConfig(s) if err == nil { kubeClient, err = clientset.NewForConfig(clientConfig) - // make a separate client for events eventClientConfig := *clientConfig eventClientConfig.QPS = float32(s.EventRecordQPS) @@ -543,43 +372,40 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { if s.RequireKubeConfig { return fmt.Errorf("invalid kubeconfig: %v", err) } - // TODO: this should be replaced by a --standalone flag - if len(s.APIServerList) > 0 { + if standaloneMode { glog.Warningf("No API client: %v", err) } } - cfg, err := 
UnsecuredKubeletConfig(s) + kubeDeps, err = UnsecuredKubeletDeps(s) if err != nil { return err } - kcfg = cfg - kcfg.AutoDetectCloudProvider = autoDetectCloudProvider - kcfg.Cloud = cloud - kcfg.KubeClient = kubeClient - kcfg.EventClient = eventClient + kubeDeps.Cloud = cloud + kubeDeps.KubeClient = kubeClient + kubeDeps.EventClient = eventClient } - if kcfg.CAdvisorInterface == nil { - kcfg.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), kcfg.ContainerRuntime) + if kubeDeps.CAdvisorInterface == nil { + kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime) if err != nil { return err } } - if kcfg.ContainerManager == nil { - if kcfg.SystemCgroups != "" && kcfg.CgroupRoot == "" { + if kubeDeps.ContainerManager == nil { + if s.SystemCgroups != "" && s.CgroupRoot == "" { return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified") } - kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface, cm.NodeConfig{ - RuntimeCgroupsName: kcfg.RuntimeCgroups, - SystemCgroupsName: kcfg.SystemCgroups, - KubeletCgroupsName: kcfg.KubeletCgroups, - ContainerRuntime: kcfg.ContainerRuntime, - CgroupsPerQOS: kcfg.CgroupsPerQOS, - CgroupRoot: kcfg.CgroupRoot, - ProtectKernelDefaults: kcfg.ProtectKernelDefaults, + kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter, kubeDeps.CAdvisorInterface, cm.NodeConfig{ + RuntimeCgroupsName: s.RuntimeCgroups, + SystemCgroupsName: s.SystemCgroups, + KubeletCgroupsName: s.KubeletCgroups, + ContainerRuntime: s.ContainerRuntime, + CgroupsPerQOS: s.CgroupsPerQOS, + CgroupRoot: s.CgroupRoot, + ProtectKernelDefaults: s.ProtectKernelDefaults, }) if err != nil { return err @@ -595,12 +421,12 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { rand.Seed(time.Now().UTC().UnixNano()) // TODO(vmarmol): Do this through container config. 
- oomAdjuster := kcfg.OOMAdjuster + oomAdjuster := kubeDeps.OOMAdjuster if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil { glog.Warning(err) } - if err := RunKubelet(kcfg); err != nil { + if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil { return err } @@ -646,15 +472,15 @@ func getNodeName(cloud cloudprovider.Interface, hostname string) (string, error) // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed // certificate and key file are generated. Returns a configured server.TLSOptions object. -func InitializeTLS(s *options.KubeletServer) (*server.TLSOptions, error) { - if s.TLSCertFile == "" && s.TLSPrivateKeyFile == "" { - s.TLSCertFile = path.Join(s.CertDirectory, "kubelet.crt") - s.TLSPrivateKeyFile = path.Join(s.CertDirectory, "kubelet.key") - if !crypto.FoundCertOrKey(s.TLSCertFile, s.TLSPrivateKeyFile) { - if err := crypto.GenerateSelfSignedCert(nodeutil.GetHostname(s.HostnameOverride), s.TLSCertFile, s.TLSPrivateKeyFile, nil, nil); err != nil { +func InitializeTLS(kc *componentconfig.KubeletConfiguration) (*server.TLSOptions, error) { + if kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" { + kc.TLSCertFile = path.Join(kc.CertDirectory, "kubelet.crt") + kc.TLSPrivateKeyFile = path.Join(kc.CertDirectory, "kubelet.key") + if !crypto.FoundCertOrKey(kc.TLSCertFile, kc.TLSPrivateKeyFile) { + if err := crypto.GenerateSelfSignedCert(nodeutil.GetHostname(kc.HostnameOverride), kc.TLSCertFile, kc.TLSPrivateKeyFile, nil, nil); err != nil { return nil, fmt.Errorf("unable to generate self signed cert: %v", err) } - glog.V(4).Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile) + glog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile) } } tlsOptions := &server.TLSOptions{ @@ -666,8 +492,8 @@ func InitializeTLS(s *options.KubeletServer) (*server.TLSOptions, error) { // Populate 
PeerCertificates in requests, but don't yet reject connections without certificates. ClientAuth: tls.RequestClientCert, }, - CertFile: s.TLSCertFile, - KeyFile: s.TLSPrivateKeyFile, + CertFile: kc.TLSCertFile, + KeyFile: kc.TLSPrivateKeyFile, } return tlsOptions, nil } @@ -778,151 +604,79 @@ func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config) } } -// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an API Client. -// Under the hood it calls RunKubelet (below) -func SimpleKubelet(client *clientset.Clientset, - dockerClient dockertools.DockerInterface, - hostname, rootDir, manifestURL, address string, - port uint, - readOnlyPort uint, - masterServiceNamespace string, - volumePlugins []volume.VolumePlugin, - tlsOptions *server.TLSOptions, - cadvisorInterface cadvisor.Interface, - configFilePath string, - cloud cloudprovider.Interface, - osInterface kubecontainer.OSInterface, - fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency, outOfDiskTransitionFrequency, evictionPressureTransitionPeriod time.Duration, - maxPods int, podsPerCore int, - containerManager cm.ContainerManager, clusterDNS net.IP) *KubeletConfig { - imageGCPolicy := images.ImageGCPolicy{ - HighThresholdPercent: 90, - LowThresholdPercent: 80, - } - diskSpacePolicy := kubelet.DiskSpacePolicy{ - DockerFreeDiskMB: 256, - RootFreeDiskMB: 256, - } - evictionConfig := eviction.Config{ - PressureTransitionPeriod: evictionPressureTransitionPeriod, - } - - c := componentconfig.KubeletConfiguration{} - kcfg := KubeletConfig{ - Address: net.ParseIP(address), - CAdvisorInterface: cadvisorInterface, - VolumeStatsAggPeriod: time.Minute, - CgroupRoot: "", - Cloud: cloud, - ClusterDNS: clusterDNS, - ConfigFile: configFilePath, - ContainerManager: containerManager, - ContainerRuntime: "docker", - CPUCFSQuota: true, - DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockerClient, - RuntimeCgroups: "", - 
DockerExecHandler: &dockertools.NativeExecHandler{}, - EnableControllerAttachDetach: false, - EnableCustomMetrics: false, - EnableDebuggingHandlers: true, - EnableServer: true, - CgroupsPerQOS: false, - FileCheckFrequency: fileCheckFrequency, - // Since this kubelet runs with --configure-cbr0=false, it needs to use - // hairpin-veth to allow hairpin packets. Note that this deviates from - // what the "real" kubelet currently does, because there's no way to - // set promiscuous mode on docker0. - HairpinMode: componentconfig.HairpinVeth, - HostnameOverride: hostname, - HTTPCheckFrequency: httpCheckFrequency, - ImageGCPolicy: imageGCPolicy, - KubeClient: client, - ManifestURL: manifestURL, - MasterServiceNamespace: masterServiceNamespace, - MaxContainerCount: 100, - MaxOpenFiles: 1024, - MaxPerPodContainerCount: 2, - MaxPods: maxPods, - NvidiaGPUs: 0, - MinimumGCAge: minimumGCAge, - Mounter: mount.New(), - NodeStatusUpdateFrequency: nodeStatusUpdateFrequency, - OOMAdjuster: oom.NewFakeOOMAdjuster(), - OSInterface: osInterface, - PodInfraContainerImage: c.PodInfraContainerImage, - Port: port, - ReadOnlyPort: readOnlyPort, - RegisterNode: true, - RegisterSchedulable: true, - RegistryBurst: 10, - RegistryPullQPS: 5.0, - ResolverConfig: kubetypes.ResolvConfDefault, - KubeletCgroups: "/kubelet", - RootDirectory: rootDir, - SerializeImagePulls: true, - SyncFrequency: syncFrequency, - SystemCgroups: "", - TLSOptions: tlsOptions, - VolumePlugins: volumePlugins, - Writer: &io.StdWriter{}, - OutOfDiskTransitionFrequency: outOfDiskTransitionFrequency, - EvictionConfig: evictionConfig, - PodsPerCore: podsPerCore, - ProtectKernelDefaults: false, - } - return &kcfg -} - // RunKubelet is responsible for setting up and running a kubelet. 
It is used in three different applications: // 1 Integration tests // 2 Kubelet binary // 3 Standalone 'kubernetes' binary // Eventually, #2 will be replaced with instances of #3 -func RunKubelet(kcfg *KubeletConfig) error { - kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride) - - if len(kcfg.NodeName) == 0 { - nodeName, err := getNodeName(kcfg.Cloud, kcfg.Hostname) - if err != nil { - return err - } - kcfg.NodeName = nodeName +func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error { + hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride) + // Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil + nodeName, err := getNodeName(kubeDeps.Cloud, hostname) + if err != nil { + return err } eventBroadcaster := record.NewBroadcaster() - kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName}) + kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: nodeName}) eventBroadcaster.StartLogging(glog.V(3).Infof) - if kcfg.EventClient != nil { + if kubeDeps.EventClient != nil { glog.V(4).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kcfg.EventClient.Events("")}) + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")}) } else { glog.Warning("No api server defined - no events will be sent to API server.") } - privilegedSources := capabilities.PrivilegedSources{ - HostNetworkSources: kcfg.HostNetworkSources, - HostPIDSources: kcfg.HostPIDSources, - HostIPCSources: kcfg.HostIPCSources, + // TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig, + // so that I could remove the associated fields from KubeletConfig. 
I would + // prefer this to be done as part of an independent validation step on the + // KubeletConfiguration. But as far as I can tell, we don't have an explicit + // place for validation of the KubeletConfiguration yet. + hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources) + if err != nil { + return err } - capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0) - credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory) - glog.V(2).Infof("Using root directory: %v", kcfg.RootDirectory) + hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources) + if err != nil { + return err + } - builder := kcfg.Builder + hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources) + if err != nil { + return err + } + + privilegedSources := capabilities.PrivilegedSources{ + HostNetworkSources: hostNetworkSources, + HostPIDSources: hostPIDSources, + HostIPCSources: hostIPCSources, + } + capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0) + + credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory) + glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory) + + builder := kubeDeps.Builder if builder == nil { builder = CreateAndInitKubelet } - if kcfg.OSInterface == nil { - kcfg.OSInterface = kubecontainer.RealOS{} + if kubeDeps.OSInterface == nil { + kubeDeps.OSInterface = kubecontainer.RealOS{} } - k, podCfg, err := builder(kcfg) + k, err := builder(kubeCfg, kubeDeps, standaloneMode) if err != nil { return fmt.Errorf("failed to create kubelet: %v", err) } - rlimit.RlimitNumFiles(kcfg.MaxOpenFiles) + // NewMainKubelet should have set up a pod source config if one didn't exist + // when the builder was run. This is just a precaution. 
+ if kubeDeps.PodConfig == nil { + return fmt.Errorf("failed to create kubelet, pod source config was nil!") + } + podCfg := kubeDeps.PodConfig + + rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles)) // TODO(dawnchen): remove this once we deprecated old debian containervm images. // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726 @@ -961,313 +715,52 @@ func RunKubelet(kcfg *KubeletConfig) error { } // process pods and exit. - if kcfg.Runonce { + if runOnce { if _, err := k.RunOnce(podCfg.Updates()); err != nil { return fmt.Errorf("runonce failed: %v", err) } glog.Infof("Started kubelet %s as runonce", version.Get().String()) } else { - startKubelet(k, podCfg, kcfg) + err := startKubelet(k, podCfg, kubeCfg, kubeDeps) + if err != nil { + return err + } glog.Infof("Started kubelet %s", version.Get().String()) } return nil } -func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) { +func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) error { // start the kubelet go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop) // start the kubelet server - if kc.EnableServer { + if kubeCfg.EnableServer { go wait.Until(func() { - k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers) + k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers) }, 0, wait.NeverStop) } - if kc.ReadOnlyPort > 0 { + if kubeCfg.ReadOnlyPort > 0 { go wait.Until(func() { - k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort) + k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort)) }, 0, wait.NeverStop) } + + return nil } -func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { - // source of all configuration - cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, 
kc.Recorder) - - // define file config source - if kc.ConfigFile != "" { - glog.Infof("Adding manifest file: %v", kc.ConfigFile) - config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubetypes.FileSource)) - } - - // define url config source - if kc.ManifestURL != "" { - glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader) - config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubetypes.HTTPSource)) - } - if kc.KubeClient != nil { - glog.Infof("Watching apiserver") - config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubetypes.ApiserverSource)) - } - return cfg -} - -// KubeletConfig is all of the parameters necessary for running a kubelet. -// TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring. -type KubeletConfig struct { - Address net.IP - AllowPrivileged bool - Auth server.AuthInterface - AutoDetectCloudProvider bool - Builder KubeletBuilder - CAdvisorInterface cadvisor.Interface - VolumeStatsAggPeriod time.Duration - CgroupRoot string - Cloud cloudprovider.Interface - ClusterDNS net.IP - ClusterDomain string - ConfigFile string - ConfigureCBR0 bool - ContainerManager cm.ContainerManager - ContainerRuntime string - RemoteRuntimeEndpoint string - RemoteImageEndpoint string - RuntimeRequestTimeout time.Duration - CPUCFSQuota bool - DiskSpacePolicy kubelet.DiskSpacePolicy - DockerClient dockertools.DockerInterface - RuntimeCgroups string - DockerExecHandler dockertools.ExecHandler - EnableControllerAttachDetach bool - EnableCustomMetrics bool - EnableDebuggingHandlers bool - CgroupsPerQOS bool - EnableServer bool - EventClient *clientset.Clientset - EventBurst int - EventRecordQPS float32 - FileCheckFrequency time.Duration - Hostname string - HostnameOverride string - HostNetworkSources []string - HostPIDSources []string - HostIPCSources []string - HTTPCheckFrequency 
time.Duration - ImageGCPolicy images.ImageGCPolicy - KubeClient *clientset.Clientset - ManifestURL string - ManifestURLHeader http.Header - MasterServiceNamespace string - MaxContainerCount int - MaxOpenFiles uint64 - MaxPerPodContainerCount int - MaxPods int - MinimumGCAge time.Duration - Mounter mount.Interface - NetworkPluginName string - NetworkPluginMTU int - NetworkPlugins []network.NetworkPlugin - NodeName string - NodeLabels map[string]string - NodeStatusUpdateFrequency time.Duration - NonMasqueradeCIDR string - NvidiaGPUs int - OOMAdjuster *oom.OOMAdjuster - OSInterface kubecontainer.OSInterface - PodCIDR string - PodsPerCore int - ReconcileCIDR bool - PodConfig *config.PodConfig - PodInfraContainerImage string - Port uint - ReadOnlyPort uint - Recorder record.EventRecorder - RegisterNode bool - RegisterSchedulable bool - RegistryBurst int - RegistryPullQPS float64 - Reservation kubetypes.Reservation - ResolverConfig string - KubeletCgroups string - RktPath string - RktAPIEndpoint string - RktStage1Image string - RootDirectory string - Runonce bool - SeccompProfileRoot string - SerializeImagePulls bool - StandaloneMode bool - StreamingConnectionIdleTimeout time.Duration - SyncFrequency time.Duration - AllowedUnsafeSysctls []string - SystemCgroups string - TLSOptions *server.TLSOptions - Writer io.Writer - VolumePlugins []volume.VolumePlugin - OutOfDiskTransitionFrequency time.Duration - EvictionConfig eviction.Config - - ExperimentalFlannelOverlay bool - NodeIP net.IP - ContainerRuntimeOptions []kubecontainer.Option - HairpinMode string - BabysitDaemons bool - Options []kubelet.Option - ProtectKernelDefaults bool - MakeIPTablesUtilChains bool - iptablesMasqueradeBit int - iptablesDropBit int -} - -func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) { +func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err 
error) { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations - // TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods - // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing - // a nil pointer to it when what we really want is a nil interface. - var kubeClient clientset.Interface - if kc.KubeClient != nil { - kubeClient = kc.KubeClient - // TODO: remove this when we've refactored kubelet to only use clientset. - } - - gcPolicy := kubecontainer.ContainerGCPolicy{ - MinAge: kc.MinimumGCAge, - MaxPerPodContainer: kc.MaxPerPodContainerCount, - MaxContainers: kc.MaxContainerCount, - } - - daemonEndpoints := &api.NodeDaemonEndpoints{ - KubeletEndpoint: api.DaemonEndpoint{Port: int32(kc.Port)}, - } - - pc = kc.PodConfig - if pc == nil { - pc = makePodSourceConfig(kc) - } - k, err = kubelet.NewMainKubelet( - kc.Hostname, - kc.NodeName, - kc.DockerClient, - kubeClient, - kc.RootDirectory, - kc.SeccompProfileRoot, - kc.PodInfraContainerImage, - kc.SyncFrequency, - float32(kc.RegistryPullQPS), - kc.RegistryBurst, - kc.EventRecordQPS, - kc.EventBurst, - gcPolicy, - pc.SeenAllSources, - kc.RegisterNode, - kc.RegisterSchedulable, - kc.StandaloneMode, - kc.ClusterDomain, - kc.ClusterDNS, - kc.MasterServiceNamespace, - kc.VolumePlugins, - kc.NetworkPlugins, - kc.NetworkPluginName, - kc.NetworkPluginMTU, - kc.StreamingConnectionIdleTimeout, - kc.Recorder, - kc.CAdvisorInterface, - kc.ImageGCPolicy, - kc.DiskSpacePolicy, - kc.Cloud, - kc.AutoDetectCloudProvider, - kc.NodeLabels, - kc.NodeStatusUpdateFrequency, - kc.OSInterface, - kc.CgroupsPerQOS, - kc.CgroupRoot, - kc.ContainerRuntime, - kc.RemoteRuntimeEndpoint, - kc.RemoteImageEndpoint, - kc.RuntimeRequestTimeout, - kc.RktPath, - kc.RktAPIEndpoint, - kc.RktStage1Image, - kc.Mounter, - kc.Writer, - kc.ConfigureCBR0, - 
kc.NonMasqueradeCIDR, - kc.PodCIDR, - kc.ReconcileCIDR, - kc.MaxPods, - kc.PodsPerCore, - kc.NvidiaGPUs, - kc.DockerExecHandler, - kc.ResolverConfig, - kc.CPUCFSQuota, - daemonEndpoints, - kc.OOMAdjuster, - kc.SerializeImagePulls, - kc.ContainerManager, - kc.OutOfDiskTransitionFrequency, - kc.ExperimentalFlannelOverlay, - kc.NodeIP, - kc.Reservation, - kc.EnableCustomMetrics, - kc.VolumeStatsAggPeriod, - kc.ContainerRuntimeOptions, - kc.HairpinMode, - kc.BabysitDaemons, - kc.EvictionConfig, - kc.Options, - kc.EnableControllerAttachDetach, - kc.MakeIPTablesUtilChains, - kc.iptablesMasqueradeBit, - kc.iptablesDropBit, - kc.AllowedUnsafeSysctls, - ) + k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode) if err != nil { - return nil, nil, err + return nil, err } k.BirthCry() k.StartGarbageCollection() - return k, pc, nil -} - -func parseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) { - reservation := new(kubetypes.Reservation) - rl, err := parseResourceList(kubeReserved) - if err != nil { - return nil, err - } - reservation.Kubernetes = rl - - rl, err = parseResourceList(systemReserved) - if err != nil { - return nil, err - } - reservation.System = rl - - return reservation, nil -} - -func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) { - rl := make(api.ResourceList) - for k, v := range m { - switch api.ResourceName(k) { - // Only CPU and memory resources are supported. 
- case api.ResourceCPU, api.ResourceMemory: - q, err := resource.ParseQuantity(v) - if err != nil { - return nil, err - } - if q.Sign() == -1 { - return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v) - } - rl[api.ResourceName(k)] = q - default: - return nil, fmt.Errorf("cannot reserve %q resource", k) - } - } - return rl, nil + return k, nil } diff --git a/cmd/kubelet/app/server_test.go b/cmd/kubelet/app/server_test.go index 6c93750c7ec..478e16fb446 100644 --- a/cmd/kubelet/app/server_test.go +++ b/cmd/kubelet/app/server_test.go @@ -19,6 +19,7 @@ package app import ( "testing" + "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/util/config" ) @@ -56,7 +57,7 @@ func TestValueOfAllocatableResources(t *testing.T) { kubeReservedCM.Set(test.kubeReserved) systemReservedCM.Set(test.systemReserved) - _, err := parseReservation(kubeReservedCM, systemReservedCM) + _, err := kubelet.ParseReservation(kubeReservedCM, systemReservedCM) if err != nil { t.Logf("%s: error returned: %v", test.name, err) } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 39fc4749a49..90d184fc10b 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/podutil" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/componentconfig" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/fields" @@ -158,16 +159,16 @@ func (s *KubeletExecutorServer) runKubelet( } }() - kcfg, err := kubeletapp.UnsecuredKubeletConfig(s.KubeletServer) + kubeDeps, err := kubeletapp.UnsecuredKubeletDeps(s.KubeletServer) if err != nil { return err } // apply Mesos specific settings - kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, 
*kconfig.PodConfig, error) { - k, pc, err := kubeletapp.CreateAndInitKubelet(kc) + kubeDeps.Builder = func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (kubelet.KubeletBootstrap, error) { + k, err := kubeletapp.CreateAndInitKubelet(kubeCfg, kubeDeps, standaloneMode) if err != nil { - return k, pc, err + return k, err } // decorate kubelet such that it shuts down when the executor is @@ -177,11 +178,10 @@ func (s *KubeletExecutorServer) runKubelet( executorDone: executorDone, } - return decorated, pc, nil + return decorated, nil } - kcfg.RuntimeCgroups = "" // don't move the docker daemon into a cgroup - kcfg.Hostname = kcfg.HostnameOverride - kcfg.KubeClient = apiclient + s.RuntimeCgroups = "" // don't move the docker daemon into a cgroup + kubeDeps.KubeClient = apiclient // taken from KubeletServer#Run(*KubeletConfig) eventClientConfig, err := kubeletapp.CreateAPIServerClientConfig(s.KubeletServer) @@ -192,16 +192,16 @@ func (s *KubeletExecutorServer) runKubelet( // make a separate client for events eventClientConfig.QPS = float32(s.EventRecordQPS) eventClientConfig.Burst = int(s.EventBurst) - kcfg.EventClient, err = clientset.NewForConfig(eventClientConfig) + kubeDeps.EventClient, err = clientset.NewForConfig(eventClientConfig) if err != nil { return err } - kcfg.NodeName = kcfg.HostnameOverride - kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source - kcfg.StandaloneMode = false - kcfg.SystemCgroups = "" // don't take control over other system processes. - if kcfg.Cloud != nil { + kubeDeps.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kubeDeps.Recorder) // override the default pod source + + s.SystemCgroups = "" // don't take control over other system processes. 
+ + if kubeDeps.Cloud != nil { // fail early and hard because having the cloud provider loaded would go unnoticed, // but break bigger cluster because accessing the state.json from every slave kills the master. panic("cloud provider must not be set") @@ -209,17 +209,17 @@ func (s *KubeletExecutorServer) runKubelet( // create custom cAdvisor interface which return the resource values that Mesos reports ni := <-nodeInfos - cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, uint(s.CAdvisorPort), kcfg.ContainerRuntime) + cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, uint(s.CAdvisorPort), s.ContainerRuntime) if err != nil { return err } - kcfg.CAdvisorInterface = cAdvisorInterface - kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface, cm.NodeConfig{ - RuntimeCgroupsName: kcfg.RuntimeCgroups, - SystemCgroupsName: kcfg.SystemCgroups, - KubeletCgroupsName: kcfg.KubeletCgroups, - ContainerRuntime: kcfg.ContainerRuntime, + kubeDeps.CAdvisorInterface = cAdvisorInterface + kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter, cAdvisorInterface, cm.NodeConfig{ + RuntimeCgroupsName: s.RuntimeCgroups, + SystemCgroupsName: s.SystemCgroups, + KubeletCgroupsName: s.KubeletCgroups, + ContainerRuntime: s.ContainerRuntime, }) if err != nil { return err @@ -239,26 +239,26 @@ func (s *KubeletExecutorServer) runKubelet( containerOptions = append(containerOptions, podsource.ContainerEnvOverlay([]api.EnvVar{ {Name: envContainerID, Value: s.containerID}, })) - kcfg.ContainerRuntimeOptions = append(kcfg.ContainerRuntimeOptions, + kubeDeps.ContainerRuntimeOptions = append(kubeDeps.ContainerRuntimeOptions, dockertools.PodInfraContainerEnv(map[string]string{ envContainerID: s.containerID, })) } - podsource.Mesos(executorDone, kcfg.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...) 
+ podsource.Mesos(executorDone, kubeDeps.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...) // create static-pods directory file source log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) + fileSourceUpdates := kubeDeps.PodConfig.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, s.HostnameOverride, s.FileCheckFrequency.Duration, fileSourceUpdates) // run the kubelet - // NOTE: because kcfg != nil holds, the upstream Run function will not + // NOTE: because kubeDeps != nil holds, the upstream Run function will not // initialize the cloud provider. We explicitly wouldn't want // that because then every kubelet instance would query the master // state.json which does not scale. s.KubeletServer.LockFilePath = "" // disable lock file - err = kubeletapp.Run(s.KubeletServer, kcfg) + err = kubeletapp.Run(s.KubeletServer, kubeDeps) return } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2bcac327d12..418e58094b9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -36,9 +36,11 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" "k8s.io/kubernetes/pkg/api" utilpod "k8s.io/kubernetes/pkg/api/pod" + "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apis/componentconfig" + kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" @@ -78,6 +80,7 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/clock" + utilconfig "k8s.io/kubernetes/pkg/util/config" 
utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" @@ -85,6 +88,7 @@ import ( kubeio "k8s.io/kubernetes/pkg/util/io" utilipt "k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/mount" + nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -173,90 +177,198 @@ type SyncHandler interface { // Option is a functional option type for Kubelet type Option func(*Kubelet) +// bootstrapping interface for kubelet, targets the initialization protocol +type KubeletBootstrap interface { + GetConfiguration() *componentconfig.KubeletConfiguration + BirthCry() + StartGarbageCollection() + ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) + ListenAndServeReadOnly(address net.IP, port uint) + Run(<-chan kubetypes.PodUpdate) + RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error) +} + +// create and initialize a Kubelet instance +type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error) + +// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed +// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping +// these objects while we figure out a more comprehensive dependency injection story for the Kubelet. +type KubeletDeps struct { + // TODO(mtaufen): KubeletBuilder: + // Mesos currently uses this as a hook to let them make their own call to + // let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with + // their own KubeletBootstrap. It's a useful hook. I need to think about what + // a nice home for it would be. 
There seems to be a trend, between this and + // the Options fields below, of providing hooks where you can add extra functionality + // to the Kubelet for your solution. Maybe we should centralize these sorts of things? + Builder KubeletBuilder + + // TODO(mtaufen): ContainerRuntimeOptions and Options: + // Arrays of functions that can do arbitrary things to the Kubelet and the Runtime + // seem like a difficult path to trace when it's time to debug something. + // I'm leaving these fields here for now, but there is likely an easier-to-follow + // way to support their intended use cases. E.g. ContainerRuntimeOptions + // is used by Mesos to set an environment variable in containers which has + // some connection to their container GC. It seems that Mesos intends to use + // Options to add additional node conditions that are updated as part of the + // Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521). + // We should think about providing more explicit ways of doing these things. 
+ ContainerRuntimeOptions []kubecontainer.Option + Options []Option + + // Injected Dependencies + Auth server.AuthInterface + CAdvisorInterface cadvisor.Interface + Cloud cloudprovider.Interface + ContainerManager cm.ContainerManager + DockerClient dockertools.DockerInterface + EventClient *clientset.Clientset + KubeClient *clientset.Clientset + Mounter mount.Interface + NetworkPlugins []network.NetworkPlugin + OOMAdjuster *oom.OOMAdjuster + OSInterface kubecontainer.OSInterface + PodConfig *config.PodConfig + Recorder record.EventRecorder + Writer kubeio.Writer + VolumePlugins []volume.VolumePlugin + TLSOptions *server.TLSOptions +} + +func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName string) (*config.PodConfig, error) { + manifestURLHeader := make(http.Header) + if kubeCfg.ManifestURLHeader != "" { + pieces := strings.Split(kubeCfg.ManifestURLHeader, ":") + if len(pieces) != 2 { + return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader) + } + manifestURLHeader.Set(pieces[0], pieces[1]) + } + + // source of all configuration + cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder) + + // define file config source + if kubeCfg.PodManifestPath != "" { + glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath) + config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource)) + } + + // define url config source + if kubeCfg.ManifestURL != "" { + glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader) + config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource)) + } + if kubeDeps.KubeClient != nil { + glog.Infof("Watching apiserver") + config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, 
cfg.Channel(kubetypes.ApiserverSource)) + } + return cfg, nil +} + // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. -func NewMainKubelet( - hostname string, - nodeName string, - dockerClient dockertools.DockerInterface, - kubeClient clientset.Interface, - rootDirectory string, - seccompProfileRoot string, - podInfraContainerImage string, - resyncInterval time.Duration, - pullQPS float32, - pullBurst int, - eventQPS float32, - eventBurst int, - containerGCPolicy kubecontainer.ContainerGCPolicy, - sourcesReadyFn config.SourcesReadyFn, - registerNode bool, - registerSchedulable bool, - standaloneMode bool, - clusterDomain string, - clusterDNS net.IP, - masterServiceNamespace string, - volumePlugins []volume.VolumePlugin, - networkPlugins []network.NetworkPlugin, - networkPluginName string, - networkPluginMTU int, - streamingConnectionIdleTimeout time.Duration, - recorder record.EventRecorder, - cadvisorInterface cadvisor.Interface, - imageGCPolicy images.ImageGCPolicy, - diskSpacePolicy DiskSpacePolicy, - cloud cloudprovider.Interface, - autoDetectCloudProvider bool, - nodeLabels map[string]string, - nodeStatusUpdateFrequency time.Duration, - osInterface kubecontainer.OSInterface, - CgroupsPerQOS bool, - cgroupRoot string, - containerRuntime string, - remoteRuntimeEndpoint string, - remoteImageEndpoint string, - runtimeRequestTimeout time.Duration, - rktPath string, - rktAPIEndpoint string, - rktStage1Image string, - mounter mount.Interface, - writer kubeio.Writer, - configureCBR0 bool, - nonMasqueradeCIDR string, - podCIDR string, - reconcileCIDR bool, - maxPods int, - podsPerCore int, - nvidiaGPUs int, - dockerExecHandler dockertools.ExecHandler, - resolverConfig string, - cpuCFSQuota bool, - daemonEndpoints *api.NodeDaemonEndpoints, - oomAdjuster *oom.OOMAdjuster, - serializeImagePulls bool, - containerManager cm.ContainerManager, - 
outOfDiskTransitionFrequency time.Duration, - flannelExperimentalOverlay bool, - nodeIP net.IP, - reservation kubetypes.Reservation, - enableCustomMetrics bool, - volumeStatsAggPeriod time.Duration, - containerRuntimeOptions []kubecontainer.Option, - hairpinMode string, - babysitDaemons bool, - evictionConfig eviction.Config, - kubeOptions []Option, - enableControllerAttachDetach bool, - makeIPTablesUtilChains bool, - iptablesMasqueradeBit int, - iptablesDropBit int, - allowedUnsafeSysctls []string, -) (*Kubelet, error) { - if rootDirectory == "" { - return nil, fmt.Errorf("invalid root directory %q", rootDirectory) +func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) { + if kubeCfg.RootDirectory == "" { + return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory) } - if resyncInterval <= 0 { - return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval) + if kubeCfg.SyncFrequency.Duration <= 0 { + return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration) + } + + if kubeCfg.MakeIPTablesUtilChains { + if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 { + return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]") + } + if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 { + return nil, fmt.Errorf("iptables-drop-bit is not valid. 
Must be within [0, 31]") + } + if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit { + return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different") + } + } + + hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride) + // Query the cloud provider for our node name, default to hostname + nodeName := hostname + if kubeDeps.Cloud != nil { + var err error + instances, ok := kubeDeps.Cloud.Instances() + if !ok { + return nil, fmt.Errorf("failed to get instances from cloud provider") + } + + nodeName, err = instances.CurrentNodeName(hostname) + if err != nil { + return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err) + } + + glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName) + } + + // TODO: KubeletDeps.KubeClient should be a client interface, but client interface misses certain methods + // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing + // a nil pointer to it when what we really want is a nil interface. + var kubeClient clientset.Interface + if kubeDeps.KubeClient != nil { + kubeClient = kubeDeps.KubeClient + // TODO: remove this when we've refactored kubelet to only use clientset. 
+ } + + if kubeDeps.PodConfig == nil { + var err error + kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName) + if err != nil { + return nil, err + } + } + + containerGCPolicy := kubecontainer.ContainerGCPolicy{ + MinAge: kubeCfg.MinimumGCAge.Duration, + MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount), + MaxContainers: int(kubeCfg.MaxContainerCount), + } + + daemonEndpoints := &api.NodeDaemonEndpoints{ + KubeletEndpoint: api.DaemonEndpoint{Port: kubeCfg.Port}, + } + + imageGCPolicy := images.ImageGCPolicy{ + MinAge: kubeCfg.ImageMinimumGCAge.Duration, + HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent), + LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent), + } + + diskSpacePolicy := DiskSpacePolicy{ + DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB), + RootFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB), + } + + thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim) + if err != nil { + return nil, err + } + evictionConfig := eviction.Config{ + PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration, + MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod), + Thresholds: thresholds, + } + + reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved) + if err != nil { + return nil, err + } + + var dockerExecHandler dockertools.ExecHandler + switch kubeCfg.DockerExecHandlerName { + case "native": + dockerExecHandler = &dockertools.NativeExecHandler{} + case "nsenter": + dockerExecHandler = &dockertools.NsenterExecHandler{} + default: + glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName) + dockerExecHandler = &dockertools.NativeExecHandler{} } serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc) @@ -305,79 +417,81 @@ func NewMainKubelet( Namespace: "", } - diskSpaceManager, err := 
newDiskSpaceManager(cadvisorInterface, diskSpacePolicy) + diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy) if err != nil { return nil, fmt.Errorf("failed to initialize disk manager: %v", err) } containerRefManager := kubecontainer.NewRefManager() - oomWatcher := NewOOMWatcher(cadvisorInterface, recorder) + oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder) - // TODO: remove when internal cbr0 implementation gets removed in favor - // of the kubenet network plugin - if networkPluginName == "kubenet" { - configureCBR0 = false - flannelExperimentalOverlay = false + // TODO(mtaufen): remove when internal cbr0 implementation gets removed in favor + // of the kubenet network plugin + var myConfigureCBR0 bool = kubeCfg.ConfigureCBR0 + var myFlannelExperimentalOverlay bool = kubeCfg.ExperimentalFlannelOverlay + if kubeCfg.NetworkPluginName == "kubenet" { + myConfigureCBR0 = false + myFlannelExperimentalOverlay = false } klet := &Kubelet{ hostname: hostname, nodeName: nodeName, - dockerClient: dockerClient, + dockerClient: kubeDeps.DockerClient, kubeClient: kubeClient, - rootDirectory: rootDirectory, - resyncInterval: resyncInterval, + rootDirectory: kubeCfg.RootDirectory, + resyncInterval: kubeCfg.SyncFrequency.Duration, containerRefManager: containerRefManager, httpClient: &http.Client{}, - sourcesReady: config.NewSourcesReady(sourcesReadyFn), - registerNode: registerNode, - registerSchedulable: registerSchedulable, + sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), + registerNode: kubeCfg.RegisterNode, + registerSchedulable: kubeCfg.RegisterSchedulable, standaloneMode: standaloneMode, - clusterDomain: clusterDomain, - clusterDNS: clusterDNS, + clusterDomain: kubeCfg.ClusterDomain, + clusterDNS: net.ParseIP(kubeCfg.ClusterDNS), serviceLister: serviceLister, nodeLister: nodeLister, nodeInfo: nodeInfo, - masterServiceNamespace: masterServiceNamespace, - streamingConnectionIdleTimeout: 
streamingConnectionIdleTimeout, - recorder: recorder, - cadvisor: cadvisorInterface, + masterServiceNamespace: kubeCfg.MasterServiceNamespace, + streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, + recorder: kubeDeps.Recorder, + cadvisor: kubeDeps.CAdvisorInterface, diskSpaceManager: diskSpaceManager, - cloud: cloud, - autoDetectCloudProvider: autoDetectCloudProvider, + cloud: kubeDeps.Cloud, + autoDetectCloudProvider: (kubeExternal.AutoDetectCloudProvider == kubeCfg.CloudProvider), nodeRef: nodeRef, - nodeLabels: nodeLabels, - nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, - os: osInterface, + nodeLabels: kubeCfg.NodeLabels, + nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration, + os: kubeDeps.OSInterface, oomWatcher: oomWatcher, - CgroupsPerQOS: CgroupsPerQOS, - cgroupRoot: cgroupRoot, - mounter: mounter, - writer: writer, - configureCBR0: configureCBR0, - nonMasqueradeCIDR: nonMasqueradeCIDR, - reconcileCIDR: reconcileCIDR, - maxPods: maxPods, - podsPerCore: podsPerCore, - nvidiaGPUs: nvidiaGPUs, + cgroupsPerQOS: kubeCfg.CgroupsPerQOS, + cgroupRoot: kubeCfg.CgroupRoot, + mounter: kubeDeps.Mounter, + writer: kubeDeps.Writer, + configureCBR0: myConfigureCBR0, + nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR, + reconcileCIDR: kubeCfg.ReconcileCIDR, + maxPods: int(kubeCfg.MaxPods), + podsPerCore: int(kubeCfg.PodsPerCore), + nvidiaGPUs: int(kubeCfg.NvidiaGPUs), syncLoopMonitor: atomic.Value{}, - resolverConfig: resolverConfig, - cpuCFSQuota: cpuCFSQuota, + resolverConfig: kubeCfg.ResolverConfig, + cpuCFSQuota: kubeCfg.CPUCFSQuota, daemonEndpoints: daemonEndpoints, - containerManager: containerManager, - flannelExperimentalOverlay: flannelExperimentalOverlay, + containerManager: kubeDeps.ContainerManager, + flannelExperimentalOverlay: myFlannelExperimentalOverlay, flannelHelper: nil, - nodeIP: nodeIP, + nodeIP: net.ParseIP(kubeCfg.NodeIP), clock: clock.RealClock{}, - outOfDiskTransitionFrequency: 
outOfDiskTransitionFrequency, - reservation: reservation, - enableCustomMetrics: enableCustomMetrics, - babysitDaemons: babysitDaemons, - enableControllerAttachDetach: enableControllerAttachDetach, + outOfDiskTransitionFrequency: kubeCfg.OutOfDiskTransitionFrequency.Duration, + reservation: *reservation, + enableCustomMetrics: kubeCfg.EnableCustomMetrics, + babysitDaemons: kubeCfg.BabysitDaemons, + enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach, iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4), - makeIPTablesUtilChains: makeIPTablesUtilChains, - iptablesMasqueradeBit: iptablesMasqueradeBit, - iptablesDropBit: iptablesDropBit, + makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains, + iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit), + iptablesDropBit: int(kubeCfg.IPTablesDropBit), } if klet.flannelExperimentalOverlay { @@ -391,7 +505,7 @@ func NewMainKubelet( glog.Infof("Using node IP: %q", klet.nodeIP.String()) } - if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(hairpinMode), containerRuntime, configureCBR0, networkPluginName); err != nil { + if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.ConfigureCBR0, kubeCfg.NetworkPluginName); err != nil { // This is a non-recoverable error. Returning it up the callstack will just // lead to retries of the same failure, so just fail hard. 
glog.Fatalf("Invalid hairpin mode: %v", err) @@ -400,7 +514,7 @@ func NewMainKubelet( } glog.Infof("Hairpin mode set to %q", klet.hairpinMode) - if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, networkPluginMTU); err != nil { + if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil { return nil, err } else { klet.networkPlugin = plug @@ -419,63 +533,63 @@ func NewMainKubelet( klet.podCache = kubecontainer.NewCache() klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) - if remoteRuntimeEndpoint != "" { - containerRuntime = "remote" + if kubeCfg.RemoteRuntimeEndpoint != "" { + kubeCfg.ContainerRuntime = "remote" - // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified - if remoteImageEndpoint == "" { - remoteImageEndpoint = remoteRuntimeEndpoint + // kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified + if kubeCfg.RemoteImageEndpoint == "" { + kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint } } // Initialize the runtime. - switch containerRuntime { + switch kubeCfg.ContainerRuntime { case "docker": // Only supported one for now, continue. 
klet.containerRuntime = dockertools.NewDockerManager( - dockerClient, - kubecontainer.FilterEventRecorder(recorder), + kubeDeps.DockerClient, + kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, containerRefManager, klet.podManager, machineInfo, - podInfraContainerImage, - pullQPS, - pullBurst, + kubeCfg.PodInfraContainerImage, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), containerLogsDir, - osInterface, + kubeDeps.OSInterface, klet.networkPlugin, klet, klet.httpClient, dockerExecHandler, - oomAdjuster, + kubeDeps.OOMAdjuster, procFs, klet.cpuCFSQuota, imageBackOff, - serializeImagePulls, - enableCustomMetrics, + kubeCfg.SerializeImagePulls, + kubeCfg.EnableCustomMetrics, // If using "kubenet", the Kubernetes network plugin that wraps // CNI's bridge plugin, it knows how to set the hairpin veth flag // so we tell the container runtime to back away from setting it. // If the kubelet is started with any other plugin we can't be // sure it handles the hairpin case so we instruct the docker // runtime to set the flag instead. - klet.hairpinMode == componentconfig.HairpinVeth && networkPluginName != "kubenet", - seccompProfileRoot, - containerRuntimeOptions..., + klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet", + kubeCfg.SeccompProfileRoot, + kubeDeps.ContainerRuntimeOptions..., ) case "rkt": // TODO: Include hairpin mode settings in rkt? 
conf := &rkt.Config{ - Path: rktPath, - Stage1Image: rktStage1Image, + Path: kubeCfg.RktPath, + Stage1Image: kubeCfg.RktStage1Image, InsecureOptions: "image,ondisk", } rktRuntime, err := rkt.New( - rktAPIEndpoint, + kubeCfg.RktAPIEndpoint, conf, klet, - recorder, + kubeDeps.Recorder, containerRefManager, klet.podManager, klet.livenessManager, @@ -485,32 +599,32 @@ func NewMainKubelet( utilexec.New(), kubecontainer.RealOS{}, imageBackOff, - serializeImagePulls, - runtimeRequestTimeout, + kubeCfg.SerializeImagePulls, + kubeCfg.RuntimeRequestTimeout.Duration, ) if err != nil { return nil, err } klet.containerRuntime = rktRuntime case "remote": - remoteRuntimeService, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout) + remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) if err != nil { return nil, err } - remoteImageService, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout) + remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) if err != nil { return nil, err } klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( - kubecontainer.FilterEventRecorder(recorder), + kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, containerRefManager, - osInterface, + kubeDeps.OSInterface, klet.networkPlugin, klet, klet.httpClient, imageBackOff, - serializeImagePulls, + kubeCfg.SerializeImagePulls, klet.cpuCFSQuota, remoteRuntimeService, remoteImageService, @@ -519,15 +633,15 @@ func NewMainKubelet( return nil, err } default: - return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime) + return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime) } // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency - klet.resourceAnalyzer = 
stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod, klet.containerRuntime) + klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime) klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{}) klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime) - klet.updatePodCIDR(podCIDR) + klet.updatePodCIDR(kubeCfg.PodCIDR) // setup containerGC containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy) @@ -538,7 +652,7 @@ func NewMainKubelet( klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod)) // setup imageManager - imageManager, err := images.NewImageGCManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy) + imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy) if err != nil { return nil, fmt.Errorf("failed to initialize image manager: %v", err) } @@ -552,25 +666,25 @@ func NewMainKubelet( klet.livenessManager, klet.runner, containerRefManager, - recorder) + kubeDeps.Recorder) klet.volumePluginMgr, err = - NewInitializedVolumePluginMgr(klet, volumePlugins) + NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins) if err != nil { return nil, err } // setup volumeManager klet.volumeManager, err = volumemanager.NewVolumeManager( - enableControllerAttachDetach, + kubeCfg.EnableControllerAttachDetach, nodeName, klet.podManager, klet.kubeClient, klet.volumePluginMgr, klet.containerRuntime, - mounter, + kubeDeps.Mounter, klet.getPodsDir(), - recorder) + kubeDeps.Recorder) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { @@ -579,14 +693,15 @@ func NewMainKubelet( klet.runtimeCache = runtimeCache klet.reasonCache = NewReasonCache() klet.workQueue = 
queue.NewBasicWorkQueue(klet.clock) - klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) + klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() // setup eviction manager - evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, recorder, nodeRef, klet.clock) + evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock) + if err != nil { return nil, fmt.Errorf("failed to initialize eviction manager: %v", err) } @@ -604,7 +719,7 @@ func NewMainKubelet( } // Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec // Hence, we concatenate those two lists. - safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), allowedUnsafeSysctls...) + safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...) 
unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, api.UnsafeSysctlsPodAnnotationKey) if err != nil { return nil, err @@ -614,19 +729,23 @@ func NewMainKubelet( klet.AddPodAdmitHandler(unsafeWhitelist) // enable active deadline handler - activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock) + activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock) if err != nil { return nil, err } klet.AddPodSyncLoopHandler(activeDeadlineHandler) klet.AddPodSyncHandler(activeDeadlineHandler) - klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(containerRuntime)) + klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(kubeCfg.ContainerRuntime)) // apply functional Option's - for _, opt := range kubeOptions { + for _, opt := range kubeDeps.Options { opt(klet) } + + // Finally, put the most recent version of the config on the Kubelet, so + // people can see how it was configured. + klet.kubeletConfiguration = kubeCfg return klet, nil } @@ -640,6 +759,8 @@ type nodeLister interface { // Kubelet is the main kubelet implementation. type Kubelet struct { + kubeletConfiguration *componentconfig.KubeletConfiguration + hostname string nodeName string dockerClient dockertools.DockerInterface @@ -792,7 +913,7 @@ type Kubelet struct { resourceAnalyzer stats.ResourceAnalyzer // Whether or not we should have the QOS cgroup hierarchy for resource management - CgroupsPerQOS bool + cgroupsPerQOS bool // If non-empty, pass this to the container runtime as the root cgroup. cgroupRoot string @@ -2913,6 +3034,11 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16 return kl.runner.PortForward(&pod, port, stream) } +// GetConfiguration returns the KubeletConfiguration used to configure the kubelet. 
+func (kl *Kubelet) GetConfiguration() *componentconfig.KubeletConfiguration { + return kl.kubeletConfiguration +} + // BirthCry sends an event that the kubelet has started up. func (kl *Kubelet) BirthCry() { // Make an event that kubelet restarted. @@ -2956,3 +3082,39 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { // ContatnerRemoved doesn't affect pod state return event.Type != pleg.ContainerRemoved } + +func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) { + rl := make(api.ResourceList) + for k, v := range m { + switch api.ResourceName(k) { + // Only CPU and memory resources are supported. + case api.ResourceCPU, api.ResourceMemory: + q, err := resource.ParseQuantity(v) + if err != nil { + return nil, err + } + if q.Sign() == -1 { + return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v) + } + rl[api.ResourceName(k)] = q + default: + return nil, fmt.Errorf("cannot reserve %q resource", k) + } + } + return rl, nil +} + +func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) { + reservation := new(kubetypes.Reservation) + if rl, err := parseResourceList(kubeReserved); err != nil { + return nil, err + } else { + reservation.Kubernetes = rl + } + if rl, err := parseResourceList(systemReserved); err != nil { + return nil, err + } else { + reservation.System = rl + } + return reservation, nil +} diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index 937f17ea44c..672f5685509 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -21,11 +21,18 @@ import ( kubeletapp "k8s.io/kubernetes/cmd/kubelet/app" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/componentconfig" + "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" 
"k8s.io/kubernetes/pkg/kubelet/cm" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/dockertools" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + kubeio "k8s.io/kubernetes/pkg/util/io" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/volume/empty_dir" "k8s.io/kubernetes/test/utils" @@ -33,7 +40,8 @@ import ( ) type HollowKubelet struct { - KubeletConfig *kubeletapp.KubeletConfig + KubeletConfiguration *componentconfig.KubeletConfiguration + KubeletDeps *kubelet.KubeletDeps } func NewHollowKubelet( @@ -45,44 +53,116 @@ func NewHollowKubelet( containerManager cm.ContainerManager, maxPods int, podsPerCore int, ) *HollowKubelet { - testRootDir := utils.MakeTempDirOrDie("hollow-kubelet.", "") - manifestFilePath := utils.MakeTempDirOrDie("manifest", testRootDir) - glog.Infof("Using %s as root dir for hollow-kubelet", testRootDir) + // ----------------- + // Static config + // ----------------- + c := GetHollowKubeletConfig(nodeName, kubeletPort, kubeletReadOnlyPort, maxPods, podsPerCore) + + // ----------------- + // Injected objects + // ----------------- + d := &kubelet.KubeletDeps{ + KubeClient: client, + DockerClient: dockerClient, + CAdvisorInterface: cadvisorInterface, + Cloud: nil, + OSInterface: &containertest.FakeOS{}, + ContainerManager: containerManager, + VolumePlugins: empty_dir.ProbeVolumePlugins(), + TLSOptions: nil, + OOMAdjuster: oom.NewFakeOOMAdjuster(), + Writer: &kubeio.StdWriter{}, + Mounter: mount.New(), + } return &HollowKubelet{ - KubeletConfig: kubeletapp.SimpleKubelet( - client, - dockerClient, - nodeName, - testRootDir, - "", /* manifest-url */ - "0.0.0.0", /* bind address */ - uint(kubeletPort), - uint(kubeletReadOnlyPort), - api.NamespaceDefault, - empty_dir.ProbeVolumePlugins(), - nil, /* tls-options */ - cadvisorInterface, - manifestFilePath, - nil, /* cloud-provider */ - &containertest.FakeOS{}, /* os-interface */ - 20*time.Second, /* 
FileCheckFrequency */ - 20*time.Second, /* HTTPCheckFrequency */ - 1*time.Minute, /* MinimumGCAge */ - 10*time.Second, /* NodeStatusUpdateFrequency */ - 10*time.Second, /* SyncFrequency */ - 5*time.Minute, /* OutOfDiskTransitionFrequency */ - 5*time.Minute, /* EvictionPressureTransitionPeriod */ - maxPods, - podsPerCore, - containerManager, - nil, - ), + KubeletConfiguration: c, + KubeletDeps: d, } } // Starts this HollowKubelet and blocks. func (hk *HollowKubelet) Run() { - kubeletapp.RunKubelet(hk.KubeletConfig) + kubeletapp.RunKubelet(hk.KubeletConfiguration, hk.KubeletDeps, false, false) select {} } + +// Builds a KubeletConfiguration for the HollowKubelet, ensuring that the +// usual defaults are applied for fields we do not override. +func GetHollowKubeletConfig( + nodeName string, + kubeletPort int, + kubeletReadOnlyPort int, + maxPods int, + podsPerCore int) *componentconfig.KubeletConfiguration { + + testRootDir := utils.MakeTempDirOrDie("hollow-kubelet.", "") + manifestFilePath := utils.MakeTempDirOrDie("manifest", testRootDir) + glog.Infof("Using %s as root dir for hollow-kubelet", testRootDir) + + // Do the external -> internal conversion to make sure that defaults + // are set for fields not overridden in NewHollowKubelet. 
+ tmp := &v1alpha1.KubeletConfiguration{} + c := &componentconfig.KubeletConfiguration{} + api.Scheme.Convert(tmp, c, nil) + + c.HostnameOverride = nodeName + c.RootDirectory = testRootDir + c.ManifestURL = "" + c.Address = "0.0.0.0" /* bind address */ + c.Port = int32(kubeletPort) + c.ReadOnlyPort = int32(kubeletReadOnlyPort) + c.MasterServiceNamespace = api.NamespaceDefault + c.PodManifestPath = manifestFilePath + c.FileCheckFrequency.Duration = 20 * time.Second + c.HTTPCheckFrequency.Duration = 20 * time.Second + c.MinimumGCAge.Duration = 1 * time.Minute + c.NodeStatusUpdateFrequency.Duration = 10 * time.Second + c.SyncFrequency.Duration = 10 * time.Second + c.OutOfDiskTransitionFrequency.Duration = 5 * time.Minute + c.EvictionPressureTransitionPeriod.Duration = 5 * time.Minute + c.MaxPods = int32(maxPods) + c.PodsPerCore = int32(podsPerCore) + c.ClusterDNS = "" + c.DockerExecHandlerName = "native" + c.ImageGCHighThresholdPercent = 90 + c.ImageGCLowThresholdPercent = 80 + c.LowDiskSpaceThresholdMB = 256 + c.VolumeStatsAggPeriod.Duration = time.Minute + c.CgroupRoot = "" + c.ContainerRuntime = "docker" + c.CPUCFSQuota = true + c.RuntimeCgroups = "" + c.EnableControllerAttachDetach = false + c.EnableCustomMetrics = false + c.EnableDebuggingHandlers = true + c.EnableServer = true + c.CgroupsPerQOS = false + // Since this kubelet runs with --configure-cbr0=false, it needs to use + // hairpin-veth to allow hairpin packets. Note that this deviates from + // what the "real" kubelet currently does, because there's no way to + // set promiscuous mode on docker0. 
+	c.HairpinMode = componentconfig.HairpinVeth
+	c.MaxContainerCount = 100
+	c.MaxOpenFiles = 1024
+	c.MaxPerPodContainerCount = 2
+	c.NvidiaGPUs = 0
+	c.RegisterNode = true
+	c.RegisterSchedulable = true
+	c.RegistryBurst = 10
+	c.RegistryPullQPS = 5.0
+	c.ResolverConfig = kubetypes.ResolvConfDefault
+	c.KubeletCgroups = "/kubelet"
+	c.SerializeImagePulls = true
+	c.SystemCgroups = ""
+	c.ProtectKernelDefaults = false
+
+	// TODO(mtaufen): Note that PodInfraContainerImage was being set to the empty value before,
+	// but this may not have been intentional. (previous code (SimpleKubelet)
+	// was peeling it off of a componentconfig.KubeletConfiguration{}, but may
+	// have actually wanted the default).
+	// The default will be present in the KubeletConfiguration constructed above.
+
+	return c
+
+}