From a60df400fd7a577913620544dfa0a9c55c1a1039 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 21 Aug 2015 18:30:29 +0200 Subject: [PATCH] Decouple executor initialization from kubelet This patch reduces the dependencies of the executor from the kubelet. This makes it possible launch the kubelet after the executor. This considerably reduces the complexity of the startup code. Moreover, this work is a requirement to use a standalone kubelet some day. --- contrib/mesos/pkg/executor/executor.go | 60 +++-- contrib/mesos/pkg/executor/executor_test.go | 50 ++-- contrib/mesos/pkg/executor/mock_test.go | 6 +- contrib/mesos/pkg/executor/service/service.go | 214 ++++++++++-------- 4 files changed, 176 insertions(+), 154 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index d44b9b2248c..14d5558df56 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -95,14 +95,14 @@ type podStatusFunc func() (*api.PodStatus, error) // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - updateChan chan<- interface{} // to send pod config updates to the kubelet + updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown state stateType tasks map[string]*kuberTask pods map[string]*api.Pod lock sync.Mutex - sourcename string client *client.Client - done chan struct{} // signals shutdown + terminate chan struct{} // signals that the executor should shutdown + registered chan struct{} // closed when registerd outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver dockerClient dockertools.DockerInterface suicideWatch suicideWatcher @@ -113,14 +113,12 @@ type KubernetesExecutor struct { exitFunc func(int) podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string - initialRegComplete chan struct{} podController *framework.Controller launchGracePeriod time.Duration } type Config struct { - Updates chan<- interface{} // to send pod config updates to the kubelet - SourceName string + Updates chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet APIClient *client.Client Docker dockertools.DockerInterface ShutdownAlert func() @@ -144,9 +142,8 @@ func New(config Config) *KubernetesExecutor { state: disconnectedState, tasks: make(map[string]*kuberTask), pods: make(map[string]*api.Pod), - sourcename: config.SourceName, client: config.APIClient, - done: make(chan struct{}), + terminate: make(chan struct{}), outgoing: make(chan func() (mesos.Status, error), 1024), dockerClient: config.Docker, suicideTimeout: config.SuicideTimeout, @@ -155,7 +152,7 @@ func New(config Config) *KubernetesExecutor { shutdownAlert: config.ShutdownAlert, exitFunc: config.ExitFunc, podStatusFunc: config.PodStatusFunc, - initialRegComplete: make(chan struct{}), + registered: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, } @@ -185,26 +182,24 @@ func New(config Config) *KubernetesExecutor { return k } -func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} { - return k.initialRegComplete +// InitiallyRegistered returns a channel which is closed when the executor is +// registered with the Mesos master. +func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} { + return k.registered } func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) - go k.podController.Run(k.done) + go k.podController.Run(k.terminate) go k.sendLoop() //TODO(jdef) monitor kubeletFinished and shutdown if it happens } -func (k *KubernetesExecutor) Done() <-chan struct{} { - return k.done -} - func (k *KubernetesExecutor) isDone() bool { select { - case <-k.done: + case <-k.terminate: return true default: return false @@ -234,7 +229,12 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } } - k.initialRegistration.Do(k.onInitialRegistration) + k.updateChan <- kubetypes.PodUpdate{ + Pods: []*api.Pod{}, + Op: kubetypes.SET, + } + + close(k.registered) } // Reregistered is called when the executor is successfully re-registered with the slave. @@ -255,12 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI } } - k.initialRegistration.Do(k.onInitialRegistration) -} - -func (k *KubernetesExecutor) onInitialRegistration() { - defer close(k.initialRegComplete) - // emit an empty update to allow the mesos "source" to be marked as seen k.lock.Lock() defer k.lock.Unlock() @@ -268,9 +262,8 @@ func (k *KubernetesExecutor) onInitialRegistration() { return } k.updateChan <- kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: k.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } } @@ -818,7 +811,8 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) { (&k.state).transitionTo(terminalState) // signal to all listeners that this KubeletExecutor is done! - close(k.done) + close(k.terminate) + close(k.updateChan) if k.shutdownAlert != nil { func() { @@ -892,7 +886,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) } } @@ -900,7 +894,7 @@ func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status * func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) } } @@ -910,12 +904,12 @@ func (k *KubernetesExecutor) sendLoop() { defer log.V(1).Info("sender loop exiting") for { select { - case <-k.done: + case <-k.terminate: return default: if !k.isConnected() { select { - case <-k.done: + case <-k.terminate: case <-time.After(1 * time.Second): } continue @@ -935,7 +929,7 @@ func (k *KubernetesExecutor) sendLoop() { } // attempt to re-queue the sender select { - case <-k.done: + case <-k.terminate: case k.outgoing <- sender: } } diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index d2c908c5620..a91ca55c304 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -63,18 +63,14 @@ func TestExecutorRegister(t *testing.T) { executor.Registered(mockDriver, nil, nil, nil) initialPodUpdate := kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: executor.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } receivedInitialPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok { - if reflect.DeepEqual(initialPodUpdate, update) { - receivedInitialPodUpdate = true - } + case update := <-updates: + if reflect.DeepEqual(initialPodUpdate, update) { + receivedInitialPodUpdate = true } case <-time.After(time.Second): } @@ -136,7 +132,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { defer testApiServer.server.Close() mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, @@ -154,7 +150,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, HostIP: "127.0.0.1", }, nil }, @@ -205,9 +201,8 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { gotPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok && len(update.Pods) == 1 { + case update := <-updates: + if len(update.Pods) == 1 { gotPodUpdate = true } case <-time.After(time.Second): @@ -302,7 +297,7 @@ func TestExecutorStaticPods(t *testing.T) { mockDriver := &MockExecutorDriver{} config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init + Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -384,7 +379,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { kubeletFinished := make(chan struct{}) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: make(chan kubetypes.PodUpdate, 1024), APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -399,7 +394,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, HostIP: "127.0.0.1", }, nil }, @@ -560,9 +555,10 @@ func TestExecutorShutdown(t *testing.T) { mockDriver := &MockExecutorDriver{} kubeletFinished := make(chan struct{}) var exitCalled int32 = 0 + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: updates, ShutdownAlert: func() { close(kubeletFinished) }, @@ -586,11 +582,21 @@ func TestExecutorShutdown(t *testing.T) { assert.Equal(t, true, executor.isDone(), "executor should be in Done state after Shutdown") - select { - case <-executor.Done(): - default: - t.Fatal("done channel should be closed after shutdown") + // channel should be closed now, only a constant number of updates left + num := len(updates) +drainLoop: + for { + select { + case _, ok := <-updates: + if !ok { + break drainLoop + } + num -= 1 + default: + t.Fatal("Updates chan should be closed after Shutdown") + } } + assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown") assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0, "the executor should call its ExitFunc when it is ready to close down") diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index 727a66336bb..f73039ff7dd 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "k8s.io/kubernetes/pkg/api" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/dockertools" ) @@ -65,13 +66,12 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status return args.Get(0).(mesosproto.Status), args.Error(1) } -func NewTestKubernetesExecutor() (*KubernetesExecutor, chan interface{}) { - updates := make(chan interface{}, 1024) +func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) { + updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, - SourceName: "executor_test", }), updates } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 271e765aaee..45488830f9f 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -18,13 +18,13 @@ package service import ( "fmt" - "math/rand" "net" "net/http" "os" "path/filepath" "strconv" "strings" + "sync" "time" log "github.com/golang/glog" @@ -35,10 +35,8 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/credentialprovider" - "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -50,6 +48,7 @@ import ( utilio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/oom" + "k8s.io/kubernetes/pkg/util/rand" ) const ( @@ -62,6 +61,8 @@ type KubeletExecutorServer struct { *app.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration + kletLock sync.Mutex + klet *kubelet.Kubelet } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -100,6 +101,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { s.SystemContainer = "" s.DockerDaemonContainer = "" + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } + // create apiserver client var apiclient *client.Client clientConfig, err := s.CreateAPIServerClientConfig() @@ -173,6 +181,43 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) dockerExecHandler = &dockertools.NativeExecHandler{} } + dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + + //TODO(jdef) either configure Watch here with something useful, or else + // get rid of it from executor.Config + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + exec := executor.New(executor.Config{ + Updates: execUpdates, + APIClient: apiclient, + Docker: dockerClient, + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + s.kletLock.Lock() + defer s.kletLock.Unlock() + + if s.klet == nil { + return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized") + } + + status, err := s.klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := s.klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + }) manifestURLHeader := make(http.Header) if s.ManifestURLHeader != "" { @@ -183,6 +228,30 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { manifestURLHeader.Set(pieces[0], pieces[1]) } + // initialize driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: s.HostnameOverride, + BindingAddress: s.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + log.Fatalf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + <-exec.InitiallyRegistered() + + // prepare kubelet kcfg := app.KubeletConfig{ Address: s.Address, AllowPrivileged: s.AllowPrivileged, @@ -196,7 +265,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { ContainerRuntime: s.ContainerRuntime, CPUCFSQuota: s.CPUCFSQuota, DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + DockerClient: dockerClient, DockerDaemonContainer: s.DockerDaemonContainer, DockerExecHandler: dockerExecHandler, EnableDebuggingHandlers: s.EnableDebuggingHandlers, @@ -248,9 +317,8 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { kcfg.NodeName = kcfg.Hostname kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, hks, clientConfig) + return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished) }) - err = app.RunKubelet(&kcfg) if err != nil { return err @@ -282,8 +350,10 @@ func defaultBindingAddress() string { func (ks *KubeletExecutorServer) createAndInitKubelet( kc *app.KubeletConfig, - hks hyperkube.Interface, clientConfig *client.Config, + staticPodsConfigPath string, + execUpdates <-chan kubetypes.PodUpdate, + kubeletDone chan<- struct{}, ) (app.KubeletBootstrap, *kconfig.PodConfig, error) { // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop @@ -304,8 +374,24 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( MaxContainers: kc.MaxContainerCount, } + // create main pod source pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder) updates := pc.Channel(MESOS_CFG_SOURCE) + executorDone := make(chan struct{}) + go func() { + // execUpdates will be closed by the executor on shutdown + defer close(executorDone) + + for u := range execUpdates { + u.Source = MESOS_CFG_SOURCE + updates <- u + } + }() + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := pc.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) klet, err := kubelet.NewMainKubelet( kc.Hostname, @@ -363,97 +449,33 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( return nil, nil, err } - // create static pods directory - staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") - err = os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return nil, nil, err - } - - //TODO(jdef) either configure Watch here with something useful, or else - // get rid of it from executor.Config - kubeletFinished := make(chan struct{}) - exec := executor.New(executor.Config{ - Updates: updates, - SourceName: MESOS_CFG_SOURCE, - APIClient: kc.KubeClient, - Docker: kc.DockerClient, - SuicideTimeout: ks.SuicideTimeout, - LaunchGracePeriod: ks.LaunchGracePeriod, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - status, err := klet.GetRuntime().GetPodStatus(pod) - if err != nil { - return nil, err - } - - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := klet.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil - }, - StaticPodsConfigPath: staticPodsConfigPath, - PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), - }) - - // initialize driver and initialize the executor with it - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: ks.HostnameOverride, - BindingAddress: ks.Address, - } - driver, err := bindings.NewMesosExecutorDriver(dconfig) - if err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } - log.V(2).Infof("Initialize executor driver...") - exec.Init(driver) - - // start the driver - go func() { - if _, err := driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - - <- exec.InitialRegComplete() + ks.kletLock.Lock() + ks.klet = klet + ks.kletLock.Unlock() k := &kubeletExecutor{ - Kubelet: klet, - address: ks.Address, - dockerClient: kc.DockerClient, - hks: hks, - kubeletFinished: kubeletFinished, - executorDone: exec.Done(), - clientConfig: clientConfig, + Kubelet: ks.klet, + address: ks.Address, + dockerClient: kc.DockerClient, + kubeletDone: kubeletDone, + clientConfig: clientConfig, + executorDone: executorDone, } k.BirthCry() k.StartGarbageCollection() - // create static-pods directory file source - log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - return k, pc, nil } // kubelet decorator type kubeletExecutor struct { *kubelet.Kubelet - address net.IP - dockerClient dockertools.DockerInterface - hks hyperkube.Interface - kubeletFinished chan struct{} // closed once kubelet.Run() returns - executorDone <-chan struct{} // from KubeletExecutor.Done() - clientConfig *client.Config + address net.IP + dockerClient dockertools.DockerInterface + kubeletDone chan<- struct{} // closed once kubelet.Run() returns + executorDone <-chan struct{} // closed when executor terminates + clientConfig *client.Config } func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { @@ -463,21 +485,21 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. // never returns. -func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { +func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { defer func() { - close(kl.kubeletFinished) + close(kl.kubeletDone) util.HandleCrash() log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity // important: never return! this is in our contract select {} }() - // push updates through a closable pipe. when the executor indicates shutdown - // via Done() we want to stop the Kubelet from processing updates. - pipe := make(chan kubetypes.PodUpdate) + // push merged updates into another, closable update channel which is closed + // when the executor shuts down. + closableUpdates := make(chan kubetypes.PodUpdate) go func() { - // closing pipe will cause our patched kubelet's syncLoop() to exit - defer close(pipe) + // closing closableUpdates will cause our patched kubelet's syncLoop() to exit + defer close(closableUpdates) pipeLoop: for { select { @@ -485,9 +507,9 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { break pipeLoop default: select { - case u := <-updates: + case u := <-mergedUpdates: select { - case pipe <- u: // noop + case closableUpdates <- u: // noop case <-kl.executorDone: break pipeLoop } @@ -498,12 +520,12 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { } }() - // we expect that Run() will complete after the pipe is closed and the + // we expect that Run() will complete after closableUpdates is closed and the // kubelet's syncLoop() has finished processing its backlog, which hopefully // will not take very long. Peeking into the future (current k8s master) it // seems that the backlog has grown from 1 to 50 -- this may negatively impact // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone) + util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) //TODO(jdef) revisit this if/when executor failover lands // Force kubelet to delete all pods.