Consolidate executor driver initialization code

This commit is contained in:
Dr. Stefan Schimanski
2015-08-21 17:23:23 +02:00
parent efdd726027
commit 93ae257af4

View File

@@ -25,7 +25,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
log "github.com/golang/glog"
@@ -257,6 +256,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
return err
}
// start health check server
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go util.Until(func() {
@@ -401,6 +401,29 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
})
// initialize driver and initialize the executor with it
dconfig := bindings.DriverConfig{
Executor: exec,
HostnameOverride: ks.HostnameOverride,
BindingAddress: ks.Address,
}
driver, err := bindings.NewMesosExecutorDriver(dconfig)
if err != nil {
log.Fatalf("failed to create executor driver: %v", err)
}
log.V(2).Infof("Initialize executor driver...")
exec.Init(driver)
// start the driver
go func() {
if _, err := driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err)
}
log.Info("executor Run completed")
}()
<- exec.InitialRegComplete()
k := &kubeletExecutor{
Kubelet: klet,
address: ks.Address,
@@ -411,27 +434,9 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
clientConfig: clientConfig,
}
dconfig := bindings.DriverConfig{
Executor: exec,
HostnameOverride: ks.HostnameOverride,
BindingAddress: ks.Address,
}
if driver, err := bindings.NewMesosExecutorDriver(dconfig); err != nil {
log.Fatalf("failed to create executor driver: %v", err)
} else {
k.driver = driver
}
k.BirthCry()
k.StartGarbageCollection()
log.V(2).Infof("Initialize executor driver...")
exec.Init(k.driver)
<- exec.InitialRegComplete()
// from here the executor is registered with the Mesos master
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := pc.Channel(kubetypes.FileSource)
@@ -443,8 +448,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
// kubelet decorator
type kubeletExecutor struct {
*kubelet.Kubelet
initialize sync.Once
driver bindings.ExecutorDriver
address net.IP
dockerClient dockertools.DockerInterface
hks hyperkube.Interface
@@ -454,16 +457,6 @@ type kubeletExecutor struct {
}
func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) {
// this func could be called many times, depending how often the HTTP server crashes,
// so only execute certain initialization procs once
kl.initialize.Do(func() {
go func() {
if _, err := kl.driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err)
}
log.Info("executor Run completed")
}()
})
log.Infof("Starting kubelet server...")
kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers)
}