Rename KubernetesMesosExecutor -> Executor

This commit is contained in:
Dr. Stefan Schimanski 2015-11-04 11:43:48 +01:00
parent 68179524ba
commit ec582e736e
2 changed files with 31 additions and 31 deletions

View File

@ -99,7 +99,7 @@ type NodeInfo struct {
// KubernetesExecutor is an mesos executor that runs pods
// in a minion machine.
type KubernetesMesosExecutor struct {
type Executor struct {
updateChan chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown
state stateType
tasks map[string]*kuberTask
@ -136,13 +136,13 @@ type Config struct {
NodeInfos chan<- NodeInfo
}
func (k *KubernetesMesosExecutor) isConnected() bool {
func (k *Executor) isConnected() bool {
return connectedState == (&k.state).get()
}
// New creates a new kubernetes executor.
func New(config Config) *KubernetesMesosExecutor {
k := &KubernetesMesosExecutor{
func New(config Config) *Executor {
k := &Executor{
updateChan: config.Updates,
state: disconnectedState,
tasks: make(map[string]*kuberTask),
@ -187,7 +187,7 @@ func New(config Config) *KubernetesMesosExecutor {
return k
}
func (k *KubernetesMesosExecutor) Init(driver bindings.ExecutorDriver) {
func (k *Executor) Init(driver bindings.ExecutorDriver) {
k.killKubeletContainers()
k.resetSuicideWatch(driver)
@ -196,7 +196,7 @@ func (k *KubernetesMesosExecutor) Init(driver bindings.ExecutorDriver) {
//TODO(jdef) monitor kubeletFinished and shutdown if it happens
}
func (k *KubernetesMesosExecutor) isDone() bool {
func (k *Executor) isDone() bool {
select {
case <-k.terminate:
return true
@ -206,7 +206,7 @@ func (k *KubernetesMesosExecutor) isDone() bool {
}
// sendPodUpdate assumes that caller is holding state lock; returns true when update is sent otherwise false
func (k *KubernetesMesosExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool {
func (k *Executor) sendPodUpdate(u *kubetypes.PodUpdate) bool {
if k.isDone() {
return false
}
@ -215,7 +215,7 @@ func (k *KubernetesMesosExecutor) sendPodUpdate(u *kubetypes.PodUpdate) bool {
}
// Registered is called when the executor is successfully registered with the slave.
func (k *KubernetesMesosExecutor) Registered(driver bindings.ExecutorDriver,
func (k *Executor) Registered(driver bindings.ExecutorDriver,
executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) {
if k.isDone() {
return
@ -252,7 +252,7 @@ func (k *KubernetesMesosExecutor) Registered(driver bindings.ExecutorDriver,
// Reregistered is called when the executor is successfully re-registered with the slave.
// This can happen when the slave fails over.
func (k *KubernetesMesosExecutor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos.SlaveInfo) {
func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos.SlaveInfo) {
if k.isDone() {
return
}
@ -280,7 +280,7 @@ func (k *KubernetesMesosExecutor) Reregistered(driver bindings.ExecutorDriver, s
}
// initializeStaticPodsSource unzips the data slice into the static-pods directory
func (k *KubernetesMesosExecutor) initializeStaticPodsSource(data []byte) {
func (k *Executor) initializeStaticPodsSource(data []byte) {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := archive.UnzipDir(data, k.staticPodsConfigPath)
if err != nil {
@ -290,7 +290,7 @@ func (k *KubernetesMesosExecutor) initializeStaticPodsSource(data []byte) {
}
// Disconnected is called when the executor is disconnected from the slave.
func (k *KubernetesMesosExecutor) Disconnected(driver bindings.ExecutorDriver) {
func (k *Executor) Disconnected(driver bindings.ExecutorDriver) {
if k.isDone() {
return
}
@ -306,7 +306,7 @@ func (k *KubernetesMesosExecutor) Disconnected(driver bindings.ExecutorDriver) {
// is running, but the binding is not recorded in the Kubernetes store yet.
// This function is invoked to tell the executor to record the binding in the
// Kubernetes store and start the pod via the Kubelet.
func (k *KubernetesMesosExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.TaskInfo) {
func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.TaskInfo) {
if k.isDone() {
return
}
@ -356,7 +356,7 @@ func (k *KubernetesMesosExecutor) LaunchTask(driver bindings.ExecutorDriver, tas
go k.launchTask(driver, taskId, pod)
}
func (k *KubernetesMesosExecutor) handleChangedApiserverPod(pod *api.Pod) {
func (k *Executor) handleChangedApiserverPod(pod *api.Pod) {
// exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already
taskId := pod.Annotations[meta.TaskIdKey]
if taskId == "" {
@ -402,7 +402,7 @@ func (k *KubernetesMesosExecutor) handleChangedApiserverPod(pod *api.Pod) {
// a timer that, upon expiration, causes this executor to commit suicide.
// this implementation runs asynchronously. callers that wish to wait for the
// reset to complete may wait for the returned signal chan to close.
func (k *KubernetesMesosExecutor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan struct{} {
func (k *Executor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
@ -432,7 +432,7 @@ func (k *KubernetesMesosExecutor) resetSuicideWatch(driver bindings.ExecutorDriv
return ch
}
func (k *KubernetesMesosExecutor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan struct{}) {
func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan struct{}) {
k.lock.Lock()
defer k.lock.Unlock()
@ -464,7 +464,7 @@ func (k *KubernetesMesosExecutor) attemptSuicide(driver bindings.ExecutorDriver,
}
// async continuation of LaunchTask
func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) {
func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) {
deleteTask := func() {
k.lock.Lock()
defer k.lock.Unlock()
@ -588,7 +588,7 @@ func (k *KubernetesMesosExecutor) launchTask(driver bindings.ExecutorDriver, tas
go k._launchTask(driver, taskId, podFullName, psf)
}
func (k *KubernetesMesosExecutor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) {
func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) {
expired := make(chan struct{})
@ -669,7 +669,7 @@ reportLost:
k.reportLostTask(driver, taskId, messages.LaunchTaskFailed)
}
func (k *KubernetesMesosExecutor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) {
func (k *Executor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) {
// TODO(nnielsen): Monitor health of pod and report if lost.
// Should we also allow this to fail a couple of times before reporting lost?
// What if the docker daemon is restarting and we can't connect, but it's
@ -692,7 +692,7 @@ func (k *KubernetesMesosExecutor) __launchTask(driver bindings.ExecutorDriver, t
// whether the pod is running. It will only return false if the task is still registered and the pod is
// registered in Docker. Otherwise it returns true. If there's still a task record on file, but no pod
// in Docker, then we'll also send a TASK_LOST event.
func (k *KubernetesMesosExecutor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool {
func (k *Executor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool {
// TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks)
k.lock.Lock()
defer k.lock.Unlock()
@ -716,7 +716,7 @@ func (k *KubernetesMesosExecutor) checkForLostPodTask(driver bindings.ExecutorDr
}
// KillTask is called when the executor receives a request to kill a task.
func (k *KubernetesMesosExecutor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) {
func (k *Executor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) {
if k.isDone() {
return
}
@ -735,14 +735,14 @@ func (k *KubernetesMesosExecutor) KillTask(driver bindings.ExecutorDriver, taskI
// Reports a lost task to the slave and updates internal task and pod tracking state.
// Assumes that the caller is locking around pod and task state.
func (k *KubernetesMesosExecutor) reportLostTask(driver bindings.ExecutorDriver, tid, reason string) {
func (k *Executor) reportLostTask(driver bindings.ExecutorDriver, tid, reason string) {
k.removePodTask(driver, tid, reason, mesos.TaskState_TASK_LOST)
}
// deletes the pod and task associated with the task identified by tid and sends a task
// status update to mesos. also attempts to reset the suicide watch.
// Assumes that the caller is locking around pod and task state.
func (k *KubernetesMesosExecutor) removePodTask(driver bindings.ExecutorDriver, tid, reason string, state mesos.TaskState) {
func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason string, state mesos.TaskState) {
task, ok := k.tasks[tid]
if !ok {
log.V(1).Infof("Failed to remove task, unknown task %v\n", tid)
@ -770,7 +770,7 @@ func (k *KubernetesMesosExecutor) removePodTask(driver bindings.ExecutorDriver,
}
// FrameworkMessage is called when the framework sends some message to the executor
func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDriver, message string) {
func (k *Executor) FrameworkMessage(driver bindings.ExecutorDriver, message string) {
if k.isDone() {
return
}
@ -798,14 +798,14 @@ func (k *KubernetesMesosExecutor) FrameworkMessage(driver bindings.ExecutorDrive
}
// Shutdown is called when the executor receives a shutdown request.
func (k *KubernetesMesosExecutor) Shutdown(driver bindings.ExecutorDriver) {
func (k *Executor) Shutdown(driver bindings.ExecutorDriver) {
k.lock.Lock()
defer k.lock.Unlock()
k.doShutdown(driver)
}
// assumes that caller has obtained state lock
func (k *KubernetesMesosExecutor) doShutdown(driver bindings.ExecutorDriver) {
func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {
defer func() {
log.Errorf("exiting with unclean shutdown: %v", recover())
if k.exitFunc != nil {
@ -859,7 +859,7 @@ func (k *KubernetesMesosExecutor) doShutdown(driver bindings.ExecutorDriver) {
}
// Destroy existing k8s containers
func (k *KubernetesMesosExecutor) killKubeletContainers() {
func (k *Executor) killKubeletContainers() {
if containers, err := dockertools.GetKubeletDockerContainers(k.dockerClient, true); err == nil {
opts := docker.RemoveContainerOptions{
RemoveVolumes: true,
@ -878,7 +878,7 @@ func (k *KubernetesMesosExecutor) killKubeletContainers() {
}
// Error is called when some error happens.
func (k *KubernetesMesosExecutor) Error(driver bindings.ExecutorDriver, message string) {
func (k *Executor) Error(driver bindings.ExecutorDriver, message string) {
log.Errorln(message)
}
@ -890,7 +890,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes
}
}
func (k *KubernetesMesosExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) {
func (k *Executor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) {
select {
case <-k.terminate:
default:
@ -898,7 +898,7 @@ func (k *KubernetesMesosExecutor) sendStatus(driver bindings.ExecutorDriver, sta
}
}
func (k *KubernetesMesosExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) {
func (k *Executor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) {
select {
case <-k.terminate:
default:
@ -906,7 +906,7 @@ func (k *KubernetesMesosExecutor) sendFrameworkMessage(driver bindings.ExecutorD
}
}
func (k *KubernetesMesosExecutor) sendLoop() {
func (k *Executor) sendLoop() {
defer log.V(1).Info("sender loop exiting")
for {
select {

View File

@ -66,7 +66,7 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
return args.Get(0).(mesosproto.Status), args.Error(1)
}
func NewTestKubernetesExecutor() (*KubernetesMesosExecutor, chan kubetypes.PodUpdate) {
func NewTestKubernetesExecutor() (*Executor, chan kubetypes.PodUpdate) {
updates := make(chan kubetypes.PodUpdate, 1024)
return New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),