mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #18434 from mesosphere/sur-680-kubelet-events
Auto commit by PR queue bot
This commit is contained in:
commit
940f5d4447
@ -147,83 +147,104 @@ func (s *KubeletExecutorServer) runKubelet(
|
|||||||
staticPodsConfigPath string,
|
staticPodsConfigPath string,
|
||||||
apiclient *client.Client,
|
apiclient *client.Client,
|
||||||
podLW *cache.ListWatch,
|
podLW *cache.ListWatch,
|
||||||
) error {
|
) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
// close the channel here. When Run returns without error, the executorKubelet is
|
||||||
|
// responsible to do this. If it returns with an error, we are responsible here.
|
||||||
|
close(kubeletDone)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
kcfg, err := s.UnsecuredKubeletConfig()
|
kcfg, err := s.UnsecuredKubeletConfig()
|
||||||
if err == nil {
|
|
||||||
// apply Mesos specific settings
|
|
||||||
executorDone := make(chan struct{})
|
|
||||||
kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
|
|
||||||
k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
|
|
||||||
if err != nil {
|
|
||||||
return k, pc, err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.klet = k.(*kubelet.Kubelet)
|
|
||||||
close(s.kletReady) // intentionally crash if this is called more than once
|
|
||||||
|
|
||||||
// decorate kubelet such that it shuts down when the executor is
|
|
||||||
decorated := &executorKubelet{
|
|
||||||
Kubelet: s.klet,
|
|
||||||
kubeletDone: kubeletDone,
|
|
||||||
executorDone: executorDone,
|
|
||||||
}
|
|
||||||
|
|
||||||
return decorated, pc, nil
|
|
||||||
}
|
|
||||||
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
|
|
||||||
kcfg.Hostname = kcfg.HostnameOverride
|
|
||||||
kcfg.KubeClient = apiclient
|
|
||||||
kcfg.NodeName = kcfg.HostnameOverride
|
|
||||||
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
|
|
||||||
kcfg.StandaloneMode = false
|
|
||||||
kcfg.SystemContainer = "" // don't take control over other system processes.
|
|
||||||
if kcfg.Cloud != nil {
|
|
||||||
// fail early and hard because having the cloud provider loaded would go unnoticed,
|
|
||||||
// but break bigger cluster because accessing the state.json from every slave kills the master.
|
|
||||||
panic("cloud provider must not be set")
|
|
||||||
}
|
|
||||||
|
|
||||||
// create custom cAdvisor interface which return the resource values that Mesos reports
|
|
||||||
ni := <-nodeInfos
|
|
||||||
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
kcfg.CAdvisorInterface = cAdvisorInterface
|
|
||||||
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for ni := range nodeInfos {
|
|
||||||
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
|
|
||||||
log.V(3).Infof("ignoring updated node resources: %v", ni)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// create main pod source, it will close executorDone when the executor updates stop flowing
|
|
||||||
newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW)
|
|
||||||
|
|
||||||
// create static-pods directory file source
|
|
||||||
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
|
|
||||||
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
|
|
||||||
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
|
|
||||||
|
|
||||||
// run the kubelet, until execUpdates is closed
|
|
||||||
// NOTE: because kcfg != nil holds, the upstream Run function will not
|
|
||||||
// initialize the cloud provider. We explicitly wouldn't want
|
|
||||||
// that because then every kubelet instance would query the master
|
|
||||||
// state.json which does not scale.
|
|
||||||
err = s.KubeletServer.Run(kcfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// close the channel here. When Run returns without error, the executorKubelet is
|
return err
|
||||||
// responsible to do this. If it returns with an error, we are responsible here.
|
|
||||||
close(kubeletDone)
|
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
// apply Mesos specific settings
|
||||||
|
executorDone := make(chan struct{})
|
||||||
|
kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
|
||||||
|
k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
|
||||||
|
if err != nil {
|
||||||
|
return k, pc, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.klet = k.(*kubelet.Kubelet)
|
||||||
|
close(s.kletReady) // intentionally crash if this is called more than once
|
||||||
|
|
||||||
|
// decorate kubelet such that it shuts down when the executor is
|
||||||
|
decorated := &executorKubelet{
|
||||||
|
Kubelet: s.klet,
|
||||||
|
kubeletDone: kubeletDone,
|
||||||
|
executorDone: executorDone,
|
||||||
|
}
|
||||||
|
|
||||||
|
return decorated, pc, nil
|
||||||
|
}
|
||||||
|
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
|
||||||
|
kcfg.Hostname = kcfg.HostnameOverride
|
||||||
|
kcfg.KubeClient = apiclient
|
||||||
|
|
||||||
|
// taken from KubeletServer#Run(*KubeletConfig)
|
||||||
|
eventClientConfig, err := s.CreateAPIServerClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a separate client for events
|
||||||
|
eventClientConfig.QPS = s.EventRecordQPS
|
||||||
|
eventClientConfig.Burst = s.EventBurst
|
||||||
|
kcfg.EventClient, err = client.New(eventClientConfig)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
kcfg.NodeName = kcfg.HostnameOverride
|
||||||
|
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
|
||||||
|
kcfg.StandaloneMode = false
|
||||||
|
kcfg.SystemContainer = "" // don't take control over other system processes.
|
||||||
|
if kcfg.Cloud != nil {
|
||||||
|
// fail early and hard because having the cloud provider loaded would go unnoticed,
|
||||||
|
// but break bigger cluster because accessing the state.json from every slave kills the master.
|
||||||
|
panic("cloud provider must not be set")
|
||||||
|
}
|
||||||
|
|
||||||
|
// create custom cAdvisor interface which return the resource values that Mesos reports
|
||||||
|
ni := <-nodeInfos
|
||||||
|
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
kcfg.CAdvisorInterface = cAdvisorInterface
|
||||||
|
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for ni := range nodeInfos {
|
||||||
|
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
|
||||||
|
log.V(3).Infof("ignoring updated node resources: %v", ni)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create main pod source, it will close executorDone when the executor updates stop flowing
|
||||||
|
newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW)
|
||||||
|
|
||||||
|
// create static-pods directory file source
|
||||||
|
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
|
||||||
|
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
|
||||||
|
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
|
||||||
|
|
||||||
|
// run the kubelet, until execUpdates is closed
|
||||||
|
// NOTE: because kcfg != nil holds, the upstream Run function will not
|
||||||
|
// initialize the cloud provider. We explicitly wouldn't want
|
||||||
|
// that because then every kubelet instance would query the master
|
||||||
|
// state.json which does not scale.
|
||||||
|
err = s.KubeletServer.Run(kcfg)
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run runs the specified KubeletExecutorServer.
|
// Run runs the specified KubeletExecutorServer.
|
||||||
|
Loading…
Reference in New Issue
Block a user