Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-31 23:37:01 +00:00
Merge pull request #29216 from mtaufen/kconf-refactor
Automatic merge from submit-queue

Refactor to simplify the hard-traveled path of the KubeletConfiguration object

### There are two main goals of this PR:

- Make `NewMainKubelet` take `KubeletConfiguration` and `KubeletDeps` as its only arguments.
- Finally eliminate the legacy `KubeletConfig` type.

### Why am I doing this?

Long story short, I started adding an endpoint to the Kubelet to display the *current* config that the Kubelet was running with, and I realized a few things:

- There were so many transformations to the configuration, in so many different places, before it was used that I wasn't confident the values initially passed in on the `KubeletConfiguration` would be the correct values to report by the time someone used the endpoint to check on them.
- Trying to reconstruct a `KubeletConfiguration` object from a mix of the `Kubelet` object and the legacy `KubeletConfig` object would just add to the mess (not to mention maintenance burden). It would be much easier if we passed the `KubeletConfiguration` all the way down to where we construct the `Kubelet` object, and then just stored a reference to the `KubeletConfiguration` object on the `Kubelet` for later retrieval.
- My hope is that by eliminating unnecessary internal transformations to the config information, and by consolidating the remaining ones in a single place (`NewMainKubelet`), we can have a much clearer understanding of what happens to the config before it makes it to the `Kubelet` object, and also a better ability to report up-to-date information on the status of the Kubelet.

So I started cleaning things up :-).

### Discussion points

It was relatively simple to get `NewMainKubelet` to just take the legacy `KubeletConfig` as its only argument, because most of its arguments were just passing through `KubeletConfig` fields or passing information that was generated solely from `KubeletConfig` fields.

Completely eliminating the legacy `KubeletConfig` type has been more difficult, because the fields of the `KubeletConfiguration` do not have a one-to-one relationship with the fields of the `KubeletConfig`. While I was able to eliminate many of the `KubeletConfig` fields, I'm starting to get into the nontrivial stuff and I'd like to get a discussion started on what should happen with the remaining fields (pending cherry-picking notwithstanding).

On my `kconf-refactor` branch, the legacy `KubeletConfig` object is down to the following 27 fields (from the initial 93). I'd really appreciate any guidance people have on what should happen with these fields.
```go
type KubeletConfig struct {
	Auth                    server.AuthInterface
	AutoDetectCloudProvider bool
	Builder                 KubeletBuilder
	CAdvisorInterface       cadvisor.Interface
	Cloud                   cloudprovider.Interface
	ContainerManager        cm.ContainerManager
	DockerClient            dockertools.DockerInterface
	EventClient             *clientset.Clientset
	Hostname                string
	HostNetworkSources      []string
	HostPIDSources          []string
	HostIPCSources          []string
	KubeClient              *clientset.Clientset
	Mounter                 mount.Interface
	NetworkPlugins          []network.NetworkPlugin
	NodeName                string
	OOMAdjuster             *oom.OOMAdjuster
	OSInterface             kubecontainer.OSInterface
	PodConfig               *config.PodConfig
	Recorder                record.EventRecorder
	Reservation             kubetypes.Reservation
	TLSOptions              *server.TLSOptions
	Writer                  kubeio.Writer
	VolumePlugins           []volume.VolumePlugin
	EvictionConfig          eviction.Config
	ContainerRuntimeOptions []kubecontainer.Option
	Options                 []Option
}
```

The patterns I've seen so far with respect to eliminating `KubeletConfig` fields may be of some help:

- Some fields could just be eliminated, because they were either the same on `KubeletConfiguration` or just a typecast away from being the same.
- Some fields from `KubeletConfiguration` just ended up in substructures of `KubeletConfig`; it was easy to just remove those substructure fields from `KubeletConfig` and construct them using local vars in `NewMainKubelet` instead.
- Some fields, e.g. `RunOnce`, were able to move into the `KubeletConfiguration`.

**P.S.** Part of the way I'm making the transition is by adding an extra `KubeletConfiguration` argument to functions that originally took a `KubeletConfig`, and, field by field, switching those functions over to using information from the `KubeletConfiguration`. Once the `KubeletConfig` is gone, I'll remove the `KubeletConfig` argument, and the transition will be complete. (A sketch of this pattern follows below.)

**Final note:** Please try to keep in mind that this is not a general Kubelet cleanup effort; it is just me cleaning things up that are directly in the path of what I'm trying to do. Let's keep this focused on cleanup related to the path that config takes on its way to the Kubelet.

**Release note**:

```
Removed Flags
- Removes the --auth-path flag. This has been deprecated in favor of --kubeconfig for two releases.
```
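As a concrete illustration of the transition pattern described in the P.S., here is a minimal, hypothetical sketch (the type and field names below are illustrative, not code from this PR): a function temporarily accepts both the legacy `KubeletConfig` and the new `KubeletConfiguration`, and reads migrate over field by field until the legacy argument can be dropped.

```go
package example

// Legacy type, shrinking as fields migrate away (hypothetical subset).
type KubeletConfig struct {
	Hostname string // not yet migrated; still read from the legacy struct
}

// New componentconfig type, the eventual single source of truth (hypothetical subset).
type KubeletConfiguration struct {
	RunOnce bool // already migrated; e.g. RunOnce moved here
}

// During the transition, the function takes both objects side by side.
func startKubelet(kc *KubeletConfig, kubeCfg *KubeletConfiguration) {
	_ = kc.Hostname     // this field is still served by the legacy KubeletConfig
	_ = kubeCfg.RunOnce // this field is already served by the KubeletConfiguration
	// Once no field of kc is read here, the kc argument is removed and
	// the transition for this function is complete.
}
```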
This commit is contained in commit df54a28361.
```diff
@@ -49,12 +49,17 @@ type KubeletServer struct {
-	AuthPath util.StringFlag // Deprecated -- use KubeConfig instead
 	APIServerList []string // Deprecated -- use KubeConfig instead
 
-	RunOnce bool
 
 	// Insert a probability of random errors during calls to the master.
 	ChaosChance float64
 	// Crash immediately, rather than eating panics.
 	ReallyCrashForTesting bool
+
+	// TODO(mtaufen): It is increasingly looking like nobody actually uses the
+	// Kubelet's runonce mode anymore, so it may be a candidate
+	// for deprecation and removal.
+	// If runOnce is true, the Kubelet will check the API server once for pods,
+	// run those in addition to the pods specified by the local manifest, and exit.
+	RunOnce bool
 }
 
 // NewKubeletServer will create a new KubeletServer with default values.
@@ -62,7 +67,6 @@ func NewKubeletServer() *KubeletServer {
 	config := componentconfig.KubeletConfiguration{}
 	api.Scheme.Convert(&v1alpha1.KubeletConfiguration{}, &config, nil)
 	return &KubeletServer{
-		AuthPath:             util.NewStringFlag("/var/lib/kubelet/kubernetes_auth"), // deprecated
 		KubeConfig:           util.NewStringFlag("/var/lib/kubelet/kubeconfig"),
 		RequireKubeConfig:    false, // in 1.5, default to true
 		KubeletConfiguration: config,
```
(File diff suppressed because it is too large.)
```diff
@@ -19,6 +19,7 @@ package app
 import (
 	"testing"
 
+	"k8s.io/kubernetes/pkg/kubelet"
 	"k8s.io/kubernetes/pkg/util/config"
 )
 
@@ -56,7 +57,7 @@ func TestValueOfAllocatableResources(t *testing.T) {
 		kubeReservedCM.Set(test.kubeReserved)
 		systemReservedCM.Set(test.systemReserved)
 
-		_, err := parseReservation(kubeReservedCM, systemReservedCM)
+		_, err := kubelet.ParseReservation(kubeReservedCM, systemReservedCM)
 		if err != nil {
 			t.Logf("%s: error returned: %v", test.name, err)
 		}
```
```diff
@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/componentconfig"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/fields"
@@ -158,16 +159,16 @@ func (s *KubeletExecutorServer) runKubelet(
 		}
 	}()
 
-	kcfg, err := kubeletapp.UnsecuredKubeletConfig(s.KubeletServer)
+	kubeDeps, err := kubeletapp.UnsecuredKubeletDeps(s.KubeletServer)
 	if err != nil {
 		return err
 	}
 
 	// apply Mesos specific settings
-	kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
-		k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
+	kubeDeps.Builder = func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (kubelet.KubeletBootstrap, error) {
+		k, err := kubeletapp.CreateAndInitKubelet(kubeCfg, kubeDeps, standaloneMode)
 		if err != nil {
-			return k, pc, err
+			return k, err
 		}
 
 		// decorate kubelet such that it shuts down when the executor is
@@ -177,11 +178,10 @@ func (s *KubeletExecutorServer) runKubelet(
 			executorDone: executorDone,
 		}
 
-		return decorated, pc, nil
+		return decorated, nil
 	}
-	kcfg.RuntimeCgroups = "" // don't move the docker daemon into a cgroup
-	kcfg.Hostname = kcfg.HostnameOverride
-	kcfg.KubeClient = apiclient
+	s.RuntimeCgroups = "" // don't move the docker daemon into a cgroup
+	kubeDeps.KubeClient = apiclient
 
 	// taken from KubeletServer#Run(*KubeletConfig)
 	eventClientConfig, err := kubeletapp.CreateAPIServerClientConfig(s.KubeletServer)
@@ -192,16 +192,16 @@ func (s *KubeletExecutorServer) runKubelet(
 	// make a separate client for events
 	eventClientConfig.QPS = float32(s.EventRecordQPS)
 	eventClientConfig.Burst = int(s.EventBurst)
-	kcfg.EventClient, err = clientset.NewForConfig(eventClientConfig)
+	kubeDeps.EventClient, err = clientset.NewForConfig(eventClientConfig)
 	if err != nil {
 		return err
 	}
 
-	kcfg.NodeName = kcfg.HostnameOverride
-	kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
-	kcfg.StandaloneMode = false
-	kcfg.SystemCgroups = "" // don't take control over other system processes.
-	if kcfg.Cloud != nil {
+	kubeDeps.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kubeDeps.Recorder) // override the default pod source
+
+	s.SystemCgroups = "" // don't take control over other system processes.
+
+	if kubeDeps.Cloud != nil {
 		// fail early and hard because having the cloud provider loaded would go unnoticed,
 		// but break bigger cluster because accessing the state.json from every slave kills the master.
 		panic("cloud provider must not be set")
@@ -209,17 +209,17 @@ func (s *KubeletExecutorServer) runKubelet(
 
 	// create custom cAdvisor interface which return the resource values that Mesos reports
 	ni := <-nodeInfos
-	cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, uint(s.CAdvisorPort), kcfg.ContainerRuntime)
+	cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, uint(s.CAdvisorPort), s.ContainerRuntime)
 	if err != nil {
 		return err
 	}
 
-	kcfg.CAdvisorInterface = cAdvisorInterface
-	kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface, cm.NodeConfig{
-		RuntimeCgroupsName: kcfg.RuntimeCgroups,
-		SystemCgroupsName:  kcfg.SystemCgroups,
-		KubeletCgroupsName: kcfg.KubeletCgroups,
-		ContainerRuntime:   kcfg.ContainerRuntime,
+	kubeDeps.CAdvisorInterface = cAdvisorInterface
+	kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter, cAdvisorInterface, cm.NodeConfig{
+		RuntimeCgroupsName: s.RuntimeCgroups,
+		SystemCgroupsName:  s.SystemCgroups,
+		KubeletCgroupsName: s.KubeletCgroups,
+		ContainerRuntime:   s.ContainerRuntime,
 	})
 	if err != nil {
 		return err
@@ -239,26 +239,26 @@ func (s *KubeletExecutorServer) runKubelet(
 		containerOptions = append(containerOptions, podsource.ContainerEnvOverlay([]api.EnvVar{
 			{Name: envContainerID, Value: s.containerID},
 		}))
-		kcfg.ContainerRuntimeOptions = append(kcfg.ContainerRuntimeOptions,
+		kubeDeps.ContainerRuntimeOptions = append(kubeDeps.ContainerRuntimeOptions,
 			dockertools.PodInfraContainerEnv(map[string]string{
 				envContainerID: s.containerID,
 			}))
 	}
 
-	podsource.Mesos(executorDone, kcfg.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...)
+	podsource.Mesos(executorDone, kubeDeps.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...)
 
 	// create static-pods directory file source
 	log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
-	fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
-	kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
+	fileSourceUpdates := kubeDeps.PodConfig.Channel(kubetypes.FileSource)
+	kconfig.NewSourceFile(staticPodsConfigPath, s.HostnameOverride, s.FileCheckFrequency.Duration, fileSourceUpdates)
 
 	// run the kubelet
-	// NOTE: because kcfg != nil holds, the upstream Run function will not
+	// NOTE: because kubeDeps != nil holds, the upstream Run function will not
 	// initialize the cloud provider. We explicitly wouldn't want
 	// that because then every kubelet instance would query the master
 	// state.json which does not scale.
 	s.KubeletServer.LockFilePath = "" // disable lock file
-	err = kubeletapp.Run(s.KubeletServer, kcfg)
+	err = kubeletapp.Run(s.KubeletServer, kubeDeps)
 	return
 }
 
```
```diff
@@ -36,9 +36,11 @@ import (
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	"k8s.io/kubernetes/pkg/api"
 	utilpod "k8s.io/kubernetes/pkg/api/pod"
+	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/api/validation"
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
+	kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/record"
@@ -78,6 +80,7 @@ import (
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/bandwidth"
 	"k8s.io/kubernetes/pkg/util/clock"
+	utilconfig "k8s.io/kubernetes/pkg/util/config"
 	utildbus "k8s.io/kubernetes/pkg/util/dbus"
 	utilexec "k8s.io/kubernetes/pkg/util/exec"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
@@ -85,6 +88,7 @@ import (
 	kubeio "k8s.io/kubernetes/pkg/util/io"
 	utilipt "k8s.io/kubernetes/pkg/util/iptables"
 	"k8s.io/kubernetes/pkg/util/mount"
+	nodeutil "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/procfs"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
```
```diff
@@ -173,90 +177,198 @@ type SyncHandler interface {
 // Option is a functional option type for Kubelet
 type Option func(*Kubelet)
 
+// bootstrapping interface for kubelet, targets the initialization protocol
+type KubeletBootstrap interface {
+	GetConfiguration() *componentconfig.KubeletConfiguration
+	BirthCry()
+	StartGarbageCollection()
+	ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
+	ListenAndServeReadOnly(address net.IP, port uint)
+	Run(<-chan kubetypes.PodUpdate)
+	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
+}
+
+// create and initialize a Kubelet instance
+type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error)
+
+// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
+// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
+// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
+type KubeletDeps struct {
+	// TODO(mtaufen): KubeletBuilder:
+	// Mesos currently uses this as a hook to let them make their own call to
+	// let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with
+	// their own KubeletBootstrap. It's a useful hook. I need to think about what
+	// a nice home for it would be. There seems to be a trend, between this and
+	// the Options fields below, of providing hooks where you can add extra functionality
+	// to the Kubelet for your solution. Maybe we should centralize these sorts of things?
+	Builder KubeletBuilder
+
+	// TODO(mtaufen): ContainerRuntimeOptions and Options:
+	// Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
+	// seem like a difficult path to trace when it's time to debug something.
+	// I'm leaving these fields here for now, but there is likely an easier-to-follow
+	// way to support their intended use cases. E.g. ContainerRuntimeOptions
+	// is used by Mesos to set an environment variable in containers which has
+	// some connection to their container GC. It seems that Mesos intends to use
+	// Options to add additional node conditions that are updated as part of the
+	// Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
+	// We should think about providing more explicit ways of doing these things.
+	ContainerRuntimeOptions []kubecontainer.Option
+	Options                 []Option
+
+	// Injected Dependencies
+	Auth              server.AuthInterface
+	CAdvisorInterface cadvisor.Interface
+	Cloud             cloudprovider.Interface
+	ContainerManager  cm.ContainerManager
+	DockerClient      dockertools.DockerInterface
+	EventClient       *clientset.Clientset
+	KubeClient        *clientset.Clientset
+	Mounter           mount.Interface
+	NetworkPlugins    []network.NetworkPlugin
+	OOMAdjuster       *oom.OOMAdjuster
+	OSInterface       kubecontainer.OSInterface
+	PodConfig         *config.PodConfig
+	Recorder          record.EventRecorder
+	Writer            kubeio.Writer
+	VolumePlugins     []volume.VolumePlugin
+	TLSOptions        *server.TLSOptions
+}
+
+func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName string) (*config.PodConfig, error) {
+	manifestURLHeader := make(http.Header)
+	if kubeCfg.ManifestURLHeader != "" {
+		pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
+		if len(pieces) != 2 {
+			return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
+		}
+		manifestURLHeader.Set(pieces[0], pieces[1])
+	}
+
+	// source of all configuration
+	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
+
+	// define file config source
+	if kubeCfg.PodManifestPath != "" {
+		glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
+		config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
+	}
+
+	// define url config source
+	if kubeCfg.ManifestURL != "" {
+		glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
+		config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
+	}
+	if kubeDeps.KubeClient != nil {
+		glog.Infof("Watching apiserver")
+		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
+	}
+	return cfg, nil
+}
+
 // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
 // No initialization of Kubelet and its modules should happen here.
-func NewMainKubelet(
-	hostname string,
-	nodeName string,
-	dockerClient dockertools.DockerInterface,
-	kubeClient clientset.Interface,
-	rootDirectory string,
-	seccompProfileRoot string,
-	podInfraContainerImage string,
-	resyncInterval time.Duration,
-	pullQPS float32,
-	pullBurst int,
-	eventQPS float32,
-	eventBurst int,
-	containerGCPolicy kubecontainer.ContainerGCPolicy,
-	sourcesReadyFn config.SourcesReadyFn,
-	registerNode bool,
-	registerSchedulable bool,
-	standaloneMode bool,
-	clusterDomain string,
-	clusterDNS net.IP,
-	masterServiceNamespace string,
-	volumePlugins []volume.VolumePlugin,
-	networkPlugins []network.NetworkPlugin,
-	networkPluginName string,
-	networkPluginMTU int,
-	streamingConnectionIdleTimeout time.Duration,
-	recorder record.EventRecorder,
-	cadvisorInterface cadvisor.Interface,
-	imageGCPolicy images.ImageGCPolicy,
-	diskSpacePolicy DiskSpacePolicy,
-	cloud cloudprovider.Interface,
-	autoDetectCloudProvider bool,
-	nodeLabels map[string]string,
-	nodeStatusUpdateFrequency time.Duration,
-	osInterface kubecontainer.OSInterface,
-	CgroupsPerQOS bool,
-	cgroupRoot string,
-	containerRuntime string,
-	remoteRuntimeEndpoint string,
-	remoteImageEndpoint string,
-	runtimeRequestTimeout time.Duration,
-	rktPath string,
-	rktAPIEndpoint string,
-	rktStage1Image string,
-	mounter mount.Interface,
-	writer kubeio.Writer,
-	configureCBR0 bool,
-	nonMasqueradeCIDR string,
-	podCIDR string,
-	reconcileCIDR bool,
-	maxPods int,
-	podsPerCore int,
-	nvidiaGPUs int,
-	dockerExecHandler dockertools.ExecHandler,
-	resolverConfig string,
-	cpuCFSQuota bool,
-	daemonEndpoints *api.NodeDaemonEndpoints,
-	oomAdjuster *oom.OOMAdjuster,
-	serializeImagePulls bool,
-	containerManager cm.ContainerManager,
-	outOfDiskTransitionFrequency time.Duration,
-	flannelExperimentalOverlay bool,
-	nodeIP net.IP,
-	reservation kubetypes.Reservation,
-	enableCustomMetrics bool,
-	volumeStatsAggPeriod time.Duration,
-	containerRuntimeOptions []kubecontainer.Option,
-	hairpinMode string,
-	babysitDaemons bool,
-	evictionConfig eviction.Config,
-	kubeOptions []Option,
-	enableControllerAttachDetach bool,
-	makeIPTablesUtilChains bool,
-	iptablesMasqueradeBit int,
-	iptablesDropBit int,
-	allowedUnsafeSysctls []string,
-) (*Kubelet, error) {
-	if rootDirectory == "" {
-		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
+func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
+	if kubeCfg.RootDirectory == "" {
+		return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
 	}
-	if resyncInterval <= 0 {
-		return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
+	if kubeCfg.SyncFrequency.Duration <= 0 {
+		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
 	}
 
+	if kubeCfg.MakeIPTablesUtilChains {
+		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
+			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
+		}
+		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
+			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
+		}
+		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
+			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
+		}
+	}
+
+	hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
+	// Query the cloud provider for our node name, default to hostname
+	nodeName := hostname
+	if kubeDeps.Cloud != nil {
+		var err error
+		instances, ok := kubeDeps.Cloud.Instances()
+		if !ok {
+			return nil, fmt.Errorf("failed to get instances from cloud provider")
+		}
+
+		nodeName, err = instances.CurrentNodeName(hostname)
+		if err != nil {
+			return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
+		}
+
+		glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
+	}
+
+	// TODO: KubeletDeps.KubeClient should be a client interface, but client interface misses certain methods
+	// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
+	// a nil pointer to it when what we really want is a nil interface.
+	var kubeClient clientset.Interface
+	if kubeDeps.KubeClient != nil {
+		kubeClient = kubeDeps.KubeClient
+		// TODO: remove this when we've refactored kubelet to only use clientset.
+	}
+
+	if kubeDeps.PodConfig == nil {
+		var err error
+		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	containerGCPolicy := kubecontainer.ContainerGCPolicy{
+		MinAge:             kubeCfg.MinimumGCAge.Duration,
+		MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
+		MaxContainers:      int(kubeCfg.MaxContainerCount),
+	}
+
+	daemonEndpoints := &api.NodeDaemonEndpoints{
+		KubeletEndpoint: api.DaemonEndpoint{Port: kubeCfg.Port},
+	}
+
+	imageGCPolicy := images.ImageGCPolicy{
+		MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
+		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
+		LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
+	}
+
+	diskSpacePolicy := DiskSpacePolicy{
+		DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
+		RootFreeDiskMB:   int(kubeCfg.LowDiskSpaceThresholdMB),
+	}
+
+	thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
+	if err != nil {
+		return nil, err
+	}
+	evictionConfig := eviction.Config{
+		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
+		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
+		Thresholds:               thresholds,
+	}
+
+	reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
+	if err != nil {
+		return nil, err
+	}
+
+	var dockerExecHandler dockertools.ExecHandler
+	switch kubeCfg.DockerExecHandlerName {
+	case "native":
+		dockerExecHandler = &dockertools.NativeExecHandler{}
+	case "nsenter":
+		dockerExecHandler = &dockertools.NsenterExecHandler{}
+	default:
+		glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
+		dockerExecHandler = &dockertools.NativeExecHandler{}
+	}
 
 	serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
```
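For contrast with the seventy-odd-parameter signature removed above, the new call site collapses to three arguments. A hedged sketch of a caller (the `buildKubelet` helper name is mine, not from this diff):

```go
package example

import (
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	"k8s.io/kubernetes/pkg/kubelet"
)

// buildKubelet is a hypothetical wrapper showing the new shape of the call:
// all configuration travels in kubeCfg, all runtime objects travel in deps.
func buildKubelet(kubeCfg *componentconfig.KubeletConfiguration, deps *kubelet.KubeletDeps) (*kubelet.Kubelet, error) {
	// A real caller must first populate the injected dependencies listed in
	// the KubeletDeps struct above (DockerClient, Recorder, Mounter, ...).
	return kubelet.NewMainKubelet(kubeCfg, deps, false /* standaloneMode */)
}
```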
```diff
@@ -305,79 +417,81 @@ func NewMainKubelet(
 		Namespace: "",
 	}
 
-	diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
+	diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
 	}
 	containerRefManager := kubecontainer.NewRefManager()
 
-	oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)
+	oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
 
-	// TODO: remove when internal cbr0 implementation gets removed in favor
-	// of the kubenet network plugin
-	if networkPluginName == "kubenet" {
-		configureCBR0 = false
-		flannelExperimentalOverlay = false
+	// TODO(mtaufen): remove when internal cbr0 implementation gets removed in favor
+	// of the kubenet network plugin
+	var myConfigureCBR0 bool = kubeCfg.ConfigureCBR0
+	var myFlannelExperimentalOverlay bool = kubeCfg.ExperimentalFlannelOverlay
+	if kubeCfg.NetworkPluginName == "kubenet" {
+		myConfigureCBR0 = false
+		myFlannelExperimentalOverlay = false
 	}
 
 	klet := &Kubelet{
 		hostname: hostname,
 		nodeName: nodeName,
-		dockerClient: dockerClient,
+		dockerClient: kubeDeps.DockerClient,
 		kubeClient: kubeClient,
-		rootDirectory: rootDirectory,
-		resyncInterval: resyncInterval,
+		rootDirectory: kubeCfg.RootDirectory,
+		resyncInterval: kubeCfg.SyncFrequency.Duration,
 		containerRefManager: containerRefManager,
 		httpClient: &http.Client{},
-		sourcesReady: config.NewSourcesReady(sourcesReadyFn),
-		registerNode: registerNode,
-		registerSchedulable: registerSchedulable,
+		sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
+		registerNode: kubeCfg.RegisterNode,
+		registerSchedulable: kubeCfg.RegisterSchedulable,
 		standaloneMode: standaloneMode,
-		clusterDomain: clusterDomain,
-		clusterDNS: clusterDNS,
+		clusterDomain: kubeCfg.ClusterDomain,
+		clusterDNS: net.ParseIP(kubeCfg.ClusterDNS),
 		serviceLister: serviceLister,
 		nodeLister: nodeLister,
 		nodeInfo: nodeInfo,
-		masterServiceNamespace: masterServiceNamespace,
-		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
-		recorder: recorder,
-		cadvisor: cadvisorInterface,
+		masterServiceNamespace: kubeCfg.MasterServiceNamespace,
+		streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
+		recorder: kubeDeps.Recorder,
+		cadvisor: kubeDeps.CAdvisorInterface,
 		diskSpaceManager: diskSpaceManager,
-		cloud: cloud,
-		autoDetectCloudProvider: autoDetectCloudProvider,
+		cloud: kubeDeps.Cloud,
+		autoDetectCloudProvider: (kubeExternal.AutoDetectCloudProvider == kubeCfg.CloudProvider),
 		nodeRef: nodeRef,
-		nodeLabels: nodeLabels,
-		nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
-		os: osInterface,
+		nodeLabels: kubeCfg.NodeLabels,
+		nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
+		os: kubeDeps.OSInterface,
 		oomWatcher: oomWatcher,
-		CgroupsPerQOS: CgroupsPerQOS,
-		cgroupRoot: cgroupRoot,
-		mounter: mounter,
-		writer: writer,
-		configureCBR0: configureCBR0,
-		nonMasqueradeCIDR: nonMasqueradeCIDR,
-		reconcileCIDR: reconcileCIDR,
-		maxPods: maxPods,
-		podsPerCore: podsPerCore,
-		nvidiaGPUs: nvidiaGPUs,
+		cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
+		cgroupRoot: kubeCfg.CgroupRoot,
+		mounter: kubeDeps.Mounter,
+		writer: kubeDeps.Writer,
+		configureCBR0: myConfigureCBR0,
+		nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
+		reconcileCIDR: kubeCfg.ReconcileCIDR,
+		maxPods: int(kubeCfg.MaxPods),
+		podsPerCore: int(kubeCfg.PodsPerCore),
+		nvidiaGPUs: int(kubeCfg.NvidiaGPUs),
 		syncLoopMonitor: atomic.Value{},
-		resolverConfig: resolverConfig,
-		cpuCFSQuota: cpuCFSQuota,
+		resolverConfig: kubeCfg.ResolverConfig,
+		cpuCFSQuota: kubeCfg.CPUCFSQuota,
 		daemonEndpoints: daemonEndpoints,
-		containerManager: containerManager,
-		flannelExperimentalOverlay: flannelExperimentalOverlay,
+		containerManager: kubeDeps.ContainerManager,
+		flannelExperimentalOverlay: myFlannelExperimentalOverlay,
 		flannelHelper: nil,
-		nodeIP: nodeIP,
+		nodeIP: net.ParseIP(kubeCfg.NodeIP),
 		clock: clock.RealClock{},
-		outOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
-		reservation: reservation,
-		enableCustomMetrics: enableCustomMetrics,
-		babysitDaemons: babysitDaemons,
-		enableControllerAttachDetach: enableControllerAttachDetach,
+		outOfDiskTransitionFrequency: kubeCfg.OutOfDiskTransitionFrequency.Duration,
+		reservation: *reservation,
+		enableCustomMetrics: kubeCfg.EnableCustomMetrics,
+		babysitDaemons: kubeCfg.BabysitDaemons,
+		enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
 		iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
-		makeIPTablesUtilChains: makeIPTablesUtilChains,
-		iptablesMasqueradeBit: iptablesMasqueradeBit,
-		iptablesDropBit: iptablesDropBit,
+		makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
+		iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
+		iptablesDropBit: int(kubeCfg.IPTablesDropBit),
 	}
 
 	if klet.flannelExperimentalOverlay {
@@ -391,7 +505,7 @@ func NewMainKubelet(
 		glog.Infof("Using node IP: %q", klet.nodeIP.String())
 	}
 
-	if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(hairpinMode), containerRuntime, configureCBR0, networkPluginName); err != nil {
+	if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.ConfigureCBR0, kubeCfg.NetworkPluginName); err != nil {
 		// This is a non-recoverable error. Returning it up the callstack will just
 		// lead to retries of the same failure, so just fail hard.
 		glog.Fatalf("Invalid hairpin mode: %v", err)
@@ -400,7 +514,7 @@ func NewMainKubelet(
 	}
 	glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
 
-	if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, networkPluginMTU); err != nil {
+	if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
 		return nil, err
 	} else {
 		klet.networkPlugin = plug
@@ -419,63 +533,63 @@ func NewMainKubelet(
 	klet.podCache = kubecontainer.NewCache()
 	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
 
-	if remoteRuntimeEndpoint != "" {
-		containerRuntime = "remote"
+	if kubeCfg.RemoteRuntimeEndpoint != "" {
+		kubeCfg.ContainerRuntime = "remote"
 
-		// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
-		if remoteImageEndpoint == "" {
-			remoteImageEndpoint = remoteRuntimeEndpoint
+		// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
+		if kubeCfg.RemoteImageEndpoint == "" {
+			kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
 		}
 	}
 
 	// Initialize the runtime.
-	switch containerRuntime {
+	switch kubeCfg.ContainerRuntime {
 	case "docker":
 		// Only supported one for now, continue.
 		klet.containerRuntime = dockertools.NewDockerManager(
-			dockerClient,
-			kubecontainer.FilterEventRecorder(recorder),
+			kubeDeps.DockerClient,
+			kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
 			klet.livenessManager,
 			containerRefManager,
 			klet.podManager,
 			machineInfo,
-			podInfraContainerImage,
-			pullQPS,
-			pullBurst,
+			kubeCfg.PodInfraContainerImage,
+			float32(kubeCfg.RegistryPullQPS),
+			int(kubeCfg.RegistryBurst),
 			containerLogsDir,
-			osInterface,
+			kubeDeps.OSInterface,
 			klet.networkPlugin,
 			klet,
 			klet.httpClient,
 			dockerExecHandler,
-			oomAdjuster,
+			kubeDeps.OOMAdjuster,
 			procFs,
 			klet.cpuCFSQuota,
 			imageBackOff,
-			serializeImagePulls,
-			enableCustomMetrics,
+			kubeCfg.SerializeImagePulls,
+			kubeCfg.EnableCustomMetrics,
 			// If using "kubenet", the Kubernetes network plugin that wraps
 			// CNI's bridge plugin, it knows how to set the hairpin veth flag
 			// so we tell the container runtime to back away from setting it.
 			// If the kubelet is started with any other plugin we can't be
 			// sure it handles the hairpin case so we instruct the docker
 			// runtime to set the flag instead.
-			klet.hairpinMode == componentconfig.HairpinVeth && networkPluginName != "kubenet",
-			seccompProfileRoot,
-			containerRuntimeOptions...,
+			klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
+			kubeCfg.SeccompProfileRoot,
+			kubeDeps.ContainerRuntimeOptions...,
 		)
 	case "rkt":
 		// TODO: Include hairpin mode settings in rkt?
 		conf := &rkt.Config{
-			Path:            rktPath,
-			Stage1Image:     rktStage1Image,
+			Path:            kubeCfg.RktPath,
+			Stage1Image:     kubeCfg.RktStage1Image,
 			InsecureOptions: "image,ondisk",
 		}
 		rktRuntime, err := rkt.New(
-			rktAPIEndpoint,
+			kubeCfg.RktAPIEndpoint,
 			conf,
 			klet,
-			recorder,
+			kubeDeps.Recorder,
 			containerRefManager,
 			klet.podManager,
 			klet.livenessManager,
@@ -485,32 +599,32 @@ func NewMainKubelet(
 			utilexec.New(),
 			kubecontainer.RealOS{},
 			imageBackOff,
-			serializeImagePulls,
-			runtimeRequestTimeout,
+			kubeCfg.SerializeImagePulls,
+			kubeCfg.RuntimeRequestTimeout.Duration,
 		)
 		if err != nil {
 			return nil, err
 		}
 		klet.containerRuntime = rktRuntime
 	case "remote":
-		remoteRuntimeService, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout)
+		remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
 		if err != nil {
 			return nil, err
 		}
-		remoteImageService, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout)
+		remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
 		if err != nil {
 			return nil, err
 		}
 		klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
-			kubecontainer.FilterEventRecorder(recorder),
+			kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
 			klet.livenessManager,
 			containerRefManager,
-			osInterface,
+			kubeDeps.OSInterface,
 			klet.networkPlugin,
 			klet,
 			klet.httpClient,
 			imageBackOff,
-			serializeImagePulls,
+			kubeCfg.SerializeImagePulls,
 			klet.cpuCFSQuota,
 			remoteRuntimeService,
 			remoteImageService,
@@ -519,15 +633,15 @@ func NewMainKubelet(
 			return nil, err
 		}
 	default:
-		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
+		return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
 	}
 
 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
-	klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod, klet.containerRuntime)
+	klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)
 
 	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
-	klet.updatePodCIDR(podCIDR)
+	klet.updatePodCIDR(kubeCfg.PodCIDR)
 
 	// setup containerGC
 	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
@@ -538,7 +652,7 @@ func NewMainKubelet(
 	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
 
 	// setup imageManager
-	imageManager, err := images.NewImageGCManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
+	imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
 	}
@@ -552,25 +666,25 @@ func NewMainKubelet(
 		klet.livenessManager,
 		klet.runner,
 		containerRefManager,
-		recorder)
+		kubeDeps.Recorder)
 
 	klet.volumePluginMgr, err =
-		NewInitializedVolumePluginMgr(klet, volumePlugins)
+		NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
 	if err != nil {
 		return nil, err
 	}
 
 	// setup volumeManager
 	klet.volumeManager, err = volumemanager.NewVolumeManager(
-		enableControllerAttachDetach,
+		kubeCfg.EnableControllerAttachDetach,
 		nodeName,
 		klet.podManager,
 		klet.kubeClient,
 		klet.volumePluginMgr,
 		klet.containerRuntime,
-		mounter,
+		kubeDeps.Mounter,
 		klet.getPodsDir(),
-		recorder)
+		kubeDeps.Recorder)
 
 	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
 	if err != nil {
@@ -579,14 +693,15 @@ func NewMainKubelet(
 	klet.runtimeCache = runtimeCache
 	klet.reasonCache = NewReasonCache()
 	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
-	klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
+	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
 
 	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
 	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
 	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
 
 	// setup eviction manager
-	evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, recorder, nodeRef, klet.clock)
+	evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock)
+
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
 	}
@@ -604,7 +719,7 @@ func NewMainKubelet(
 	}
 	// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
 	// Hence, we concatenate those two lists.
-	safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
+	safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
 	unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, api.UnsafeSysctlsPodAnnotationKey)
 	if err != nil {
 		return nil, err
@@ -614,19 +729,23 @@ func NewMainKubelet(
 	klet.AddPodAdmitHandler(unsafeWhitelist)
 
 	// enable active deadline handler
-	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock)
+	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
 	if err != nil {
 		return nil, err
 	}
 	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
 	klet.AddPodSyncHandler(activeDeadlineHandler)
 
-	klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(containerRuntime))
+	klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(kubeCfg.ContainerRuntime))
 
 	// apply functional Option's
-	for _, opt := range kubeOptions {
+	for _, opt := range kubeDeps.Options {
 		opt(klet)
 	}
 
+	// Finally, put the most recent version of the config on the Kubelet, so
+	// people can see how it was configured.
+	klet.kubeletConfiguration = kubeCfg
 	return klet, nil
 }
 
@@ -640,6 +759,8 @@ type nodeLister interface {
 
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
+	kubeletConfiguration *componentconfig.KubeletConfiguration
+
 	hostname string
 	nodeName string
 	dockerClient dockertools.DockerInterface
@@ -792,7 +913,7 @@ type Kubelet struct {
 	resourceAnalyzer stats.ResourceAnalyzer
 
 	// Whether or not we should have the QOS cgroup hierarchy for resource management
-	CgroupsPerQOS bool
+	cgroupsPerQOS bool
 
 	// If non-empty, pass this to the container runtime as the root cgroup.
 	cgroupRoot string
```
```diff
@@ -2913,6 +3034,11 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16
 	return kl.runner.PortForward(&pod, port, stream)
 }
 
+// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
+func (kl *Kubelet) GetConfiguration() *componentconfig.KubeletConfiguration {
+	return kl.kubeletConfiguration
+}
+
 // BirthCry sends an event that the kubelet has started up.
 func (kl *Kubelet) BirthCry() {
 	// Make an event that kubelet restarted.
```
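The PR description mentions that this refactor began with an attempt to expose the running config through an endpoint. A hypothetical sketch of such a handler built on the new `GetConfiguration` method (the handler itself is not part of this diff):

```go
package example

import (
	"encoding/json"
	"net/http"

	"k8s.io/kubernetes/pkg/kubelet"
)

// configzHandler serializes whatever GetConfiguration returns. Because
// NewMainKubelet stores the same KubeletConfiguration it consumed, the
// response reflects the values the Kubelet is actually running with.
func configzHandler(kl kubelet.KubeletBootstrap) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(kl.GetConfiguration()); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})
}
```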
```diff
@@ -2956,3 +3082,39 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
 	// ContatnerRemoved doesn't affect pod state
 	return event.Type != pleg.ContainerRemoved
 }
+
+func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) {
+	rl := make(api.ResourceList)
+	for k, v := range m {
+		switch api.ResourceName(k) {
+		// Only CPU and memory resources are supported.
+		case api.ResourceCPU, api.ResourceMemory:
+			q, err := resource.ParseQuantity(v)
+			if err != nil {
+				return nil, err
+			}
+			if q.Sign() == -1 {
+				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
+			}
+			rl[api.ResourceName(k)] = q
+		default:
+			return nil, fmt.Errorf("cannot reserve %q resource", k)
+		}
+	}
+	return rl, nil
+}
+
+func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
+	reservation := new(kubetypes.Reservation)
+	if rl, err := parseResourceList(kubeReserved); err != nil {
+		return nil, err
+	} else {
+		reservation.Kubernetes = rl
+	}
+	if rl, err := parseResourceList(systemReserved); err != nil {
+		return nil, err
+	} else {
+		reservation.System = rl
+	}
+	return reservation, nil
+}
```
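Since `ParseReservation` is now exported (the test earlier in this diff already calls it through the `kubelet` package), here is a short usage sketch, assuming values shaped like those arriving from the `--kube-reserved`/`--system-reserved` flags:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet"
	utilconfig "k8s.io/kubernetes/pkg/util/config"
)

func main() {
	kubeReserved := utilconfig.ConfigurationMap{"cpu": "100m", "memory": "256Mi"}
	systemReserved := utilconfig.ConfigurationMap{"cpu": "50m"}

	// parseResourceList (above) rejects negative quantities and any resource
	// other than cpu and memory.
	reservation, err := kubelet.ParseReservation(kubeReserved, systemReserved)
	if err != nil {
		fmt.Println("invalid reservation:", err)
		return
	}
	fmt.Println("kube-reserved:", reservation.Kubernetes, "system-reserved:", reservation.System)
}
```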
```diff
@@ -21,11 +21,18 @@ import (
 
 	kubeletapp "k8s.io/kubernetes/cmd/kubelet/app"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/componentconfig"
+	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/kubelet"
 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/cm"
 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
+	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	kubeio "k8s.io/kubernetes/pkg/util/io"
+	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/volume/empty_dir"
 	"k8s.io/kubernetes/test/utils"
 
@@ -33,7 +40,8 @@ import (
 )
 
 type HollowKubelet struct {
-	KubeletConfig *kubeletapp.KubeletConfig
+	KubeletConfiguration *componentconfig.KubeletConfiguration
+	KubeletDeps          *kubelet.KubeletDeps
 }
 
 func NewHollowKubelet(
```
```diff
@@ -45,44 +53,116 @@ func NewHollowKubelet(
 	containerManager cm.ContainerManager,
 	maxPods int, podsPerCore int,
 ) *HollowKubelet {
-	testRootDir := utils.MakeTempDirOrDie("hollow-kubelet.", "")
-	manifestFilePath := utils.MakeTempDirOrDie("manifest", testRootDir)
-	glog.Infof("Using %s as root dir for hollow-kubelet", testRootDir)
+	// -----------------
+	// Static config
+	// -----------------
+	c := GetHollowKubeletConfig(nodeName, kubeletPort, kubeletReadOnlyPort, maxPods, podsPerCore)
+
+	// -----------------
+	// Injected objects
+	// -----------------
+	d := &kubelet.KubeletDeps{
+		KubeClient:        client,
+		DockerClient:      dockerClient,
+		CAdvisorInterface: cadvisorInterface,
+		Cloud:             nil,
+		OSInterface:       &containertest.FakeOS{},
+		ContainerManager:  containerManager,
+		VolumePlugins:     empty_dir.ProbeVolumePlugins(),
+		TLSOptions:        nil,
+		OOMAdjuster:       oom.NewFakeOOMAdjuster(),
+		Writer:            &kubeio.StdWriter{},
+		Mounter:           mount.New(),
+	}
 
 	return &HollowKubelet{
-		KubeletConfig: kubeletapp.SimpleKubelet(
-			client,
-			dockerClient,
-			nodeName,
-			testRootDir,
-			"",        /* manifest-url */
-			"0.0.0.0", /* bind address */
-			uint(kubeletPort),
-			uint(kubeletReadOnlyPort),
-			api.NamespaceDefault,
-			empty_dir.ProbeVolumePlugins(),
-			nil, /* tls-options */
-			cadvisorInterface,
-			manifestFilePath,
-			nil, /* cloud-provider */
-			&containertest.FakeOS{}, /* os-interface */
-			20*time.Second, /* FileCheckFrequency */
-			20*time.Second, /* HTTPCheckFrequency */
-			1*time.Minute,  /* MinimumGCAge */
-			10*time.Second, /* NodeStatusUpdateFrequency */
-			10*time.Second, /* SyncFrequency */
-			5*time.Minute,  /* OutOfDiskTransitionFrequency */
-			5*time.Minute,  /* EvictionPressureTransitionPeriod */
-			maxPods,
-			podsPerCore,
-			containerManager,
-			nil,
-		),
+		KubeletConfiguration: c,
+		KubeletDeps:          d,
 	}
 }
 
 // Starts this HollowKubelet and blocks.
 func (hk *HollowKubelet) Run() {
-	kubeletapp.RunKubelet(hk.KubeletConfig)
+	kubeletapp.RunKubelet(hk.KubeletConfiguration, hk.KubeletDeps, false, false)
 	select {}
 }
+
+// Builds a KubeletConfiguration for the HollowKubelet, ensuring that the
+// usual defaults are applied for fields we do not override.
+func GetHollowKubeletConfig(
+	nodeName string,
+	kubeletPort int,
+	kubeletReadOnlyPort int,
+	maxPods int,
+	podsPerCore int) *componentconfig.KubeletConfiguration {
+
+	testRootDir := utils.MakeTempDirOrDie("hollow-kubelet.", "")
+	manifestFilePath := utils.MakeTempDirOrDie("manifest", testRootDir)
+	glog.Infof("Using %s as root dir for hollow-kubelet", testRootDir)
+
+	// Do the external -> internal conversion to make sure that defaults
+	// are set for fields not overridden in NewHollowKubelet.
+	tmp := &v1alpha1.KubeletConfiguration{}
+	c := &componentconfig.KubeletConfiguration{}
+	api.Scheme.Convert(tmp, c, nil)
+
+	c.HostnameOverride = nodeName
+	c.RootDirectory = testRootDir
+	c.ManifestURL = ""
+	c.Address = "0.0.0.0" /* bind address */
+	c.Port = int32(kubeletPort)
+	c.ReadOnlyPort = int32(kubeletReadOnlyPort)
+	c.MasterServiceNamespace = api.NamespaceDefault
+	c.PodManifestPath = manifestFilePath
+	c.FileCheckFrequency.Duration = 20 * time.Second
+	c.HTTPCheckFrequency.Duration = 20 * time.Second
+	c.MinimumGCAge.Duration = 1 * time.Minute
+	c.NodeStatusUpdateFrequency.Duration = 10 * time.Second
+	c.SyncFrequency.Duration = 10 * time.Second
+	c.OutOfDiskTransitionFrequency.Duration = 5 * time.Minute
+	c.EvictionPressureTransitionPeriod.Duration = 5 * time.Minute
+	c.MaxPods = int32(maxPods)
+	c.PodsPerCore = int32(podsPerCore)
+	c.ClusterDNS = ""
+	c.DockerExecHandlerName = "native"
+	c.ImageGCHighThresholdPercent = 90
+	c.ImageGCLowThresholdPercent = 80
+	c.LowDiskSpaceThresholdMB = 256
+	c.VolumeStatsAggPeriod.Duration = time.Minute
+	c.CgroupRoot = ""
+	c.ContainerRuntime = "docker"
+	c.CPUCFSQuota = true
+	c.RuntimeCgroups = ""
+	c.EnableControllerAttachDetach = false
+	c.EnableCustomMetrics = false
+	c.EnableDebuggingHandlers = true
+	c.EnableServer = true
+	c.CgroupsPerQOS = false
+	// Since this kubelet runs with --configure-cbr0=false, it needs to use
+	// hairpin-veth to allow hairpin packets. Note that this deviates from
+	// what the "real" kubelet currently does, because there's no way to
+	// set promiscuous mode on docker0.
+	c.HairpinMode = componentconfig.HairpinVeth
+	c.MaxContainerCount = 100
+	c.MaxOpenFiles = 1024
+	c.MaxPerPodContainerCount = 2
+	c.NvidiaGPUs = 0
+	c.RegisterNode = true
+	c.RegisterSchedulable = true
+	c.RegistryBurst = 10
+	c.RegistryPullQPS = 5.0
+	c.ResolverConfig = kubetypes.ResolvConfDefault
+	c.KubeletCgroups = "/kubelet"
+	c.SerializeImagePulls = true
+	c.SystemCgroups = ""
+	c.ProtectKernelDefaults = false
+
+	// TODO(mtaufen): Note that PodInfraContainerImage was being set to the empty value before,
+	// but this may not have been intentional. (previous code (SimpleKubelet)
+	// was peeling it off of a componentconfig.KubeletConfiguration{}, but may
+	// have actually wanted the default).
+	// The default will be present in the KubeletConfiguration contstructed above.
+
+	return c
+}
```
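The defaulting trick in `GetHollowKubeletConfig` is worth isolating: converting an empty external (`v1alpha1`) object into the internal type applies the registered defaults, so only deliberate overrides need to be spelled out. A minimal sketch of the same pattern, under the same assumption stated in the diff's own comment that the conversion sets defaults for fields not overridden:

```go
package example

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
)

// defaultedKubeletConfig returns an internal KubeletConfiguration carrying
// the usual defaults; callers then override only the fields they care about.
func defaultedKubeletConfig() *componentconfig.KubeletConfiguration {
	tmp := &v1alpha1.KubeletConfiguration{} // external type, all zero values
	c := &componentconfig.KubeletConfiguration{}
	api.Scheme.Convert(tmp, c, nil) // conversion applies registered defaults
	return c
}
```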