Re-use CreateAndInitKubelet

Dr. Stefan Schimanski
2015-09-23 13:07:03 +02:00
parent 4ec703174b
commit d74950cfb9


@@ -37,10 +37,8 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
kubeletContainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
)
const (
@@ -141,24 +139,75 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
return nil
}
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{},
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
kcfg, err := s.UnsecuredKubeletConfig()
if err == nil {
// apply Mesos-specific settings
executorDone := make(chan struct{})
kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
return s.createAndInitKubelet(kc, staticPodsConfigPath, execUpdates, kubeletFinished)
k, pc, err := app.CreateAndInitKubelet(kc)
if err != nil {
return k, pc, err
}
klet := k.(*kubelet.Kubelet)
s.kletLock.Lock()
s.klet = klet
s.kletLock.Unlock()
// decorate kubelet such that it shuts down when the executor is done
decorated := &executorKubelet{
Kubelet: klet,
kubeletDone: kubeletDone,
executorDone: executorDone,
}
return decorated, pc, nil
}
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
kcfg.Hostname = kcfg.HostnameOverride
kcfg.KubeClient = apiclient
kcfg.NodeName = kcfg.HostnameOverride
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
kcfg.StandaloneMode = false
kcfg.SystemContainer = "" // don't take control over other system processes.
if kcfg.Cloud != nil {
// fail early and hard because having the cloud provider loaded would go unnoticed,
// but would break bigger clusters, because accessing state.json from every slave
// kills the master.
panic("cloud provider must not be set")
}
// create main pod source
updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE)
go func() {
// execUpdates will be closed by the executor on shutdown
defer close(executorDone)
for u := range execUpdates {
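// u is a local copy of the executor's update, so overwriting Source here
// merely stamps MESOS_CFG_SOURCE on everything forwarded to the kubelet's
// pod config channel.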
u.Source = MESOS_CFG_SOURCE
updates <- u
}
}()
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
// run the kubelet, until execUpdates is closed
// NOTE: because kcfg != nil holds, the upstream Run function will not
// initialize the cloud provider. We explicitly do not want that, because
// then every kubelet instance would query the master's state.json, which
// does not scale.
err = s.KubeletServer.Run(kcfg)
}
if err != nil {
close(kubeletFinished)
// close the channel here. When Run returns without error, the executorKubelet is
// responsible for closing it. If Run returns with an error, we close it here.
close(kubeletDone)
}
return err
}
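// Illustrative sketch (not part of this commit): the Builder hook above relies
// on Go struct embedding to decorate the stock kubelet. Reduced to its
// essentials, with hypothetical names:
type lifecycleKubelet struct {
	*kubelet.Kubelet     // embedded: all kubelet methods are inherited
	done chan<- struct{} // closed once the kubelet's run loop exits
}
func (k *lifecycleKubelet) Run(updates <-chan kubetypes.PodUpdate) {
	defer close(k.done)    // signal the caller that the loop has finished
	k.Kubelet.Run(updates) // delegate to the embedded kubelet
}
// Only Run is overridden; every other method resolves to the embedded
// *kubelet.Kubelet, which is why the decorated value still satisfies
// app.KubeletBootstrap.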
@@ -184,7 +233,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
}
if err != nil {
// required for k8sm since we need to send api.Binding information back to the apiserver
log.Fatalf("No API client: %v", err)
return fmt.Errorf("cannot create API client: %v", err)
}
// start executor
@@ -205,182 +254,3 @@ func defaultBindingAddress() string {
return libProcessIP
}
}
func (ks *KubeletExecutorServer) createAndInitKubelet(
kc *app.KubeletConfig,
staticPodsConfigPath string,
execUpdates <-chan kubetypes.PodUpdate,
kubeletDone chan<- struct{},
) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
// TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient client.Interface
if kc.KubeClient == nil {
kubeClient = nil
} else {
kubeClient = kc.KubeClient
}
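// The if/else above works around Go's typed-nil pitfall: storing a nil
// *client.Client pointer in an interface yields an interface value that is
// NOT nil. A minimal sketch of the gotcha (hypothetical, for illustration):
//
//	var p *client.Client       // nil pointer
//	var i client.Interface = p // interface now carries a type descriptor
//	fmt.Println(i == nil)      // prints "false"
//
// NewMainKubelet compares its client against nil, so it must be handed a true
// nil interface rather than a nil pointer wrapped in one.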
gcPolicy := kubeletContainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,
}
// create main pod source
pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder)
updates := pc.Channel(MESOS_CFG_SOURCE)
executorDone := make(chan struct{})
go func() {
// execUpdates will be closed by the executor on shutdown
defer close(executorDone)
for u := range execUpdates {
u.Source = MESOS_CFG_SOURCE
updates <- u
}
}()
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := pc.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
klet, err := kubelet.NewMainKubelet(
kc.Hostname,
kc.NodeName,
kc.DockerClient,
kubeClient,
kc.RootDirectory,
kc.PodInfraContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.EventRecordQPS,
kc.EventBurst,
gcPolicy,
pc.SeenAllSources,
kc.RegisterNode,
kc.RegisterSchedulable,
kc.StandaloneMode,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.NetworkPlugins,
kc.NetworkPluginName,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
kc.CAdvisorInterface,
kc.ImageGCPolicy,
kc.DiskSpacePolicy,
kc.Cloud,
kc.NodeStatusUpdateFrequency,
kc.ResourceContainer,
kc.OSInterface,
kc.CgroupRoot,
kc.ContainerRuntime,
kc.RktPath,
kc.RktStage1Image,
kc.Mounter,
kc.Writer,
kc.DockerDaemonContainer,
kc.SystemContainer,
kc.ConfigureCBR0,
kc.PodCIDR,
kc.ReconcileCIDR,
kc.MaxPods,
kc.DockerExecHandler,
kc.ResolverConfig,
kc.CPUCFSQuota,
&api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)},
},
kc.OOMAdjuster,
)
if err != nil {
return nil, nil, err
}
ks.kletLock.Lock()
ks.klet = klet
ks.kletLock.Unlock()
// decorate kubelet such that it shuts down when the executor is done
k := &executorKubelet{
Kubelet: ks.klet,
kubeletDone: kubeletDone,
executorDone: executorDone,
}
k.BirthCry()
k.StartGarbageCollection()
return k, pc, nil
}
// kubelet decorator
type kubeletExecutor struct {
*kubelet.Kubelet
address net.IP
dockerClient dockertools.DockerInterface
kubeletDone chan<- struct{} // closed once kubelet.Run() returns
executorDone <-chan struct{} // closed when executor terminates
}
func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) {
log.Infof("Starting kubelet server...")
kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers)
}
// runs the main kubelet loop, closing the kubeletDone chan when the loop exits.
// never returns.
func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
defer func() {
close(kl.kubeletDone)
util.HandleCrash()
log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
// important: never return! this is in our contract
select {}
}()
// push merged updates into another, closable update channel which is closed
// when the executor shuts down.
closableUpdates := make(chan kubetypes.PodUpdate)
go func() {
// closing closableUpdates will cause our patched kubelet's syncLoop() to exit
defer close(closableUpdates)
pipeLoop:
for {
select {
case <-kl.executorDone:
break pipeLoop
default:
select {
case u := <-mergedUpdates:
select {
case closableUpdates <- u: // noop
case <-kl.executorDone:
break pipeLoop
}
case <-kl.executorDone:
break pipeLoop
}
}
}
}()
// we expect that Run() will complete after closableUpdates is closed and the
// kubelet's syncLoop() has finished processing its backlog, which hopefully
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)
//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
}
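// Illustrative sketch (not part of this commit): the pipe loop above adapts an
// unclosable input channel into a closable one, so that closing the output can
// stop the kubelet's syncLoop. The general shape, with hypothetical names:
func closablePipe(in <-chan kubetypes.PodUpdate, stop <-chan struct{}) <-chan kubetypes.PodUpdate {
	out := make(chan kubetypes.PodUpdate)
	go func() {
		defer close(out) // closing out is the shutdown signal for the consumer
		for {
			select {
			case u := <-in:
				select {
				case out <- u: // forwarded
				case <-stop:
					return
				}
			case <-stop:
				return
			}
		}
	}()
	return out
}
// Both the receive and the forward stay preemptible by stop, mirroring the
// nested selects above; otherwise shutdown could block forever on a quiet
// input or a slow consumer.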