diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 7b6fd5bdd03..14ba8e3433e 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -19,7 +19,6 @@ package executor import ( "encoding/json" "fmt" - "net" "strings" "sync" "sync/atomic" @@ -40,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -89,57 +87,45 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool { type kuberTask struct { mesosTaskInfo *mesos.TaskInfo - podName string + podName string // empty until pod is sent to kubelet and registered in KubernetesExecutor.pods } type podStatusFunc func() (*api.PodStatus, error) -// KubeletInterface consists of the kubelet.Kubelet API's that we actually use -type KubeletInterface interface { - GetHostIP() (net.IP, error) -} - // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { - kl KubeletInterface // the kubelet instance. - updateChan chan<- interface{} // to send pod config updates to the kubelet + updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown state stateType tasks map[string]*kuberTask pods map[string]*api.Pod - lock sync.RWMutex - sourcename string + lock sync.Mutex client *client.Client - done chan struct{} // signals shutdown + terminate chan struct{} // signals that the executor should shut down outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver dockerClient dockertools.DockerInterface suicideWatch suicideWatcher suicideTimeout time.Duration shutdownAlert func() // invoked just prior to executor shutdown kubeletFinished <-chan struct{} // signals that kubelet Run() died - initialRegistration sync.Once exitFunc func(int) - podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) - staticPodsConfig []byte + podStatusFunc func(*api.Pod) (*api.PodStatus, error) staticPodsConfigPath string - initialRegComplete chan struct{} podController *framework.Controller launchGracePeriod time.Duration } type Config struct { - Kubelet KubeletInterface - Updates chan<- interface{} // to send pod config updates to the kubelet - SourceName string + Updates chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet APIClient *client.Client Docker dockertools.DockerInterface ShutdownAlert func() SuicideTimeout time.Duration KubeletFinished <-chan struct{} // signals that kubelet Run() died ExitFunc func(int) - PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) + PodStatusFunc func(*api.Pod) (*api.PodStatus, error) StaticPodsConfigPath string - PodLW cache.ListerWatcher + PodLW cache.ListerWatcher // mandatory, otherwise initialization will panic LaunchGracePeriod time.Duration } @@ -150,14 +136,12 @@ func (k *KubernetesExecutor) isConnected() bool { // New creates a new kubernetes executor.
func New(config Config) *KubernetesExecutor { k := &KubernetesExecutor{ - kl: config.Kubelet, updateChan: config.Updates, state: disconnectedState, tasks: make(map[string]*kuberTask), pods: make(map[string]*api.Pod), - sourcename: config.SourceName, client: config.APIClient, - done: make(chan struct{}), + terminate: make(chan struct{}), outgoing: make(chan func() (mesos.Status, error), 1024), dockerClient: config.Docker, suicideTimeout: config.SuicideTimeout, @@ -166,12 +150,15 @@ func New(config Config) *KubernetesExecutor { shutdownAlert: config.ShutdownAlert, exitFunc: config.ExitFunc, podStatusFunc: config.PodStatusFunc, - initialRegComplete: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, } // watch pods from the given pod ListWatch + if config.PodLW == nil { + // fail early to make debugging easier + panic("cannot create executor with nil PodLW") + } _, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*api.Pod) @@ -196,24 +183,29 @@ func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) - go k.podController.Run(k.done) + go k.podController.Run(k.terminate) go k.sendLoop() //TODO(jdef) monitor kubeletFinished and shutdown if it happens } -func (k *KubernetesExecutor) Done() <-chan struct{} { - return k.done -} - func (k *KubernetesExecutor) isDone() bool { select { - case <-k.done: + case <-k.terminate: return true default: return false } } +// sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false +func (k *KubernetesExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool { + if k.isDone() { + return false + } + k.updateChan <- *u + return true +} + // Registered is called when the executor is successfully registered with the slave. func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) { @@ -227,7 +219,7 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } if executorInfo != nil && executorInfo.Data != nil { - k.staticPodsConfig = executorInfo.Data + k.initializeStaticPodsSource(executorInfo.Data) } if slaveInfo != nil { @@ -237,7 +229,13 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, } } - k.initialRegistration.Do(k.onInitialRegistration) + // emit an empty update to allow the mesos "source" to be marked as seen + k.lock.Lock() + defer k.lock.Unlock() + k.sendPodUpdate(&kubetypes.PodUpdate{ + Pods: []*api.Pod{}, + Op: kubetypes.SET, + }) } // Reregistered is called when the executor is successfully re-registered with the slave. @@ -257,39 +255,16 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI log.Errorf("cannot update node labels: %v", err) } } - - k.initialRegistration.Do(k.onInitialRegistration) } -func (k *KubernetesExecutor) onInitialRegistration() { - defer close(k.initialRegComplete) - - // emit an empty update to allow the mesos "source" to be marked as seen - k.updateChan <- kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: k.sourcename, - } -} - -// InitializeStaticPodsSource blocks until initial regstration is complete and -// then creates a static pod source using the given factory func. 
-func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) { - <-k.initialRegComplete - - if k.staticPodsConfig == nil { - return - } - +// initializeStaticPodsSource unzips the data slice into the static-pods directory +func (k *KubernetesExecutor) initializeStaticPodsSource(data []byte) { log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) - err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath) + err := archive.UnzipDir(data, k.staticPodsConfigPath) if err != nil { log.Errorf("Failed to extract static pod config: %v", err) return } - - log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath) - sourceFactory() } // Disconnected is called when the executor is disconnected from the slave. @@ -393,11 +368,10 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) { oldPod.DeletionTimestamp = pod.DeletionTimestamp oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds - update := kubetypes.PodUpdate{ + k.sendPodUpdate(&kubetypes.PodUpdate{ Op: kubetypes.UPDATE, Pods: []*api.Pod{oldPod}, - } - k.updateChan <- update + }) } } } @@ -550,7 +524,7 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s k.lock.Lock() defer k.lock.Unlock() - // Add the task. + // find task task, found := k.tasks[taskId] if !found { log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId) @@ -560,18 +534,23 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s //TODO(jdef) check for duplicate pod name, if found send TASK_ERROR - // from here on, we need to delete containers associated with the task - // upon it going into a terminal state + // send the new pod to the kubelet which will spin it up + ok := k.sendPodUpdate(&kubetypes.PodUpdate{ + Op: kubetypes.ADD, + Pods: []*api.Pod{pod}, + }) + if !ok { + return // executor is terminating, cancel launch + } + + // mark task as sent by setting the podName and register the sent pod task.podName = podFullName k.pods[podFullName] = pod - // send the new pod to the kubelet which will spin it up - update := kubetypes.PodUpdate{ - Op: kubetypes.ADD, - Pods: []*api.Pod{pod}, - } - k.updateChan <- update + // From here on, we need to delete containers associated with the task upon + // it going into a terminal state. + // report task is starting to scheduler statusUpdate := &mesos.TaskStatus{ TaskId: mutil.NewTaskID(taskId), State: mesos.TaskState_TASK_STARTING.Enum(), @@ -582,20 +561,8 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s // Delay reporting 'task running' until container is up. 
psf := podStatusFunc(func() (*api.PodStatus, error) { - status, err := k.podStatusFunc(k.kl, pod) - if err != nil { - return nil, err - } - status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) - hostIP, err := k.kl.GetHostIP() - if err != nil { - log.Errorf("Cannot get host IP: %v", err) - } else { - status.HostIP = hostIP.String() - } - return status, nil + return k.podStatusFunc(pod) }) - go k._launchTask(driver, taskId, podFullName, psf) } @@ -771,11 +738,10 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid, delete(k.pods, pid) // tell the kubelet to remove the pod - update := kubetypes.PodUpdate{ + k.sendPodUpdate(&kubetypes.PodUpdate{ Op: kubetypes.REMOVE, Pods: []*api.Pod{pod}, - } - k.updateChan <- update + }) } // TODO(jdef): ensure that the update propagates, perhaps return a signal chan? k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason)) @@ -828,7 +794,8 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) { (&k.state).transitionTo(terminalState) // signal to all listeners that this KubeletExecutor is done! - close(k.done) + close(k.terminate) + close(k.updateChan) if k.shutdownAlert != nil { func() { @@ -902,7 +869,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) } } @@ -910,7 +877,7 @@ func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status * func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) { select { - case <-k.done: + case <-k.terminate: default: k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) } } @@ -920,12 +887,12 @@ func (k *KubernetesExecutor) sendLoop() { defer log.V(1).Info("sender loop exiting") for { select { - case <-k.done: + case <-k.terminate: return default: if !k.isConnected() { select { - case <-k.done: + case <-k.terminate: case <-time.After(1 * time.Second): } continue @@ -945,7 +912,7 @@ func (k *KubernetesExecutor) sendLoop() { } // attempt to re-queue the sender select { - case <-k.done: + case <-k.terminate: case k.outgoing <- sender: } } diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 1e127977bf2..029e64076eb 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" "github.com/mesos/mesos-go/mesosproto" @@ -57,31 +58,22 @@ import ( // after Register is called. 
func TestExecutorRegister(t *testing.T) { mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) - executor := New(Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: updates, - SourceName: "executor_test", - }) + executor, updates := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) initialPodUpdate := kubetypes.PodUpdate{ - Pods: []*api.Pod{}, - Op: kubetypes.SET, - Source: executor.sourcename, + Pods: []*api.Pod{}, + Op: kubetypes.SET, } receivedInitialPodUpdate := false select { - case m := <-updates: - update, ok := m.(kubetypes.PodUpdate) - if ok { - if reflect.DeepEqual(initialPodUpdate, update) { - receivedInitialPodUpdate = true - } + case update := <-updates: + if reflect.DeepEqual(initialPodUpdate, update) { + receivedInitialPodUpdate = true } - case <-time.After(time.Second): + case <-time.After(util.ForeverTestTimeout): } assert.Equal(t, true, receivedInitialPodUpdate, "executor should have sent an initial PodUpdate "+ @@ -95,7 +87,7 @@ func TestExecutorRegister(t *testing.T) { // connected after a call to Disconnected has occurred. func TestExecutorDisconnect(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) @@ -110,7 +102,7 @@ func TestExecutorDisconnect(t *testing.T) { // after a connection problem happens, followed by a call to Reregistered. func TestExecutorReregister(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) @@ -141,7 +133,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { defer testApiServer.server.Close() mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), Updates: updates, @@ -149,11 +141,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &fakeKubelet{ - Kubelet: &kubelet.Kubelet{}, - hostIP: net.IPv4(127, 0, 0, 1), - }, - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -163,9 +151,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, + HostIP: "127.0.0.1", }, nil }, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) @@ -174,7 +164,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { select { case <-updates: - case <-time.After(time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("Executor should send an initial update on Registration") } @@ -204,7 +194,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { executor.LaunchTask(mockDriver, taskInfo) - assertext.EventuallyTrue(t, 5*time.Second, func() bool { + assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return len(executor.tasks) == 1 && len(executor.pods) == 1 @@ -212,12 +202,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { gotPodUpdate := false select { - case m := <-updates: - update, ok := 
m.(kubetypes.PodUpdate) - if ok && len(update.Pods) == 1 { + case update := <-updates: + if len(update.Pods) == 1 { gotPodUpdate = true } - case <-time.After(time.Second): + case <-time.After(util.ForeverTestTimeout): } assert.Equal(t, true, gotPodUpdate, "the executor should send an update about a new pod to "+ @@ -227,7 +216,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { finished := kmruntime.After(statusUpdateCalls.Wait) select { case <-finished: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for status update calls to finish") } @@ -239,7 +228,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { executor.KillTask(mockDriver, taskInfo.TaskId) - assertext.EventuallyTrue(t, 5*time.Second, func() bool { + assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return len(executor.tasks) == 0 && len(executor.pods) == 0 @@ -249,7 +238,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { finished = kmruntime.After(statusUpdateCalls.Wait) select { case <-finished: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for status update calls to finish") } mockDriver.AssertExpectations(t) @@ -307,16 +296,14 @@ func TestExecutorStaticPods(t *testing.T) { defer os.RemoveAll(staticPodsConfigPath) mockDriver := &MockExecutorDriver{} - updates := make(chan interface{}, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init + Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &kubelet.Kubelet{}, - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -330,12 +317,14 @@ func TestExecutorStaticPods(t *testing.T) { }, nil }, StaticPodsConfigPath: staticPodsConfigPath, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) + + // register static pod source hostname := "h1" - go executor.InitializeStaticPodsSource(func() { - kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates) - }) + fileSourceUpdates := make(chan interface{}, 1024) + kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, fileSourceUpdates) // create ExecutorInfo with static pod zip in data field executorInfo := mesosutil.NewExecutorInfo( @@ -350,14 +339,14 @@ func TestExecutorStaticPods(t *testing.T) { // wait for static pod to start seenPods := map[string]struct{}{} - timeout := time.After(time.Second) + timeout := time.After(util.ForeverTestTimeout) defer mockDriver.AssertExpectations(t) for { // filter by PodUpdate type select { case <-timeout: t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods)) - case update, ok := <-updates: + case update, ok := <-fileSourceUpdates: if !ok { return } @@ -391,16 +380,12 @@ func TestExecutorFrameworkMessage(t *testing.T) { kubeletFinished := make(chan struct{}) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: make(chan kubetypes.PodUpdate, 1024), APIClient: 
client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), }), - Kubelet: &fakeKubelet{ - Kubelet: &kubelet.Kubelet{}, - hostIP: net.IPv4(127, 0, 0, 1), - }, - PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { return &api.PodStatus{ ContainerStatuses: []api.ContainerStatus{ { @@ -410,13 +395,15 @@ func TestExecutorFrameworkMessage(t *testing.T) { }, }, }, - Phase: api.PodRunning, + Phase: api.PodRunning, + HostIP: "127.0.0.1", }, nil }, ShutdownAlert: func() { close(kubeletFinished) }, KubeletFinished: kubeletFinished, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) @@ -452,7 +439,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { // when removing the task from k.tasks through the "task-lost:foo" message below. select { case <-called: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for SendStatusUpdate for the running task") } @@ -464,7 +451,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { ).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once() executor.FrameworkMessage(mockDriver, "task-lost:foo") - assertext.EventuallyTrue(t, 5*time.Second, func() bool { + assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return len(executor.tasks) == 0 && len(executor.pods) == 0 @@ -472,7 +459,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { select { case <-called: - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("timed out waiting for SendStatusUpdate") } @@ -569,9 +556,10 @@ func TestExecutorShutdown(t *testing.T) { mockDriver := &MockExecutorDriver{} kubeletFinished := make(chan struct{}) var exitCalled int32 = 0 + updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), + Updates: updates, ShutdownAlert: func() { close(kubeletFinished) }, @@ -579,6 +567,7 @@ func TestExecutorShutdown(t *testing.T) { ExitFunc: func(_ int) { atomic.AddInt32(&exitCalled, 1) }, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, } executor := New(config) @@ -594,11 +583,21 @@ func TestExecutorShutdown(t *testing.T) { assert.Equal(t, true, executor.isDone(), "executor should be in Done state after Shutdown") - select { - case <-executor.Done(): - default: - t.Fatal("done channel should be closed after shutdown") + // channel should be closed now, only a constant number of updates left + num := len(updates) +drainLoop: + for { + select { + case _, ok := <-updates: + if !ok { + break drainLoop + } + num -= 1 + default: + t.Fatal("Updates chan should be closed after Shutdown") + } } + assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown") assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0, "the executor should call its ExitFunc when it is ready to close down") @@ -608,7 +607,7 @@ func TestExecutorShutdown(t *testing.T) { func TestExecutorsendFrameworkMessage(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) executor.Registered(mockDriver, nil, nil, nil) @@ -623,7 +622,7 @@ func TestExecutorsendFrameworkMessage(t *testing.T) { // guard against data race in mock driver between 
AssertExpectations and Called select { case <-called: // expected - case <-time.After(5 * time.Second): + case <-time.After(util.ForeverTestTimeout): t.Fatalf("expected call to SendFrameworkMessage") } mockDriver.AssertExpectations(t) diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index be4951eab58..1a8b35ac6a2 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -22,7 +22,9 @@ import ( "github.com/mesos/mesos-go/mesosproto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/kubelet/dockertools" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) type MockExecutorDriver struct { @@ -64,16 +66,18 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status return args.Get(0).(mesosproto.Status), args.Error(1) } -func NewTestKubernetesExecutor() *KubernetesExecutor { +func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) { + updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan interface{}, 1024), - }) + Updates: updates, + PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, + }), updates } func TestExecutorNew(t *testing.T) { mockDriver := &MockExecutorDriver{} - executor := NewTestKubernetesExecutor() + executor, _ := NewTestKubernetesExecutor() executor.Init(mockDriver) assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization") diff --git a/contrib/mesos/pkg/executor/service/kubelet.go b/contrib/mesos/pkg/executor/service/kubelet.go new file mode 100644 index 00000000000..191f0e00e73 --- /dev/null +++ b/contrib/mesos/pkg/executor/service/kubelet.go @@ -0,0 +1,83 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + log "github.com/golang/glog" + "k8s.io/kubernetes/pkg/kubelet" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/util" +) + +// executorKubelet decorates the kubelet with a Run function that notifies the +// executor by closing kubeletDone before entering blocking state. +type executorKubelet struct { + *kubelet.Kubelet + kubeletDone chan<- struct{} // closed once kubelet.Run() returns + executorDone <-chan struct{} // closed when executor terminates +} + +// Run runs the main kubelet loop, closing the kubeletFinished chan when the +// loop exits. Like the upstream Run, it will never return. +func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) { + defer func() { + // When this Run function is called, we close it here. + // Otherwise, KubeletExecutorServer.runKubelet will. + close(kl.kubeletDone) + util.HandleCrash() + log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity + // important: never return! 
this is in our contract + select {} + }() + + // push merged updates into another, closable update channel which is closed + // when the executor shuts down. + closableUpdates := make(chan kubetypes.PodUpdate) + go func() { + // closing closableUpdates will cause our patched kubelet's syncLoop() to exit + defer close(closableUpdates) + pipeLoop: + for { + select { + case <-kl.executorDone: + break pipeLoop + default: + select { + case u := <-mergedUpdates: + select { + case closableUpdates <- u: // noop + case <-kl.executorDone: + break pipeLoop + } + case <-kl.executorDone: + break pipeLoop + } + } + } + }() + + // we expect that Run() will complete after closableUpdates is closed and the + // kubelet's syncLoop() has finished processing its backlog, which hopefully + // will not take very long. Peeking into the future (current k8s master) it + // seems that the backlog has grown from 1 to 50 -- this may negatively impact + // us going forward, time will tell. + util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) + + //TODO(jdef) revisit this if/when executor failover lands + // Force kubelet to delete all pods. + kl.HandlePodDeletions(kl.GetPods()) +} diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 27200494e4d..a6454938927 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -18,13 +18,9 @@ package service import ( "fmt" - "math/rand" "net" - "net/http" "os" "path/filepath" - "strconv" - "strings" "sync" "time" @@ -38,19 +34,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" - "k8s.io/kubernetes/pkg/kubelet/cadvisor" kconfig "k8s.io/kubernetes/pkg/kubelet/config" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util" - utilio "k8s.io/kubernetes/pkg/util/io" - "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/util/oom" ) const ( @@ -63,6 +51,9 @@ type KubeletExecutorServer struct { *app.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration + + kletLock sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor + klet *kubelet.Kubelet } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -87,19 +78,152 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.") } -// Run runs the specified KubeletExecutorServer. 
-func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { - rand.Seed(time.Now().UTC().UnixNano()) +func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, + staticPodsConfigPath string, apiclient *client.Client) error { + exec := executor.New(executor.Config{ + Updates: execUpdates, + APIClient: apiclient, + Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { + s.kletLock.Lock() + defer s.kletLock.Unlock() - oomAdjuster := oom.NewOOMAdjuster() - if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { - log.Info(err) + if s.klet == nil { + return nil, fmt.Errorf("PodStatusFunc called before kubelet is initialized") + } + + status, err := s.klet.GetRuntime().GetPodStatus(pod) + if err != nil { + return nil, err + } + + status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses) + hostIP, err := s.klet.GetHostIP() + if err != nil { + log.Errorf("Cannot get host IP: %v", err) + } else { + status.HostIP = hostIP.String() + } + return status, nil + }, + StaticPodsConfigPath: staticPodsConfigPath, + PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, + fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), + ), + }) + + // create the driver and initialize the executor with it + dconfig := bindings.DriverConfig{ + Executor: exec, + HostnameOverride: s.HostnameOverride, + BindingAddress: s.Address, + } + driver, err := bindings.NewMesosExecutorDriver(dconfig) + if err != nil { + return fmt.Errorf("failed to create executor driver: %v", err) + } + log.V(2).Infof("Initialize executor driver...") + exec.Init(driver) + + // start the driver + go func() { + if _, err := driver.Run(); err != nil { + log.Fatalf("executor driver failed: %v", err) + } + log.Info("executor Run completed") + }() + + return nil +} + +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{}, + staticPodsConfigPath string, apiclient *client.Client) error { + kcfg, err := s.UnsecuredKubeletConfig() + if err == nil { + // apply Mesos specific settings + executorDone := make(chan struct{}) + kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { + k, pc, err := app.CreateAndInitKubelet(kc) + if err != nil { + return k, pc, err + } + + klet := k.(*kubelet.Kubelet) + + s.kletLock.Lock() + s.klet = klet + s.kletLock.Unlock() + + // decorate kubelet such that it shuts down when the executor terminates + decorated := &executorKubelet{ + Kubelet: klet, + kubeletDone: kubeletDone, + executorDone: executorDone, + } + + return decorated, pc, nil + } + kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup + kcfg.Hostname = kcfg.HostnameOverride + kcfg.KubeClient = apiclient + kcfg.NodeName = kcfg.HostnameOverride + kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source + kcfg.StandaloneMode = false + kcfg.SystemContainer = "" // don't take control over other system processes. + if kcfg.Cloud != nil { + // fail early and hard because having the cloud provider loaded would go unnoticed, + // but break bigger clusters because accessing the state.json from every slave kills the master.
+ panic("cloud provider must not be set") + } + + // create main pod source + updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE) + go func() { + // execUpdates will be closed by the executor on shutdown + defer close(executorDone) + + for u := range execUpdates { + u.Source = MESOS_CFG_SOURCE + updates <- u + } + }() + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) + + // run the kubelet, until execUpdates is closed + // NOTE: because kcfg != nil holds, the upstream Run function will not + // initialize the cloud provider. We explicitly wouldn't want + // that because then every kubelet instance would query the master + // state.json which does not scale. + err = s.KubeletServer.Run(kcfg) } - // empty string for the docker and system containers (= cgroup paths). This - // stops the kubelet taking any control over other system processes. - s.SystemContainer = "" - s.DockerDaemonContainer = "" + if err != nil { + // close the channel here. When Run returns without error, the executorKubelet is + // responsible to do this. If it returns with an error, we are responsible here. + close(kubeletDone) + } + return err +} + +// Run runs the specified KubeletExecutorServer. +func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { + // create shared channels + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0750) + if err != nil { + return err + } // create apiserver client var apiclient *client.Client @@ -108,167 +232,18 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { apiclient, err = client.New(clientConfig) } if err != nil { - // required for k8sm since we need to send api.Binding information - // back to the apiserver - log.Fatalf("No API client: %v", err) + // required for k8sm since we need to send api.Binding information back to the apiserver + return fmt.Errorf("cannot create API client: %v", err) } - log.Infof("Using root directory: %v", s.RootDirectory) - credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) - - cAdvisorInterface, err := cadvisor.New(s.CAdvisorPort) + // start executor + err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) if err != nil { return err } - imageGCPolicy := kubelet.ImageGCPolicy{ - HighThresholdPercent: s.ImageGCHighThresholdPercent, - LowThresholdPercent: s.ImageGCLowThresholdPercent, - } - - diskSpacePolicy := kubelet.DiskSpacePolicy{ - DockerFreeDiskMB: s.LowDiskSpaceThresholdMB, - RootFreeDiskMB: s.LowDiskSpaceThresholdMB, - } - - //TODO(jdef) intentionally NOT initializing a cloud provider here since: - //(a) the kubelet doesn't actually use it - //(b) we don't need to create N-kubelet connections to zookeeper for no good reason - //cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) - //log.Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) - - hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ",")) - if err != nil { - return err - } - - hostPIDSources, err := 
kubetypes.GetValidatedSources(strings.Split(s.HostPIDSources, ",")) - if err != nil { - return err - } - - hostIPCSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostIPCSources, ",")) - if err != nil { - return err - } - - tlsOptions, err := s.InitializeTLS() - if err != nil { - return err - } - mounter := mount.New() - if s.Containerized { - log.V(2).Info("Running kubelet in containerized mode (experimental)") - mounter = &mount.NsenterMounter{} - } - - var writer utilio.Writer = &utilio.StdWriter{} - var dockerExecHandler dockertools.ExecHandler - switch s.DockerExecHandlerName { - case "native": - dockerExecHandler = &dockertools.NativeExecHandler{} - case "nsenter": - writer = &utilio.NsenterWriter{} - dockerExecHandler = &dockertools.NsenterExecHandler{} - default: - log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) - dockerExecHandler = &dockertools.NativeExecHandler{} - } - - manifestURLHeader := make(http.Header) - if s.ManifestURLHeader != "" { - pieces := strings.Split(s.ManifestURLHeader, ":") - if len(pieces) != 2 { - return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) - } - manifestURLHeader.Set(pieces[0], pieces[1]) - } - - kcfg := app.KubeletConfig{ - Address: s.Address, - AllowPrivileged: s.AllowPrivileged, - CAdvisorInterface: cAdvisorInterface, - CgroupRoot: s.CgroupRoot, - Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl - ClusterDNS: s.ClusterDNS, - ClusterDomain: s.ClusterDomain, - // ConfigFile: "" - ConfigureCBR0: s.ConfigureCBR0, - ContainerRuntime: s.ContainerRuntime, - CPUCFSQuota: s.CPUCFSQuota, - DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), - DockerDaemonContainer: s.DockerDaemonContainer, - DockerExecHandler: dockerExecHandler, - EnableDebuggingHandlers: s.EnableDebuggingHandlers, - EnableServer: s.EnableServer, - EventBurst: s.EventBurst, - EventRecordQPS: s.EventRecordQPS, - FileCheckFrequency: s.FileCheckFrequency, - HostnameOverride: s.HostnameOverride, - HostNetworkSources: hostNetworkSources, - HostPIDSources: hostPIDSources, - HostIPCSources: hostIPCSources, - // HTTPCheckFrequency - ImageGCPolicy: imageGCPolicy, - KubeClient: apiclient, - // ManifestURL: "" - ManifestURLHeader: manifestURLHeader, - MasterServiceNamespace: s.MasterServiceNamespace, - MaxContainerCount: s.MaxContainerCount, - MaxOpenFiles: s.MaxOpenFiles, - MaxPerPodContainerCount: s.MaxPerPodContainerCount, - MaxPods: s.MaxPods, - MinimumGCAge: s.MinimumGCAge, - Mounter: mounter, - NetworkPluginName: s.NetworkPluginName, - NetworkPlugins: app.ProbeNetworkPlugins(s.NetworkPluginDir), - NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency, - OOMAdjuster: oomAdjuster, - OSInterface: kubecontainer.RealOS{}, - PodCIDR: s.PodCIDR, - PodInfraContainerImage: s.PodInfraContainerImage, - Port: s.Port, - ReadOnlyPort: s.ReadOnlyPort, - RegisterNode: s.RegisterNode, - RegistryBurst: s.RegistryBurst, - RegistryPullQPS: s.RegistryPullQPS, - ResolverConfig: s.ResolverConfig, - ResourceContainer: s.ResourceContainer, - RootDirectory: s.RootDirectory, - Runonce: s.RunOnce, - // StandaloneMode: false - StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout, - SyncFrequency: s.SyncFrequency, - SystemContainer: s.SystemContainer, - TLSOptions: tlsOptions, - VolumePlugins: app.ProbeVolumePlugins(), - Writer: 
writer, - } - - kcfg.NodeName = kcfg.Hostname - - kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, hks, clientConfig) - }) - - err = app.RunKubelet(&kcfg) - if err != nil { - return err - } - - if s.HealthzPort > 0 { - healthz.DefaultHealthz() - go util.Until(func() { - err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) - if err != nil { - log.Errorf("Starting health server failed: %v", err) - } - }, 5*time.Second, util.NeverStop) - } - - // block until executor is shut down or commits shutdown - select {} + // start kubelet, blocking + return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) } func defaultBindingAddress() string { @@ -279,222 +254,3 @@ func defaultBindingAddress() string { return libProcessIP } } - -func (ks *KubeletExecutorServer) createAndInitKubelet( - kc *app.KubeletConfig, - hks hyperkube.Interface, - clientConfig *client.Config, -) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - - // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop - // up into "per source" synchronizations - // TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods - // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing - // a nil pointer to it when what we really want is a nil interface. - var kubeClient client.Interface - if kc.KubeClient == nil { - kubeClient = nil - } else { - kubeClient = kc.KubeClient - } - - gcPolicy := kubecontainer.ContainerGCPolicy{ - MinAge: kc.MinimumGCAge, - MaxPerPodContainer: kc.MaxPerPodContainerCount, - MaxContainers: kc.MaxContainerCount, - } - - pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder) - updates := pc.Channel(MESOS_CFG_SOURCE) - - klet, err := kubelet.NewMainKubelet( - kc.Hostname, - kc.NodeName, - kc.DockerClient, - kubeClient, - kc.RootDirectory, - kc.PodInfraContainerImage, - kc.SyncFrequency, - float32(kc.RegistryPullQPS), - kc.RegistryBurst, - kc.EventRecordQPS, - kc.EventBurst, - gcPolicy, - pc.SeenAllSources, - kc.RegisterNode, - kc.RegisterSchedulable, - kc.StandaloneMode, - kc.ClusterDomain, - net.IP(kc.ClusterDNS), - kc.MasterServiceNamespace, - kc.VolumePlugins, - kc.NetworkPlugins, - kc.NetworkPluginName, - kc.StreamingConnectionIdleTimeout, - kc.Recorder, - kc.CAdvisorInterface, - kc.ImageGCPolicy, - kc.DiskSpacePolicy, - kc.Cloud, - kc.NodeStatusUpdateFrequency, - kc.ResourceContainer, - kc.OSInterface, - kc.CgroupRoot, - kc.ContainerRuntime, - kc.RktPath, - kc.RktStage1Image, - kc.Mounter, - kc.Writer, - kc.DockerDaemonContainer, - kc.SystemContainer, - kc.ConfigureCBR0, - kc.PodCIDR, - kc.ReconcileCIDR, - kc.MaxPods, - kc.DockerExecHandler, - kc.ResolverConfig, - kc.CPUCFSQuota, - &api.NodeDaemonEndpoints{ - KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)}, - }, - kc.OOMAdjuster, - ) - if err != nil { - return nil, nil, err - } - - //TODO(jdef) either configure Watch here with something useful, or else - // get rid of it from executor.Config - kubeletFinished := make(chan struct{}) - staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods") - exec := executor.New(executor.Config{ - Kubelet: klet, - Updates: updates, - SourceName: MESOS_CFG_SOURCE, - APIClient: kc.KubeClient, - Docker: kc.DockerClient, - SuicideTimeout: 
ks.SuicideTimeout, - LaunchGracePeriod: ks.LaunchGracePeriod, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) { - return klet.GetRuntime().GetPodStatus(pod) - }, - StaticPodsConfigPath: staticPodsConfigPath, - PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), - }) - - go exec.InitializeStaticPodsSource(func() { - // Create file source only when we are called back. Otherwise, it is never marked unseen. - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - }) - - k := &kubeletExecutor{ - Kubelet: klet, - address: ks.Address, - dockerClient: kc.DockerClient, - hks: hks, - kubeletFinished: kubeletFinished, - executorDone: exec.Done(), - clientConfig: clientConfig, - } - - dconfig := bindings.DriverConfig{ - Executor: exec, - HostnameOverride: ks.HostnameOverride, - BindingAddress: ks.Address, - } - if driver, err := bindings.NewMesosExecutorDriver(dconfig); err != nil { - log.Fatalf("failed to create executor driver: %v", err) - } else { - k.driver = driver - } - - log.V(2).Infof("Initialize executor driver...") - - k.BirthCry() - exec.Init(k.driver) - - k.StartGarbageCollection() - - return k, pc, nil -} - -// kubelet decorator -type kubeletExecutor struct { - *kubelet.Kubelet - initialize sync.Once - driver bindings.ExecutorDriver - address net.IP - dockerClient dockertools.DockerInterface - hks hyperkube.Interface - kubeletFinished chan struct{} // closed once kubelet.Run() returns - executorDone <-chan struct{} // from KubeletExecutor.Done() - clientConfig *client.Config -} - -func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { - // this func could be called many times, depending how often the HTTP server crashes, - // so only execute certain initialization procs once - kl.initialize.Do(func() { - go func() { - if _, err := kl.driver.Run(); err != nil { - log.Fatalf("executor driver failed: %v", err) - } - log.Info("executor Run completed") - }() - }) - log.Infof("Starting kubelet server...") - kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers) -} - -// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. -// never returns. -func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) { - defer func() { - close(kl.kubeletFinished) - util.HandleCrash() - log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity - // important: never return! this is in our contract - select {} - }() - - // push updates through a closable pipe. when the executor indicates shutdown - // via Done() we want to stop the Kubelet from processing updates. 
- pipe := make(chan kubetypes.PodUpdate) - go func() { - // closing pipe will cause our patched kubelet's syncLoop() to exit - defer close(pipe) - pipeLoop: - for { - select { - case <-kl.executorDone: - break pipeLoop - default: - select { - case u := <-updates: - select { - case pipe <- u: // noop - case <-kl.executorDone: - break pipeLoop - } - case <-kl.executorDone: - break pipeLoop - } - } - } - }() - - // we expect that Run() will complete after the pipe is closed and the - // kubelet's syncLoop() has finished processing its backlog, which hopefully - // will not take very long. Peeking into the future (current k8s master) it - // seems that the backlog has grown from 1 to 50 -- this may negatively impact - // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone) - - //TODO(jdef) revisit this if/when executor failover lands - // Force kubelet to delete all pods. - kl.HandlePodDeletions(kl.GetPods()) -} diff --git a/contrib/mesos/pkg/executor/suicide_test.go b/contrib/mesos/pkg/executor/suicide_test.go index 706ad2876e2..5426433f671 100644 --- a/contrib/mesos/pkg/executor/suicide_test.go +++ b/contrib/mesos/pkg/executor/suicide_test.go @@ -67,7 +67,7 @@ func (t *suicideTracker) makeJumper(_ jumper) jumper { func TestSuicide_zeroTimeout(t *testing.T) { defer glog.Flush() - k := New(Config{}) + k, _ := NewTestKubernetesExecutor() tracker := &suicideTracker{suicideWatcher: k.suicideWatch} k.suicideWatch = tracker @@ -92,9 +92,8 @@ func TestSuicide_zeroTimeout(t *testing.T) { func TestSuicide_WithTasks(t *testing.T) { defer glog.Flush() - k := New(Config{ - SuicideTimeout: 50 * time.Millisecond, - }) + k, _ := NewTestKubernetesExecutor() + k.suicideTimeout = 50 * time.Millisecond jumps := uint32(0) tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}