diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 83340dc07ba..f063ffdc54c 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -99,7 +99,7 @@ type NodeInfo struct { // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. -type KubernetesMesosExecutor struct { +type Executor struct { updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown state stateType tasks map[string]*kuberTask @@ -136,13 +136,13 @@ type Config struct { NodeInfos chan<- NodeInfo } -func (k *KubernetesMesosExecutor) isConnected() bool { +func (k *Executor) isConnected() bool { return connectedState == (&k.state).get() } // New creates a new kubernetes executor. -func New(config Config) *KubernetesMesosExecutor { - k := &KubernetesMesosExecutor{ +func New(config Config) *Executor { + k := &Executor{ updateChan: config.Updates, state: disconnectedState, tasks: make(map[string]*kuberTask), @@ -187,7 +187,7 @@ func New(config Config) *KubernetesMesosExecutor { return k } -func (k *KubernetesMesosExecutor) Init(driver bindings.ExecutorDriver) { +func (k *Executor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) @@ -196,7 +196,7 @@ func (k *KubernetesMesosExecutor) Init(driver bindings.ExecutorDriver) { //TODO(jdef) monitor kubeletFinished and shutdown if it happens } -func (k *KubernetesMesosExecutor) isDone() bool { +func (k *Executor) isDone() bool { select { case <-k.terminate: return true @@ -206,7 +206,7 @@ func (k *KubernetesMesosExecutor) isDone() bool { } // sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false -func (k *KubernetesMesosExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool { +func (k *Executor) sendPodUpdate(u *kubetypes.PodUpdate) bool { if k.isDone() { return false } @@ -215,7 +215,7 @@ func (k *KubernetesMesosExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool { } // Registered is called when the executor is successfully registered with the slave. -func (k *KubernetesMesosExecutor) Registered(driver bindings.ExecutorDriver, +func (k *Executor) Registered(driver bindings.ExecutorDriver, executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) { if k.isDone() { return @@ -252,7 +252,7 @@ func (k *KubernetesMesosExecutor) Registered(driver bindings.ExecutorDriver, // Reregistered is called when the executor is successfully re-registered with the slave. // This can happen when the slave fails over. -func (k *KubernetesMesosExecutor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos.SlaveInfo) { +func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos.SlaveInfo) { if k.isDone() { return } @@ -280,7 +280,7 @@ func (k *KubernetesMesosExecutor) Reregistered(driver bindings.ExecutorDriver, s } // initializeStaticPodsSource unzips the data slice into the static-pods directory -func (k *KubernetesMesosExecutor) initializeStaticPodsSource(data []byte) { +func (k *Executor) initializeStaticPodsSource(data []byte) { log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) err := archive.UnzipDir(data, k.staticPodsConfigPath) if err != nil { @@ -290,7 +290,7 @@ func (k *KubernetesMesosExecutor) initializeStaticPodsSource(data []byte) { } // Disconnected is called when the executor is disconnected from the slave. -func (k *KubernetesMesosExecutor) Disconnected(driver bindings.ExecutorDriver) { +func (k *Executor) Disconnected(driver bindings.ExecutorDriver) { if k.isDone() { return } @@ -306,7 +306,7 @@ func (k *KubernetesMesosExecutor) Disconnected(driver bindings.ExecutorDriver) { // is running, but the binding is not recorded in the Kubernetes store yet. // This function is invoked to tell the executor to record the binding in the // Kubernetes store and start the pod via the Kubelet. -func (k *KubernetesMesosExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.TaskInfo) { +func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.TaskInfo) { if k.isDone() { return } @@ -356,7 +356,7 @@ func (k *KubernetesMesosExecutor) LaunchTask(driver bindings.ExecutorDriver, tas go k.launchTask(driver, taskId, pod) } -func (k *KubernetesMesosExecutor) handleChangedApiserverPod(pod *api.Pod) { +func (k *Executor) handleChangedApiserverPod(pod *api.Pod) { // exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already taskId := pod.Annotations[meta.TaskIdKey] if taskId == "" { @@ -402,7 +402,7 @@ func (k *KubernetesMesosExecutor) handleChangedApiserverPod(pod *api.Pod) { // a timer that, upon expiration, causes this executor to commit suicide. // this implementation runs asynchronously. callers that wish to wait for the // reset to complete may wait for the returned signal chan to close. -func (k *KubernetesMesosExecutor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan struct{} { +func (k *Executor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan struct{} { ch := make(chan struct{}) go func() { defer close(ch) @@ -432,7 +432,7 @@ func (k *KubernetesMesosExecutor) resetSuicideWatch(driver bindings.ExecutorDriv return ch } -func (k *KubernetesMesosExecutor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan struct{}) { +func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan struct{}) { k.lock.Lock() defer k.lock.Unlock() @@ -464,7 +464,7 @@ func (k *KubernetesMesosExecutor) attemptSuicide(driver bindings.ExecutorDriver, } // async continuation of LaunchTask -func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) { +func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) { deleteTask := func() { k.lock.Lock() defer k.lock.Unlock() @@ -588,7 +588,7 @@ func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, tas go k._launchTask(driver, taskId, podFullName, psf) } -func (k *KubernetesMesosExecutor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) { +func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) { expired := make(chan struct{}) @@ -669,7 +669,7 @@ reportLost: k.reportLostTask(driver, taskId, messages.LaunchTaskFailed) } -func (k *KubernetesMesosExecutor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) { +func (k *Executor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) { // TODO(nnielsen): Monitor health of pod and report if lost. // Should we also allow this to fail a couple of times before reporting lost? // What if the docker daemon is restarting and we can't connect, but it's @@ -692,7 +692,7 @@ func (k *KubernetesMesosExecutor) __launchTask(driver bindings.ExecutorDriver, t // whether the pod is running. It will only return false if the task is still registered and the pod is // registered in Docker. Otherwise it returns true. If there's still a task record on file, but no pod // in Docker, then we'll also send a TASK_LOST event. -func (k *KubernetesMesosExecutor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool { +func (k *Executor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool { // TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks) k.lock.Lock() defer k.lock.Unlock() @@ -716,7 +716,7 @@ func (k *KubernetesMesosExecutor) checkForLostPodTask(driver bindings.ExecutorDr } // KillTask is called when the executor receives a request to kill a task. -func (k *KubernetesMesosExecutor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) { +func (k *Executor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) { if k.isDone() { return } @@ -735,14 +735,14 @@ func (k *KubernetesMesosExecutor) KillTask(driver bindings.ExecutorDriver, taskI // Reports a lost task to the slave and updates internal task and pod tracking state. // Assumes that the caller is locking around pod and task state. -func (k *KubernetesMesosExecutor) reportLostTask(driver bindings.ExecutorDriver, tid, reason string) { +func (k *Executor) reportLostTask(driver bindings.ExecutorDriver, tid, reason string) { k.removePodTask(driver, tid, reason, mesos.TaskState_TASK_LOST) } // deletes the pod and task associated with the task identified by tid and sends a task // status update to mesos. also attempts to reset the suicide watch. // Assumes that the caller is locking around pod and task state. -func (k *KubernetesMesosExecutor) removePodTask(driver bindings.ExecutorDriver, tid, reason string, state mesos.TaskState) { +func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason string, state mesos.TaskState) { task, ok := k.tasks[tid] if !ok { log.V(1).Infof("Failed to remove task, unknown task %v\n", tid) @@ -770,7 +770,7 @@ func (k *KubernetesMesosExecutor) removePodTask(driver bindings.ExecutorDriver, } // FrameworkMessage is called when the framework sends some message to the executor -func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDriver, message string) { +func (k *Executor) FrameworkMessage(driver bindings.ExecutorDriver, message string) { if k.isDone() { return } @@ -798,14 +798,14 @@ func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDrive } // Shutdown is called when the executor receives a shutdown request. -func (k *KubernetesMesosExecutor) Shutdown(driver bindings.ExecutorDriver) { +func (k *Executor) Shutdown(driver bindings.ExecutorDriver) { k.lock.Lock() defer k.lock.Unlock() k.doShutdown(driver) } // assumes that caller has obtained state lock -func (k *KubernetesMesosExecutor) doShutdown(driver bindings.ExecutorDriver) { +func (k *Executor) doShutdown(driver bindings.ExecutorDriver) { defer func() { log.Errorf("exiting with unclean shutdown: %v", recover()) if k.exitFunc != nil { @@ -859,7 +859,7 @@ func (k *KubernetesMesosExecutor) doShutdown(driver bindings.ExecutorDriver) { } // Destroy existing k8s containers -func (k *KubernetesMesosExecutor) killKubeletContainers() { +func (k *Executor) killKubeletContainers() { if containers, err := dockertools.GetKubeletDockerContainers(k.dockerClient, true); err == nil { opts := docker.RemoveContainerOptions{ RemoveVolumes: true, @@ -878,7 +878,7 @@ func (k *KubernetesMesosExecutor) killKubeletContainers() { } // Error is called when some error happens. -func (k *KubernetesMesosExecutor) Error(driver bindings.ExecutorDriver, message string) { +func (k *Executor) Error(driver bindings.ExecutorDriver, message string) { log.Errorln(message) } @@ -890,7 +890,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes } } -func (k *KubernetesMesosExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) { +func (k *Executor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) { select { case <-k.terminate: default: @@ -898,7 +898,7 @@ func (k *KubernetesMesosExecutor) sendStatus(driver bindings.ExecutorDriver, sta } } -func (k *KubernetesMesosExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) { +func (k *Executor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) { select { case <-k.terminate: default: @@ -906,7 +906,7 @@ func (k *KubernetesMesosExecutor) sendFrameworkMessage(driver bindings.ExecutorD } } -func (k *KubernetesMesosExecutor) sendLoop() { +func (k *Executor) sendLoop() { defer log.V(1).Info("sender loop exiting") for { select { diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go index 9532629fcb5..a547bded681 100644 --- a/contrib/mesos/pkg/executor/mock_test.go +++ b/contrib/mesos/pkg/executor/mock_test.go @@ -66,7 +66,7 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status return args.Get(0).(mesosproto.Status), args.Error(1) } -func NewTestKubernetesExecutor() (*KubernetesMesosExecutor, chan kubetypes.PodUpdate) { +func NewTestKubernetesExecutor() (*Executor, chan kubetypes.PodUpdate) { updates := make(chan kubetypes.PodUpdate, 1024) return New(Config{ Docker: dockertools.ConnectToDockerOrDie("fake://"),