diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index d44b9b2248c..14d5558df56 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -95,14 +95,14 @@ type podStatusFunc func() (*api.PodStatus, error) // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - updateChan chan<- interface{} // to send pod config updates to the kubelet + updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown state stateType tasks map[string]*kuberTask pods map[string]*api.Pod lock sync.Mutex - sourcename string client *client.Client - done chan struct{} // signals shutdown + terminate chan struct{} // signals that the executor should shutdown + registered chan struct{} // closed when registerd outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver dockerClient dockertools.DockerInterface suicideWatch suicideWatcher @@ -113,14 +113,12 @@ type KubernetesExecutor struct { exitFunc func(int) podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string - initialRegComplete chan struct{} podController *framework.Controller launchGracePeriod time.Duration } type Config struct { - Updates chan<- interface{} // to send pod config updates to the kubelet - SourceName string + Updates chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet APIClient *client.Client Docker dockertools.DockerInterface ShutdownAlert func() @@ -144,9 +142,8 @@ func New(config Config) *KubernetesExecutor { state: disconnectedState, tasks: make(map[string]*kuberTask), pods: make(map[string]*api.Pod), - sourcename: config.SourceName, client: config.APIClient, - done: make(chan struct{}), + terminate: make(chan struct{}), outgoing: make(chan func() (mesos.Status, error), 1024), dockerClient: config.Docker, suicideTimeout: config.SuicideTimeout, @@ -155,7 +152,7 @@ func New(config Config) *KubernetesExecutor { shutdownAlert: config.ShutdownAlert, exitFunc: config.ExitFunc, podStatusFunc: config.PodStatusFunc, - initialRegComplete: make(chan struct{}), + registered: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, } @@ -185,26 +182,24 @@ func New(config Config) *KubernetesExecutor { return k } -func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} { - return k.initialRegComplete +// InitiallyRegistered returns a channel which is closed when the executor is +// registered with the Mesos master. +func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} { + return k.registered } func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) - go k.podController.Run(k.done) + go k.podController.Run(k.terminate) go k.sendLoop() //TODO(jdef) monitor kubeletFinished and shutdown if it happens } -func (k *KubernetesExecutor) Done() <-chan struct{} { - return k.done -} - func (k *KubernetesExecutor) isDone() bool { select { - case <-k.done: + case <-k.terminate: return true default: return false @@ -234,7 +229,12 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } } - k.initialRegistration.Do(k.onInitialRegistration) + k.updateChan <- kubetypes.PodUpdate{ + Pods: []*api.Pod{}, + Op: kubetypes.SET, + } + + close(k.registered) } // Reregistered is called when the executor is successfully re-registered with the slave. @@ -255,12 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI } } - k.initialRegistration.Do(k.onInitialRegistration) -} - -func (k *KubernetesExecutor) onInitialRegistration() { - defer close(k.initialRegComplete) - // emit an empty update to allow the mesos "source" to be marked as seen k.lock.Lock() defer k.lock.Unlock() @@ -268,9 +262,8 @@ func (k *KubernetesExecutor) onInitialRegistration() { return } k.updateChan <- kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: k.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } } @@ -818,7 +811,8 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) { (&k.state).transitionTo(terminalState) // signal to all listeners that this KubeletExecutor is done! - close(k.done) + close(k.terminate) + close(k.updateChan) if k.shutdownAlert != nil { func() { @@ -892,7 +886,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) } } @@ -900,7 +894,7 @@ func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status * func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) } } @@ -910,12 +904,12 @@ func (k *KubernetesExecutor) sendLoop() { defer log.V(1).Info("sender loop exiting") for { select { - case <-k.done: + case <-k.terminate: return default: if !k.isConnected() { select { - case <-k.done: + case <-k.terminate: case <-time.After(1 * time.Second): } continue @@ -935,7 +929,7 @@ func (k *KubernetesExecutor) sendLoop() { } // attempt to re-queue the sender select { - case <-k.done: + case <-k.terminate: case k.outgoing <- sender: } } diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index d2c908c5620..a91ca55c304 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -63,18 +63,14 @@ func TestExecutorRegister(t *testing.T) { executor.Registered(mockDriver, nil, nil, nil) initialPodUpdate := kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: executor.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } receivedInitialPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok { - if reflect.DeepEqual(initialPodUpdate, update) { - receivedInitialPodUpdate = true - } + case update := <-updates: + if reflect.DeepEqual(initialPodUpdate, update) { + receivedInitialPodUpdate = true } case <-time.After(time.Second): } @@ -136,7 +132,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { defer testApiServer.server.Close() mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, @@ -154,7 +150,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, HostIP: "127.0.0.1", }, nil }, @@ -205,9 +201,8 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { gotPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok && len(update.Pods) == 1 { + case update := <-updates: + if len(update.Pods) == 1 { gotPodUpdate = true } case <-time.After(time.Second): @@ -302,7 +297,7 @@ func TestExecutorStaticPods(t *testing.T) { mockDriver := &MockExecutorDriver{} config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init + Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -384,7 +379,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { kubeletFinished := make(chan struct{}) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: make(chan kubetypes.PodUpdate, 1024), APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -399,7 +394,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, HostIP: "127.0.0.1", }, nil }, @@ -560,9 +555,10 @@ func TestExecutorShutdown(t *testing.T) { mockDriver := &MockExecutorDriver{} kubeletFinished := make(chan struct{}) var exitCalled int32 = 0 + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: updates, ShutdownAlert: func() { close(kubeletFinished) }, @@ -586,11 +582,21 @@ func TestExecutorShutdown(t *testing.T) { assert.Equal(t, true, executor.isDone(), "executor should be in Done state after Shutdown") - select { - case <-executor.Done(): - default: - t.Fatal("done channel should be closed after shutdown") + // channel should be closed now, only a constant number of updates left + num := len(updates) +drainLoop: + for { + select { + case _, ok := <-updates: + if !ok { + break drainLoop + } + num -= 1 + default: + t.Fatal("Updates chan should be closed after Shutdown") + } } + assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown") assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0, "the executor should call its ExitFunc when it is ready to close down") diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index 727a66336bb..f73039ff7dd 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "k8s.io/kubernetes/pkg/api" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/dockertools" ) @@ -65,13 +66,12 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status return args.Get(0).(mesosproto.Status), args.Error(1) } -func NewTestKubernetesExecutor() (*KubernetesExecutor, chan interface{}) { - updates := make(chan interface{}, 1024) +func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) { + updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, - SourceName: "executor_test", }), updates } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 271e765aaee..45488830f9f 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -18,13 +18,13 @@ package service import ( "fmt" - "math/rand" "net" "net/http" "os" "path/filepath" "strconv" "strings" + "sync" "time" log "github.com/golang/glog" @@ -35,10 +35,8 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/credentialprovider" - "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -50,6 +48,7 @@ import ( utilio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/oom" + "k8s.io/kubernetes/pkg/util/rand" ) const ( @@ -62,6 +61,8 @@ type KubeletExecutorServer struct { *app.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration + kletLock sync.Mutex + klet *kubelet.Kubelet } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -100,6 +101,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { s.SystemContainer = "" s.DockerDaemonContainer = "" + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } + // create apiserver client var apiclient *client.Client clientConfig, err := s.CreateAPIServerClientConfig() @@ -173,6 +181,43 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) dockerExecHandler = &dockertools.NativeExecHandler{} } + dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + + //TODO(jdef) either configure Watch here with something useful, or else + // get rid of it from executor.Config + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + exec := executor.New(executor.Config{ + Updates: execUpdates, + APIClient: apiclient, + Docker: dockerClient, + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + s.kletLock.Lock() + defer s.kletLock.Unlock() + + if s.klet == nil { + return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized") + } + + status, err := s.klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := s.klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + }) manifestURLHeader := make(http.Header) if s.ManifestURLHeader != "" { @@ -183,6 +228,30 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { manifestURLHeader.Set(pieces[0], pieces[1]) } + // initialize driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: s.HostnameOverride, + BindingAddress: s.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + log.Fatalf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + <-exec.InitiallyRegistered() + + // prepare kubelet kcfg := app.KubeletConfig{ Address: s.Address, AllowPrivileged: s.AllowPrivileged, @@ -196,7 +265,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { ContainerRuntime: s.ContainerRuntime, CPUCFSQuota: s.CPUCFSQuota, DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + DockerClient: dockerClient, DockerDaemonContainer: s.DockerDaemonContainer, DockerExecHandler: dockerExecHandler, EnableDebuggingHandlers: s.EnableDebuggingHandlers, @@ -248,9 +317,8 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { kcfg.NodeName = kcfg.Hostname kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, hks, clientConfig) + return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished) }) - err = app.RunKubelet(&kcfg) if err != nil { return err @@ -282,8 +350,10 @@ func defaultBindingAddress() string { func (ks *KubeletExecutorServer) createAndInitKubelet( kc *app.KubeletConfig, - hks hyperkube.Interface, clientConfig *client.Config, + staticPodsConfigPath string, + execUpdates <-chan kubetypes.PodUpdate, + kubeletDone chan<- struct{}, ) (app.KubeletBootstrap, *kconfig.PodConfig, error) { // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop @@ -304,8 +374,24 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( MaxContainers: kc.MaxContainerCount, } + // create main pod source pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder) updates := pc.Channel(MESOS_CFG_SOURCE) + executorDone := make(chan struct{}) + go func() { + // execUpdates will be closed by the executor on shutdown + defer close(executorDone) + + for u := range execUpdates { + u.Source = MESOS_CFG_SOURCE + updates <- u + } + }() + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := pc.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) klet, err := kubelet.NewMainKubelet( kc.Hostname, @@ -363,97 +449,33 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( return nil, nil, err } - // create static pods directory - staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") - err = os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return nil, nil, err - } - - //TODO(jdef) either configure Watch here with something useful, or else - // get rid of it from executor.Config - kubeletFinished := make(chan struct{}) - exec := executor.New(executor.Config{ - Updates: updates, - SourceName: MESOS_CFG_SOURCE, - APIClient: kc.KubeClient, - Docker: kc.DockerClient, - SuicideTimeout: ks.SuicideTimeout, - LaunchGracePeriod: ks.LaunchGracePeriod, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - status, err := klet.GetRuntime().GetPodStatus(pod) - if err != nil { - return nil, err - } - - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := klet.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil - }, - StaticPodsConfigPath: staticPodsConfigPath, - PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), - }) - - // initialize driver and initialize the executor with it - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: ks.HostnameOverride, - BindingAddress: ks.Address, - } - driver, err := bindings.NewMesosExecutorDriver(dconfig) - if err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } - log.V(2).Infof("Initialize executor driver...") - exec.Init(driver) - - // start the driver - go func() { - if _, err := driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - - <- exec.InitialRegComplete() + ks.kletLock.Lock() + ks.klet = klet + ks.kletLock.Unlock() k := &kubeletExecutor{ - Kubelet: klet, - address: ks.Address, - dockerClient: kc.DockerClient, - hks: hks, - kubeletFinished: kubeletFinished, - executorDone: exec.Done(), - clientConfig: clientConfig, + Kubelet: ks.klet, + address: ks.Address, + dockerClient: kc.DockerClient, + kubeletDone: kubeletDone, + clientConfig: clientConfig, + executorDone: executorDone, } k.BirthCry() k.StartGarbageCollection() - // create static-pods directory file source - log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - return k, pc, nil } // kubelet decorator type kubeletExecutor struct { *kubelet.Kubelet - address net.IP - dockerClient dockertools.DockerInterface - hks hyperkube.Interface - kubeletFinished chan struct{} // closed once kubelet.Run() returns - executorDone <-chan struct{} // from KubeletExecutor.Done() - clientConfig *client.Config + address net.IP + dockerClient dockertools.DockerInterface + kubeletDone chan<- struct{} // closed once kubelet.Run() returns + executorDone <-chan struct{} // closed when executor terminates + clientConfig *client.Config } func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { @@ -463,21 +485,21 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. // never returns. -func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { +func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { defer func() { - close(kl.kubeletFinished) + close(kl.kubeletDone) util.HandleCrash() log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity // important: never return! this is in our contract select {} }() - // push updates through a closable pipe. when the executor indicates shutdown - // via Done() we want to stop the Kubelet from processing updates. - pipe := make(chan kubetypes.PodUpdate) + // push merged updates into another, closable update channel which is closed + // when the executor shuts down. + closableUpdates := make(chan kubetypes.PodUpdate) go func() { - // closing pipe will cause our patched kubelet's syncLoop() to exit - defer close(pipe) + // closing closableUpdates will cause our patched kubelet's syncLoop() to exit + defer close(closableUpdates) pipeLoop: for { select { @@ -485,9 +507,9 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { break pipeLoop default: select { - case u := <-updates: + case u := <-mergedUpdates: select { - case pipe <- u: // noop + case closableUpdates <- u: // noop case <-kl.executorDone: break pipeLoop } @@ -498,12 +520,12 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { } }() - // we expect that Run() will complete after the pipe is closed and the + // we expect that Run() will complete after closableUpdates is closed and the // kubelet's syncLoop() has finished processing its backlog, which hopefully // will not take very long. Peeking into the future (current k8s master) it // seems that the backlog has grown from 1 to 50 -- this may negatively impact // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone) + util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) //TODO(jdef) revisit this if/when executor failover lands // Force kubelet to delete all pods.