diff --git a/contrib/mesos/pkg/executor/apis.go b/contrib/mesos/pkg/executor/apis.go
new file mode 100644
index 00000000000..b0672c8b2fc
--- /dev/null
+++ b/contrib/mesos/pkg/executor/apis.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package executor
+
+import (
+    "k8s.io/kubernetes/contrib/mesos/pkg/node"
+    "k8s.io/kubernetes/pkg/api"
+    client "k8s.io/kubernetes/pkg/client/unversioned"
+)
+
+type kubeAPI interface {
+    killPod(ns, name string) error
+}
+
+type nodeAPI interface {
+    createOrUpdate(hostname string, slaveAttrLabels, annotations map[string]string) (*api.Node, error)
+}
+
+// clientAPIWrapper implements kubeAPI and nodeAPI, which serve to isolate external dependencies
+// such that they're easier to mock in unit tests.
+type clientAPIWrapper struct {
+    client *client.Client
+}
+
+func (cw *clientAPIWrapper) killPod(ns, name string) error {
+    return cw.client.Pods(ns).Delete(name, api.NewDeleteOptions(0))
+}
+
+func (cw *clientAPIWrapper) createOrUpdate(hostname string, slaveAttrLabels, annotations map[string]string) (*api.Node, error) {
+    return node.CreateOrUpdate(cw.client, hostname, slaveAttrLabels, annotations)
+}
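// [reviewer note, not part of the diff] The kubeAPI and nodeAPI interfaces above exist
// purely as mocking seams. As a rough sketch of the idea (the real tests in this PR use
// a testify-based mockKubeAPI; this hand-rolled fake and its names are hypothetical):
//
//	type fakeKubeAPI struct {
//	    killed []string // records "ns/name" for each killPod call
//	    err    error    // optional injected failure
//	}
//
//	func (f *fakeKubeAPI) killPod(ns, name string) error {
//	    if f.err != nil {
//	        return f.err
//	    }
//	    f.killed = append(f.killed, ns+"/"+name)
//	    return nil
//	}
//
// A test could assign such a fake to Executor.kubeAPI and assert on f.killed afterwards,
// without needing a live apiserver.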
diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go
index 9d74efaea07..5270a4ee29a 100644
--- a/contrib/mesos/pkg/executor/executor.go
+++ b/contrib/mesos/pkg/executor/executor.go
@@ -39,11 +39,9 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "k8s.io/kubernetes/pkg/api"
-    "k8s.io/kubernetes/pkg/client/cache"
     client "k8s.io/kubernetes/pkg/client/unversioned"
     "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
-    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/kubernetes/pkg/util"
 )

@@ -88,24 +86,12 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {
     }
 }

-type kuberTask struct {
-    mesosTaskInfo *mesos.TaskInfo
-    launchTimer   *time.Timer // launchTimer expires when the launch-task process duration exceeds launchGracePeriod
-    podName       string      // empty until pod is sent to kubelet and registed in KubernetesExecutor.pods
-}
-
-type podStatusFunc func() (*api.PodStatus, error)
-
 // KubernetesExecutor is an mesos executor that runs pods
 // in a minion machine.
 type Executor struct {
-    updateChan           chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown
     state                stateType
-    tasks                map[string]*kuberTask
-    pods                 map[string]*api.Pod
     lock                 sync.Mutex
-    client               *client.Client
-    terminate            chan struct{}                     // signals that the executor should shutdown
+    terminate            chan struct{}                     // signals that the executor is shutting down
     outgoing             chan func() (mesos.Status, error) // outgoing queue to the mesos driver
     dockerClient         dockertools.DockerInterface
     suicideWatch         suicideWatcher
@@ -113,26 +99,27 @@ type Executor struct {
     shutdownAlert        func()          // invoked just prior to executor shutdown
     kubeletFinished      <-chan struct{} // signals that kubelet Run() died
     exitFunc             func(int)
-    podStatusFunc        func(*api.Pod) (*api.PodStatus, error)
     staticPodsConfigPath string
     launchGracePeriod    time.Duration
     nodeInfos            chan<- NodeInfo
     initCompleted        chan struct{} // closes upon completion of Init()
+    registry             Registry
+    watcher              *watcher
+    kubeAPI              kubeAPI
+    nodeAPI              nodeAPI
 }

 type Config struct {
-    Updates              chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet
     APIClient            *client.Client
     Docker               dockertools.DockerInterface
     ShutdownAlert        func()
     SuicideTimeout       time.Duration
     KubeletFinished      <-chan struct{} // signals that kubelet Run() died
     ExitFunc             func(int)
-    PodStatusFunc        func(*api.Pod) (*api.PodStatus, error)
     StaticPodsConfigPath string
-    PodLW                cache.ListerWatcher // mandatory, otherwise initialiation will panic
     LaunchGracePeriod    time.Duration
     NodeInfos            chan<- NodeInfo
+    Registry             Registry
 }

 func (k *Executor) isConnected() bool {
@@ -149,11 +136,7 @@ func New(config Config) *Executor {
         launchGracePeriod = time.Duration(math.MaxInt64)
     }
     k := &Executor{
-        updateChan:           config.Updates,
         state:                disconnectedState,
-        tasks:                make(map[string]*kuberTask),
-        pods:                 make(map[string]*api.Pod),
-        client:               config.APIClient,
         terminate:            make(chan struct{}),
         outgoing:             make(chan func() (mesos.Status, error), 1024),
         dockerClient:         config.Docker,
@@ -162,24 +145,52 @@ func New(config Config) *Executor {
         suicideWatch:         &suicideTimer{},
         shutdownAlert:        config.ShutdownAlert,
         exitFunc:             config.ExitFunc,
-        podStatusFunc:        config.PodStatusFunc,
         staticPodsConfigPath: config.StaticPodsConfigPath,
         launchGracePeriod:    launchGracePeriod,
         nodeInfos:            config.NodeInfos,
        initCompleted:        make(chan struct{}),
+        registry:             config.Registry,
+        kubeAPI:              &clientAPIWrapper{config.APIClient},
+        nodeAPI:              &clientAPIWrapper{config.APIClient},
     }
+
     runtime.On(k.initCompleted, k.runSendLoop)

-    po := newPodObserver(config.PodLW, k.updateTask, k.terminate)
-    runtime.On(k.initCompleted, po.run)
+    k.watcher = newWatcher(k.registry.watch())
+    runtime.On(k.initCompleted, k.watcher.run)

     return k
 }

+// Done returns a chan that closes when the executor is shutting down
+func (k *Executor) Done() <-chan struct{} {
+    return k.terminate
+}
+
 func (k *Executor) Init(driver bindings.ExecutorDriver) {
     defer close(k.initCompleted)
+
     k.killKubeletContainers()
     k.resetSuicideWatch(driver)
+
+    k.watcher.addFilter(func(podEvent *PodEvent) bool {
+        switch podEvent.eventType {
+        case PodEventIncompatibleUpdate:
+            log.Warningf("killing %s because of an incompatible update", podEvent.FormatShort())
+            k.killPodTask(driver, podEvent.taskID)
+            // halt processing of this event; when the pod is deleted we'll receive another
+            // event for that.
+            return false
+
+        case PodEventDeleted:
+            // an active pod-task was deleted, alert mesos:
+            // send back a TASK_KILLED status, we completed the pod-task lifecycle normally.
+            k.resetSuicideWatch(driver)
+            k.sendStatus(driver, newStatus(mutil.NewTaskID(podEvent.taskID), mesos.TaskState_TASK_KILLED, "pod-deleted"))
+        }
+        return true
+    })
+
     //TODO(jdef) monitor kubeletFinished and shutdown if it happens
 }

@@ -192,23 +203,6 @@ func (k *Executor) isDone() bool {
     }
 }

-// sendPodsSnapshot assumes that caller is holding state lock; returns true when update is sent otherwise false
-func (k *Executor) sendPodsSnapshot() bool {
-    if k.isDone() {
-        return false
-    }
-    snapshot := make([]*api.Pod, 0, len(k.pods))
-    for _, v := range k.pods {
-        snapshot = append(snapshot, v)
-    }
-    u := &kubetypes.PodUpdate{
-        Op:   kubetypes.SET,
-        Pods: snapshot,
-    }
-    k.updateChan <- *u
-    return true
-}
-
 // Registered is called when the executor is successfully registered with the slave.
 func (k *Executor) Registered(
     driver bindings.ExecutorDriver,
@@ -245,8 +239,7 @@ func (k *Executor) Registered(
     }

     if slaveInfo != nil {
-        _, err := node.CreateOrUpdate(
-            k.client,
+        _, err := k.nodeAPI.createOrUpdate(
             slaveInfo.GetHostname(),
             node.SlaveAttributesToLabels(slaveInfo.Attributes),
             annotations,
@@ -257,10 +250,8 @@ func (k *Executor) Registered(
         }
     }

-    // emit an empty update to allow the mesos "source" to be marked as seen
     k.lock.Lock()
     defer k.lock.Unlock()
-    k.sendPodsSnapshot()

     if slaveInfo != nil && k.nodeInfos != nil {
         k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics
@@ -279,8 +270,7 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
     }

     if slaveInfo != nil {
-        _, err := node.CreateOrUpdate(
-            k.client,
+        _, err := k.nodeAPI.createOrUpdate(
             slaveInfo.GetHostname(),
             node.SlaveAttributesToLabels(slaveInfo.Attributes),
             nil, // don't change annotations
@@ -335,8 +325,17 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
     if k.isDone() {
         return
     }
+
     log.Infof("Launch task %v\n", taskInfo)
+
+    taskID := taskInfo.GetTaskId().GetValue()
+    if p := k.registry.pod(taskID); p != nil {
+        log.Warningf("task %v already launched", taskID)
+        // Don't send back TASK_RUNNING or TASK_FAILED here: these
+        // may be duplicated messages.
+        return
+    }
+
     if !k.isConnected() {
         log.Errorf("Ignore launch task because the executor is disconnected\n")
         k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED,
@@ -359,27 +358,10 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
         return
     }

-    taskId := taskInfo.GetTaskId().GetValue()
-
-    k.lock.Lock()
-    defer k.lock.Unlock()
-
-    if _, found := k.tasks[taskId]; found {
-        log.Errorf("task already launched\n")
-        // Not to send back TASK_RUNNING here, because
-        // may be duplicated messages or duplicated task id.
-        return
-    }
-    // remember this task so that:
-    // (a) we ignore future launches for it
-    // (b) we have a record of it so that we can kill it if needed
-    // (c) we're leaving podName == "" for now, indicates we don't need to delete containers
-    k.tasks[taskId] = &kuberTask{
-        mesosTaskInfo: taskInfo,
-        launchTimer:   time.NewTimer(k.launchGracePeriod),
-    }
     k.resetSuicideWatch(driver)

-    go k.launchTask(driver, taskId, pod)
+    // run the next step async because it calls out to apiserver and we don't want to block here
+    go k.bindAndWatchTask(driver, taskInfo, time.NewTimer(k.launchGracePeriod), pod)
 }

 // determine whether we need to start a suicide countdown. if so, then start
@@ -398,7 +380,7 @@ func (k *Executor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan stru
     }

     if k.suicideWatch != nil {
-        if len(k.tasks) > 0 {
+        if !k.registry.empty() {
             k.suicideWatch.Stop()
             return
         }
@@ -430,12 +412,8 @@ func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan s
     }

     // fail-safe, will abort kamikaze attempts if there are tasks
-    if len(k.tasks) > 0 {
-        ids := []string{}
-        for taskid := range k.tasks {
-            ids = append(ids, taskid)
-        }
-        log.Errorf("suicide attempt failed, there are still running tasks: %v", ids)
+    if !k.registry.empty() {
+        log.Errorf("suicide attempt failed, there are still running tasks")
         return
     }

@@ -447,308 +425,126 @@ func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan s
     }
 }

-// async continuation of LaunchTask
-func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) {
-    deleteTask := func() {
-        k.lock.Lock()
-        defer k.lock.Unlock()
-        delete(k.tasks, taskId)
-        k.resetSuicideWatch(driver)
-    }
-
-    // TODO(k8s): use Pods interface for binding once clusters are upgraded
-    // return b.Pods(binding.Namespace).Bind(binding)
-    if pod.Spec.NodeName == "" {
-        //HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
-        binding := &api.Binding{
-            ObjectMeta: api.ObjectMeta{
-                Namespace:   pod.Namespace,
-                Name:        pod.Name,
-                Annotations: make(map[string]string),
-            },
-            Target: api.ObjectReference{
-                Kind: "Node",
-                Name: pod.Annotations[meta.BindingHostKey],
-            },
-        }
-
-        // forward the annotations that the scheduler wants to apply
-        for k, v := range pod.Annotations {
-            binding.Annotations[k] = v
-        }
-
-        // create binding on apiserver
-        log.Infof("Binding '%v/%v' to '%v' with annotations %+v...", pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
-        ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
-        err := k.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
-        if err != nil {
-            deleteTask()
-            k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
-                messages.CreateBindingFailure))
-            return
-        }
-    } else {
-        // post annotations update to apiserver
-        patch := struct {
-            Metadata struct {
-                Annotations map[string]string `json:"annotations"`
-            } `json:"metadata"`
-        }{}
-        patch.Metadata.Annotations = pod.Annotations
-        patchJson, _ := json.Marshal(patch)
-        log.V(4).Infof("Patching annotations %v of pod %v/%v: %v", pod.Annotations, pod.Namespace, pod.Name, string(patchJson))
-        err := k.client.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error()
-        if err != nil {
-            log.Errorf("Error updating annotations of ready-to-launch pod %v/%v: %v", pod.Namespace, pod.Name, err)
-            deleteTask()
-            k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
-                messages.AnnotationUpdateFailure))
-            return
-        }
-    }
-
+func podStatusData(pod *api.Pod, status api.PodStatus) ([]byte, string, error) {
     podFullName := container.GetPodFullName(pod)
-
-    // allow a recently failed-over scheduler the chance to recover the task/pod binding:
-    // it may have failed and recovered before the apiserver is able to report the updated
-    // binding information. replays of this status event will signal to the scheduler that
-    // the apiserver should be up-to-date.
     data, err := json.Marshal(api.PodStatusResult{
         ObjectMeta: api.ObjectMeta{
             Name:     podFullName,
             SelfLink: "/podstatusresult",
         },
+        Status: status,
     })
-    if err != nil {
-        deleteTask()
-        log.Errorf("failed to marshal pod status result: %v", err)
-        k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
-            err.Error()))
-        return
-    }
-
-    k.lock.Lock()
-    defer k.lock.Unlock()
-
-    // find task
-    task, found := k.tasks[taskId]
-    if !found {
-        log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId)
-        k.reportLostTask(driver, taskId, messages.LaunchTaskFailed)
-        return
-    }
-
-    //TODO(jdef) check for duplicate pod name, if found send TASK_ERROR
-
-    // send the new pod to the kubelet which will spin it up
-    task.podName = podFullName
-    k.pods[podFullName] = pod
-    ok := k.sendPodsSnapshot()
-    if !ok {
-        task.podName = ""
-        delete(k.pods, podFullName)
-        return // executor is terminating, cancel launch
-    }
-
-    // From here on, we need to delete containers associated with the task upon
-    // it going into a terminal state.
-
-    // report task is starting to scheduler
-    statusUpdate := &mesos.TaskStatus{
-        TaskId:  mutil.NewTaskID(taskId),
-        State:   mesos.TaskState_TASK_STARTING.Enum(),
-        Message: proto.String(messages.CreateBindingSuccess),
-        Data:    data,
-    }
-    k.sendStatus(driver, statusUpdate)
-
-    // Delay reporting 'task running' until container is up.
-    psf := podStatusFunc(func() (*api.PodStatus, error) {
-        return k.podStatusFunc(pod)
-    })
-    go k._launchTask(driver, taskId, podFullName, psf, task.launchTimer.C)
+    return data, podFullName, err
 }

-func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc, expired <-chan time.Time) {
-    getMarshalledInfo := func() (data []byte, cancel bool) {
-        // potentially long call..
-        if podStatus, err := psf(); err == nil && podStatus != nil {
-            select {
-            case <-expired:
-                cancel = true
-            default:
-                k.lock.Lock()
-                defer k.lock.Unlock()
-                if _, found := k.tasks[taskId]; !found {
-                    // don't bother with the pod status if the task is already gone
-                    cancel = true
-                    break
-                } else if podStatus.Phase != api.PodRunning {
-                    // avoid sending back a running status before it's really running
-                    break
-                }
-                log.V(2).Infof("Found pod status: '%v'", podStatus)
-                result := api.PodStatusResult{
-                    ObjectMeta: api.ObjectMeta{
-                        Name:     podFullName,
-                        SelfLink: "/podstatusresult",
-                    },
-                    Status: *podStatus,
-                }
-                if data, err = json.Marshal(result); err != nil {
-                    log.Errorf("failed to marshal pod status result: %v", err)
-                }
-            }
+// async continuation of LaunchTask
+func (k *Executor) bindAndWatchTask(driver bindings.ExecutorDriver, task *mesos.TaskInfo, launchTimer *time.Timer, pod *api.Pod) {
+    success := false
+    defer func() {
+        if !success {
+            k.killPodTask(driver, task.TaskId.GetValue())
+            k.resetSuicideWatch(driver)
         }
-        return
-    }
-
-waitForRunningPod:
-    for {
-        select {
-        case <-expired:
-            log.Warningf("Launch expired grace period of '%v'", k.launchGracePeriod)
-            break waitForRunningPod
-        case <-time.After(containerPollTime):
-            if data, cancel := getMarshalledInfo(); cancel {
-                break waitForRunningPod
-            } else if data == nil {
-                continue waitForRunningPod
-            } else {
-                k.lock.Lock()
-                defer k.lock.Unlock()
-                task, found := k.tasks[taskId]
-                if !found {
-                    goto reportLost
-                }
-
-                statusUpdate := &mesos.TaskStatus{
-                    TaskId:  mutil.NewTaskID(taskId),
-                    State:   mesos.TaskState_TASK_RUNNING.Enum(),
-                    Message: proto.String(fmt.Sprintf("pod-running:%s", podFullName)),
-                    Data:    data,
-                }
-
-                k.sendStatus(driver, statusUpdate)
-                task.launchTimer.Stop()
-
-                // continue to monitor the health of the pod
-                go k.__launchTask(driver, taskId, podFullName, psf)
-                return
-            }
-        }
-    }
-
-    k.lock.Lock()
-    defer k.lock.Unlock()
-reportLost:
-    k.reportLostTask(driver, taskId, messages.KubeletPodLaunchFailed)
-}
-
-func (k *Executor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) {
-    // TODO(nnielsen): Monitor health of pod and report if lost.
-    // Should we also allow this to fail a couple of times before reporting lost?
-    // What if the docker daemon is restarting and we can't connect, but it's
-    // going to bring the pods back online as soon as it restarts?
-    knownPod := func() bool {
-        _, err := psf()
-        return err == nil
-    }
-    // Wait for the pod to go away and stop monitoring once it does
-    // TODO (jdefelice) replace with an /events watch?
-    for {
-        time.Sleep(lostPodPollTime)
-        if k.checkForLostPodTask(driver, taskId, knownPod) {
-            return
-        }
-    }
-}
-
-// Intended to be executed as part of the pod monitoring loop, this fn (ultimately) checks with Docker
-// whether the pod is running. It will only return false if the task is still registered and the pod is
-// registered in Docker. Otherwise it returns true. If there's still a task record on file, but no pod
-// in Docker, then we'll also send a TASK_LOST event.
-func (k *Executor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool {
-    // TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks)
-
-    // isKnownPod() can block so we take special precaution to avoid locking this mutex while calling it
-    knownTask := func() (ok bool) {
-        k.lock.Lock()
-        defer k.lock.Unlock()
-        _, ok = k.tasks[taskId]
-        return
     }()

-    // TODO(jdef) we should really consider k.pods here, along with what docker is reporting, since the
-    // kubelet may constantly attempt to instantiate a pod as long as it's in the pod state that we're
-    // handing to it. otherwise, we're probably reporting a TASK_LOST prematurely. Should probably
-    // consult RestartPolicy to determine appropriate behavior. Should probably also gracefully handle
-    // docker daemon restarts.
-    if knownTask {
-        if isKnownPod() {
-            return false
-        } else {
-            log.Warningf("Detected lost pod, reporting lost task %v", taskId)
-
-            k.lock.Lock()
-            defer k.lock.Unlock()
-            k.reportLostTask(driver, taskId, messages.ContainersDisappeared)
-        }
-    } else {
-        log.V(2).Infof("Task %v no longer registered, stop monitoring for lost pods", taskId)
+    // allow a recently failed-over scheduler the chance to recover the task/pod binding:
+    // it may have failed and recovered before the apiserver is able to report the updated
+    // binding information. replays of this status event will signal to the scheduler that
+    // the apiserver should be up-to-date.
+    startingData, _, err := podStatusData(pod, api.PodStatus{})
+    if err != nil {
+        log.Errorf("failed to generate pod-task starting data for task %v pod %v/%v: %v",
+            task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
+        k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, err.Error()))
+        return
     }
-    return true
+
+    err = k.registry.bind(task.TaskId.GetValue(), pod)
+    if err != nil {
+        log.Errorf("failed to bind task %v pod %v/%v: %v",
+            task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
+        k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, err.Error()))
+        return
+    }
+
+    // send TASK_STARTING
+    k.sendStatus(driver, &mesos.TaskStatus{
+        TaskId:  task.TaskId,
+        State:   mesos.TaskState_TASK_STARTING.Enum(),
+        Message: proto.String(messages.CreateBindingSuccess),
+        Data:    startingData,
+    })
+
+    // within the launch timeout window we should see a pod-task update via the registry.
+    // if we see a Running update then we need to generate a TASK_RUNNING status update for mesos.
+    handlerFinished := false
+    handler := watchHandler{
+        expiration: watchExpiration{
+            timeout: launchTimer.C,
+            onEvent: func(taskID string) {
+                if !handlerFinished {
+                    // launch timeout expired
+                    k.killPodTask(driver, task.TaskId.GetValue())
+                }
+            },
+        },
+        onEvent: func(podEvent *PodEvent) (bool, error) {
+            switch podEvent.eventType {
+            case PodEventUpdated:
+                log.V(2).Infof("Found status: '%v' for %s", podEvent.pod.Status, podEvent.FormatShort())
+
+                if podEvent.pod.Status.Phase != api.PodRunning {
+                    // still waiting for pod to transition to a running state, so
+                    // we're not done monitoring yet; check back later..
+                    break
+                }
+
+                data, podFullName, err := podStatusData(podEvent.pod, podEvent.pod.Status)
+                if err != nil {
+                    return false, fmt.Errorf("failed to marshal pod status result: %v", err)
+                }
+
+                defer k.sendStatus(driver, &mesos.TaskStatus{
+                    TaskId:  task.TaskId,
+                    State:   mesos.TaskState_TASK_RUNNING.Enum(),
+                    Message: proto.String("pod-running:" + podFullName),
+                    Data:    data,
+                })
+                fallthrough
+
+            case PodEventDeleted:
+                // we're done monitoring because the pod has been deleted
+                handlerFinished = true
+                launchTimer.Stop()
+            }
+            return handlerFinished, nil
+        },
+    }
+    k.watcher.forTask(task.TaskId.GetValue(), handler)
+    success = true
 }

 // KillTask is called when the executor receives a request to kill a task.
 func (k *Executor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) {
-    if k.isDone() {
-        return
-    }
-
-    log.Infof("Kill task %v\n", taskId)
-
-    if !k.isConnected() {
-        //TODO(jdefelice) sent TASK_LOST here?
-        log.Warningf("Ignore kill task because the executor is disconnected\n")
-        return
-    }
-
-    k.lock.Lock()
-    defer k.lock.Unlock()
-    k.removePodTask(driver, taskId.GetValue(), messages.TaskKilled, mesos.TaskState_TASK_KILLED)
+    k.killPodTask(driver, taskId.GetValue())
 }

-// Reports a lost task to the slave and updates internal task and pod tracking state.
-// Assumes that the caller is locking around pod and task state.
-func (k *Executor) reportLostTask(driver bindings.ExecutorDriver, tid, reason string) {
-    k.removePodTask(driver, tid, reason, mesos.TaskState_TASK_LOST)
-}
-
-// deletes the pod and task associated with the task identified by tid and sends a task
+// deletes the pod and task associated with the task identified by taskID and sends a task
 // status update to mesos. also attempts to reset the suicide watch.
-// Assumes that the caller is locking around pod and task state.
-func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason string, state mesos.TaskState) {
-    task, ok := k.tasks[tid]
-    if !ok {
-        log.V(1).Infof("Failed to remove task, unknown task %v\n", tid)
+func (k *Executor) killPodTask(driver bindings.ExecutorDriver, taskID string) {
+    pod := k.registry.pod(taskID)
+    if pod == nil {
+        log.V(1).Infof("Failed to remove task, unknown task %v\n", taskID)
+        k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskID}, mesos.TaskState_TASK_LOST, "kill-pod-task"))
         return
     }
-    delete(k.tasks, tid)
-    k.resetSuicideWatch(driver)

-    pid := task.podName
-    _, found := k.pods[pid]
-    if !found {
-        log.Warningf("Cannot remove unknown pod %v for task %v", pid, tid)
-    } else {
-        log.V(2).Infof("deleting pod %v for task %v", pid, tid)
-        delete(k.pods, pid)
-
-        // tell the kubelet to remove the pod
-        k.sendPodsSnapshot()
+    // force-delete the pod from the API server
+    // TODO(jdef) possibly re-use eviction code from stock k8s once it lands?
+    err := k.kubeAPI.killPod(pod.Namespace, pod.Name)
+    if err != nil {
+        log.V(1).Infof("failed to delete task %v pod %v/%v from apiserver: %+v", taskID, pod.Namespace, pod.Name, err)
     }
-    // TODO(jdef): ensure that the update propagates, perhaps return a signal chan?
-    k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason))
 }

 // FrameworkMessage is called when the framework sends some message to the executor
@@ -766,11 +562,15 @@ func (k *Executor) FrameworkMessage(driver bindings.ExecutorDriver, message stri
     if strings.HasPrefix(message, messages.TaskLost+":") {
         taskId := message[len(messages.TaskLost)+1:]
         if taskId != "" {
+            // TODO(jdef) would it make more sense to check the status of the task and
+            // just replay the last non-terminal message that we sent if the task is
+            // still active?
+
             // clean up pod state
-            k.lock.Lock()
-            defer k.lock.Unlock()
-            k.reportLostTask(driver, taskId, messages.TaskLostAck)
+            k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskId}, mesos.TaskState_TASK_LOST, messages.TaskLostAck))
+            k.killPodTask(driver, taskId)
         }
+        return
     }

     switch message {
@@ -799,7 +599,6 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {

     // signal to all listeners that this KubeletExecutor is done!
     close(k.terminate)
-    close(k.updateChan)
     close(k.nodeInfos)

     if k.shutdownAlert != nil {
@@ -819,7 +618,7 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {

     // according to docs, mesos will generate TASK_LOST updates for us
     // if needed, so don't take extra time to do that here.
-    k.tasks = map[string]*kuberTask{}
+    k.registry.shutdown()

     select {
     // the main Run() func may still be running... wait for it to finish: it will
@@ -940,36 +739,3 @@ func annotationsFor(ei *mesos.ExecutorInfo) (annotations map[string]string, err

     return
 }
-
-// updateTask executes some mutating operation for the given task/pod, blocking until the update is either
-// attempted or discarded. uses the executor state lock to synchronize concurrent invocation. returns true
-// only if the specified update operation was attempted and also returns true. a result of true also indicates
-// changes have been sent upstream to the kubelet.
-func (k *Executor) updateTask(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error) {
-    k.lock.Lock()
-    defer k.lock.Unlock()
-
-    // exclude tasks which are already deleted from our task registry
-    task := k.tasks[taskId]
-    if task == nil {
-        // the pod has completed the launch-task-binding phase because it's been annotated with
-        // the task-id, but we don't have a record of it; it's best to let the scheduler reconcile.
-        // it's also possible that our update queue is backed up and hasn't caught up with the state
-        // of the world yet.
-
-        // TODO(jdef) should we hint to the scheduler (via TASK_FAILED, reason=PodRefersToUnknownTask)?
-
-        err = fmt.Errorf("task %s not found", taskId)
-        return
-    }
-
-    oldPod := k.pods[task.podName]
-    changed = f(task, oldPod)
-
-    // TODO(jdef) this abstraction isn't perfect since changes that only impact the task struct,
-    // and not the pod, don't require a new pod snapshot sent back to the kubelet.
-    if changed {
-        k.sendPodsSnapshot()
-    }
-    return
-}
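// [reviewer note, not part of the diff] With updateChan gone, shutdown coordination
// now flows through the new Executor.Done() accessor added above. A minimal
// consumption sketch (hypothetical caller; only Done() itself is from this PR):
//
//	func waitForShutdown(k *executor.Executor) {
//	    select {
//	    case <-k.Done():
//	        // doShutdown() closed the executor's terminate chan
//	    case <-time.After(time.Minute):
//	        // hypothetical timeout policy, not part of this PR
//	    }
//	}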
diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go
index 52e9cafe65a..139042a7a49 100644
--- a/contrib/mesos/pkg/executor/executor_test.go
+++ b/contrib/mesos/pkg/executor/executor_test.go
@@ -19,12 +19,10 @@ package executor
 import (
     "fmt"
     "io/ioutil"
-    "net"
     "net/http"
     "net/http/httptest"
     "os"
     "path/filepath"
-    "reflect"
     "sync"
     "sync/atomic"
     "testing"
@@ -39,10 +37,7 @@ import (
     "k8s.io/kubernetes/pkg/api/testapi"
     "k8s.io/kubernetes/pkg/api/unversioned"
     "k8s.io/kubernetes/pkg/client/cache"
-    client "k8s.io/kubernetes/pkg/client/unversioned"
-    "k8s.io/kubernetes/pkg/kubelet"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
-    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util"
     "k8s.io/kubernetes/pkg/watch"
@@ -56,27 +51,11 @@ import (
 // after Register is called.
 func TestExecutorRegister(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
-    executor, updates := NewTestKubernetesExecutor()
+    executor := NewTestKubernetesExecutor()

     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)

-    initialPodUpdate := kubetypes.PodUpdate{
-        Pods: []*api.Pod{},
-        Op:   kubetypes.SET,
-    }
-    receivedInitialPodUpdate := false
-    select {
-    case update := <-updates:
-        if reflect.DeepEqual(initialPodUpdate, update) {
-            receivedInitialPodUpdate = true
-        }
-    case <-time.After(util.ForeverTestTimeout):
-    }
-    assert.Equal(t, true, receivedInitialPodUpdate,
-        "executor should have sent an initial PodUpdate "+
-            "to the updates chan upon registration")
-
     assert.Equal(t, true, executor.isConnected(), "executor should be connected")
     mockDriver.AssertExpectations(t)
 }
@@ -85,7 +64,7 @@ func TestExecutorRegister(t *testing.T) {
 // connected after a call to Disconnected has occurred.
 func TestExecutorDisconnect(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
-    executor, _ := NewTestKubernetesExecutor()
+    executor := NewTestKubernetesExecutor()

     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)
@@ -100,7 +79,7 @@ func TestExecutorDisconnect(t *testing.T) {
 // after a connection problem happens, followed by a call to Reregistered.
 func TestExecutorReregister(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
-    executor, _ := NewTestKubernetesExecutor()
+    executor := NewTestKubernetesExecutor()

     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)
@@ -111,65 +90,109 @@ func TestExecutorReregister(t *testing.T) {
     mockDriver.AssertExpectations(t)
 }

-type fakeKubelet struct {
-    *kubelet.Kubelet
-    hostIP net.IP
+type fakeRegistry struct {
+    sync.Mutex
+    boundTasks map[string]*api.Pod
+    updates    chan *PodEvent
 }

-func (kl *fakeKubelet) GetHostIP() (net.IP, error) {
-    return kl.hostIP, nil
+func newFakeRegistry() *fakeRegistry {
+    return &fakeRegistry{boundTasks: map[string]*api.Pod{}, updates: make(chan *PodEvent, 100)}
 }

-// TestExecutorLaunchAndKillTask ensures that the executor is able to launch
-// and kill tasks while properly bookkeping its tasks.
-func TestExecutorLaunchAndKillTask(t *testing.T) {
-    // create a fake pod watch. We use that below to submit new pods to the scheduler
-    podListWatch := NewMockPodsListWatch(api.PodList{})
+func (r *fakeRegistry) empty() bool {
+    r.Lock()
+    defer r.Unlock()
+    return len(r.boundTasks) == 0
+}

-    // create fake apiserver
-    testApiServer := NewTestServer(t, api.NamespaceDefault, &podListWatch.list)
-    // TODO: Uncomment when fix #19254
-    // defer testApiServer.server.Close()
+func (r *fakeRegistry) pod(taskID string) *api.Pod {
+    r.Lock()
+    defer r.Unlock()
+    return r.boundTasks[taskID]
+}

-    mockDriver := &MockExecutorDriver{}
-    updates := make(chan kubetypes.PodUpdate, 1024)
-    config := Config{
-        Docker:    dockertools.ConnectToDockerOrDie("fake://"),
-        Updates:   updates,
-        NodeInfos: make(chan NodeInfo, 1),
-        APIClient: client.NewOrDie(&client.Config{
-            Host:         testApiServer.server.URL,
-            GroupVersion: testapi.Default.GroupVersion(),
-        }),
-        PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
-            return &api.PodStatus{
-                ContainerStatuses: []api.ContainerStatus{
-                    {
-                        Name: "foo",
-                        State: api.ContainerState{
-                            Running: &api.ContainerStateRunning{},
-                        },
-                    },
-                },
-                Phase:  api.PodRunning,
-                HostIP: "127.0.0.1",
-            }, nil
-        },
-        PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
+func (r *fakeRegistry) watch() <-chan *PodEvent { return r.updates }
+
+func (r *fakeRegistry) shutdown() {
+    r.Lock()
+    defer r.Unlock()
+    r.boundTasks = map[string]*api.Pod{}
+}
+
+func (r *fakeRegistry) bind(taskID string, pod *api.Pod) error {
+    r.Lock()
+    defer r.Unlock()
+    pod.Annotations = map[string]string{
+        "k8s.mesosphere.io/taskId": taskID,
     }
-    executor := New(config)
+    r.boundTasks[taskID] = pod
+
+    // the normal registry sends a bind..
+    r.updates <- &PodEvent{pod: pod, taskID: taskID, eventType: PodEventBound}
+    return nil
+}
+
+func (r *fakeRegistry) Update(pod *api.Pod) (*PodEvent, error) {
+    r.Lock()
+    defer r.Unlock()
+    taskID, err := taskIDFor(pod)
+    if err != nil {
+        return nil, err
+    }
+    if _, ok := r.boundTasks[taskID]; !ok {
+        return nil, errUnknownTask
+    }
+    rp := &PodEvent{pod: pod, taskID: taskID, eventType: PodEventUpdated}
+    r.updates <- rp
+    return rp, nil
+}
+
+func (r *fakeRegistry) Remove(taskID string) error {
+    r.Lock()
+    defer r.Unlock()
+    pod, ok := r.boundTasks[taskID]
+    if !ok {
+        return errUnknownTask
+    }
+    delete(r.boundTasks, taskID)
+    r.updates <- &PodEvent{pod: pod, taskID: taskID, eventType: PodEventDeleted}
+    return nil
+}
+
+// phaseChange simulates a pod source update; normally this update is generated from a watch
+func (r *fakeRegistry) phaseChange(pod *api.Pod, phase api.PodPhase) error {
+    clone, err := api.Scheme.DeepCopy(pod)
+    if err != nil {
+        return err
+    }
+
+    phasedPod := clone.(*api.Pod)
+    phasedPod.Status.Phase = phase
+    _, err = r.Update(phasedPod)
+    return err
+}
+
+// TestExecutorLaunchAndKillTask ensures that the executor is able to launch tasks and generates
+// appropriate status messages for mesos. It then kills the task and validates that appropriate
+// actions are taken by the executor.
+func TestExecutorLaunchAndKillTask(t *testing.T) {
+    var (
+        mockDriver = &MockExecutorDriver{}
+        registry   = newFakeRegistry()
+        executor   = New(Config{
+            Docker:    dockertools.ConnectToDockerOrDie("fake://"),
+            NodeInfos: make(chan NodeInfo, 1),
+            Registry:  registry,
+        })
+        mockKubeAPI  = &mockKubeAPI{}
+        pod          = NewTestPod(1)
+        executorinfo = &mesosproto.ExecutorInfo{}
+    )
+
+    executor.kubeAPI = mockKubeAPI
     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)

-    select {
-    case <-updates:
-    case <-time.After(util.ForeverTestTimeout):
-        t.Fatalf("Executor should send an initial update on Registration")
-    }
-
-    pod := NewTestPod(1)
-    executorinfo := &mesosproto.ExecutorInfo{}
     podTask, err := podtask.New(
         api.NewDefaultContext(),
         "",
@@ -178,22 +201,24 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
         nil,
         nil,
     )
-
     assert.Equal(t, nil, err, "must be able to create a task from a pod")
-    podTask.Spec = &podtask.Spec{
-        Executor: executorinfo,
+
+    pod.Annotations = map[string]string{
+        "k8s.mesosphere.io/taskId": podTask.ID,
     }
+
+    podTask.Spec = &podtask.Spec{Executor: executorinfo}
     taskInfo, err := podTask.BuildTaskInfo()
     assert.Equal(t, nil, err, "must be able to build task info")

     data, err := testapi.Default.Codec().Encode(pod)
     assert.Equal(t, nil, err, "must be able to encode a pod's spec data")
+
     taskInfo.Data = data
     var statusUpdateCalls sync.WaitGroup
+    statusUpdateCalls.Add(1)
     statusUpdateDone := func(_ mock.Arguments) { statusUpdateCalls.Done() }

-    statusUpdateCalls.Add(1)
     mockDriver.On(
         "SendStatusUpdate",
         mesosproto.TaskState_TASK_STARTING,
@@ -210,20 +235,16 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
     assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
         executor.lock.Lock()
         defer executor.lock.Unlock()
-        return len(executor.tasks) == 1 && len(executor.pods) == 1
+        return !registry.empty()
     }, "executor must be able to create a task and a pod")

-    gotPodUpdate := false
-    select {
-    case update := <-updates:
-        if len(update.Pods) == 1 {
-            gotPodUpdate = true
-        }
-    case <-time.After(util.ForeverTestTimeout):
-    }
-    assert.Equal(t, true, gotPodUpdate,
-        "the executor should send an update about a new pod to "+
-            "the updates chan when creating a new one.")
+    // simulate a pod source update; normally this update is generated when binding a pod
+    err = registry.phaseChange(pod, api.PodPending)
+    assert.NoError(t, err)
+
+    // simulate a pod source update; normally this update is generated by the kubelet once the pod is healthy
+    err = registry.phaseChange(pod, api.PodRunning)
+    assert.NoError(t, err)

     // Allow some time for asynchronous requests to the driver.
     finished := kmruntime.After(statusUpdateCalls.Wait)
@@ -239,12 +260,16 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
         mesosproto.TaskState_TASK_KILLED,
     ).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(statusUpdateDone).Once()

-    executor.KillTask(mockDriver, taskInfo.TaskId)
+    // simulate what happens when the apiserver is told to delete a pod
+    mockKubeAPI.On("killPod", pod.Namespace, pod.Name).Return(nil).Run(func(_ mock.Arguments) {
+        registry.Remove(podTask.ID)
+    })

+    executor.KillTask(mockDriver, taskInfo.TaskId)
     assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
         executor.lock.Lock()
         defer executor.lock.Unlock()
-        return len(executor.tasks) == 0 && len(executor.pods) == 0
+        return registry.empty()
     }, "executor must be able to kill a created task and pod")

     // Allow some time for asynchronous requests to the driver.
@@ -254,7 +279,9 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
     case <-time.After(util.ForeverTestTimeout):
         t.Fatalf("timed out waiting for status update calls to finish")
     }
+
     mockDriver.AssertExpectations(t)
+    mockKubeAPI.AssertExpectations(t)
 }

 // TestExecutorStaticPods test that the ExecutorInfo.data is parsed
@@ -340,52 +367,30 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) {
 // its state. When a Kamikaze message is received, the executor should
 // attempt suicide.
 func TestExecutorFrameworkMessage(t *testing.T) {
-    // create fake apiserver
-    podListWatch := NewMockPodsListWatch(api.PodList{})
-    testApiServer := NewTestServer(t, api.NamespaceDefault, &podListWatch.list)
-    // TODO: Uncomment when fix #19254
-    // defer testApiServer.server.Close()
-
     // create and start executor
-    mockDriver := &MockExecutorDriver{}
-    kubeletFinished := make(chan struct{})
-    config := Config{
-        Docker:    dockertools.ConnectToDockerOrDie("fake://"),
-        Updates:   make(chan kubetypes.PodUpdate, 1024),
-        NodeInfos: make(chan NodeInfo, 1),
-        APIClient: client.NewOrDie(&client.Config{
-            Host:         testApiServer.server.URL,
-            GroupVersion: testapi.Default.GroupVersion(),
-        }),
-        PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
-            return &api.PodStatus{
-                ContainerStatuses: []api.ContainerStatus{
-                    {
-                        Name: "foo",
-                        State: api.ContainerState{
-                            Running: &api.ContainerStateRunning{},
-                        },
-                    },
-                },
-                Phase:  api.PodRunning,
-                HostIP: "127.0.0.1",
-            }, nil
-        },
-        ShutdownAlert: func() {
-            close(kubeletFinished)
-        },
-        KubeletFinished: kubeletFinished,
-        PodLW:           &NewMockPodsListWatch(api.PodList{}).ListWatch,
-    }
-    executor := New(config)
+    var (
+        mockDriver      = &MockExecutorDriver{}
+        kubeletFinished = make(chan struct{})
+        registry        = newFakeRegistry()
+        executor        = New(Config{
+            Docker:    dockertools.ConnectToDockerOrDie("fake://"),
+            NodeInfos: make(chan NodeInfo, 1),
+            ShutdownAlert: func() {
+                close(kubeletFinished)
+            },
+            KubeletFinished: kubeletFinished,
+            Registry:        registry,
+        })
+        pod         = NewTestPod(1)
+        mockKubeAPI = &mockKubeAPI{}
+    )

+    executor.kubeAPI = mockKubeAPI
     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)
-
     executor.FrameworkMessage(mockDriver, "test framework message")

     // set up a pod to then lose
-    pod := NewTestPod(1)
     executorinfo := &mesosproto.ExecutorInfo{}
     podTask, _ := podtask.New(
         api.NewDefaultContext(),
@@ -395,10 +400,13 @@ func TestExecutorFrameworkMessage(t *testing.T) {
         nil,
         nil,
     )
-
+    pod.Annotations = map[string]string{
+        "k8s.mesosphere.io/taskId": podTask.ID,
+    }
     podTask.Spec = &podtask.Spec{
         Executor: executorinfo,
     }
+
     taskInfo, err := podTask.BuildTaskInfo()
     assert.Equal(t, nil, err, "must be able to build task info")

@@ -418,9 +426,21 @@ func TestExecutorFrameworkMessage(t *testing.T) {

     executor.LaunchTask(mockDriver, taskInfo)

+    // must wait for this otherwise phase changes may not apply
+    assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
+        executor.lock.Lock()
+        defer executor.lock.Unlock()
+        return !registry.empty()
+    }, "executor must be able to create a task and a pod")
+
+    err = registry.phaseChange(pod, api.PodPending)
+    assert.NoError(t, err)
+    err = registry.phaseChange(pod, api.PodRunning)
+    assert.NoError(t, err)
+
     // waiting until the pod is really running b/c otherwise a TASK_FAILED could be
-    // triggered by the asynchronously running _launchTask, __launchTask methods
-    // when removing the task from k.tasks through the "task-lost:foo" message below.
+    // triggered by the asynchronously running executor methods when removing the task
+    // from k.tasks through the "task-lost:foo" message below.
     select {
     case <-called:
     case <-time.After(util.ForeverTestTimeout):
@@ -434,11 +454,17 @@ func TestExecutorFrameworkMessage(t *testing.T) {
         mesosproto.TaskState_TASK_LOST,
     ).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once()

+    // simulate what happens when the apiserver is told to delete a pod
+    mockKubeAPI.On("killPod", pod.Namespace, pod.Name).Return(nil).Run(func(_ mock.Arguments) {
+        registry.Remove(podTask.ID)
+    })
+
     executor.FrameworkMessage(mockDriver, "task-lost:foo")
+
     assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
         executor.lock.Lock()
         defer executor.lock.Unlock()
-        return len(executor.tasks) == 0 && len(executor.pods) == 0
+        return registry.empty()
     }, "executor must be able to kill a created task and pod")

     select {
@@ -454,6 +480,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
         "executor should have shut down after receiving a Kamikaze message")

     mockDriver.AssertExpectations(t)
+    mockKubeAPI.AssertExpectations(t)
 }

 // Create a pod with a given index, requiring one port
@@ -504,7 +531,7 @@ type TestServer struct {
     lock  sync.Mutex
 }

-func NewTestServer(t *testing.T, namespace string, pods *api.PodList) *TestServer {
+func NewTestServer(t *testing.T, namespace string) *TestServer {
     ts := TestServer{
         Stats: map[string]uint{},
     }
@@ -537,62 +564,41 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch {
 // TestExecutorShutdown ensures that the executor properly shuts down
 // when Shutdown is called.
 func TestExecutorShutdown(t *testing.T) {
-    mockDriver := &MockExecutorDriver{}
-    kubeletFinished := make(chan struct{})
-    var exitCalled int32 = 0
-    updates := make(chan kubetypes.PodUpdate, 1024)
-    config := Config{
-        Docker:    dockertools.ConnectToDockerOrDie("fake://"),
-        Updates:   updates,
-        NodeInfos: make(chan NodeInfo, 1),
-        ShutdownAlert: func() {
-            close(kubeletFinished)
-        },
-        KubeletFinished: kubeletFinished,
-        ExitFunc: func(_ int) {
-            atomic.AddInt32(&exitCalled, 1)
-        },
-        PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
-    }
-    executor := New(config)
+    var (
+        mockDriver      = &MockExecutorDriver{}
+        kubeletFinished = make(chan struct{})
+        exitCalled      = int32(0)
+        executor        = New(Config{
+            Docker:    dockertools.ConnectToDockerOrDie("fake://"),
+            NodeInfos: make(chan NodeInfo, 1),
+            ShutdownAlert: func() {
+                close(kubeletFinished)
            },
+            KubeletFinished: kubeletFinished,
+            ExitFunc: func(_ int) {
+                atomic.AddInt32(&exitCalled, 1)
+            },
+            Registry: newFakeRegistry(),
+        })
+    )

     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)
-
     mockDriver.On("Stop").Return(mesosproto.Status_DRIVER_STOPPED, nil).Once()
-
     executor.Shutdown(mockDriver)

     assert.Equal(t, false, executor.isConnected(),
         "executor should not be connected after Shutdown")
     assert.Equal(t, true, executor.isDone(),
         "executor should be in Done state after Shutdown")
-
-    // channel should be closed now, only a constant number of updates left
-    num := len(updates)
-drainLoop:
-    for {
-        select {
-        case _, ok := <-updates:
-            if !ok {
-                break drainLoop
-            }
-            num -= 1
-        default:
-            t.Fatal("Updates chan should be closed after Shutdown")
-        }
-    }
-    assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown")
-
     assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0,
         "the executor should call its ExitFunc when it is ready to close down")
-
     mockDriver.AssertExpectations(t)
 }

 func TestExecutorsendFrameworkMessage(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
-    executor, _ := NewTestKubernetesExecutor()
+    executor := NewTestKubernetesExecutor()

     executor.Init(mockDriver)
     executor.Registered(mockDriver, nil, nil, nil)
@@ -612,77 +618,3 @@ func TestExecutorsendFrameworkMessage(t *testing.T) {
     }
     mockDriver.AssertExpectations(t)
 }
-
-func TestExecutor_updateMetaMap(t *testing.T) {
-    for i, tc := range []struct {
-        oldmap map[string]string
-        newmap map[string]string
-        wants  bool
-    }{
-        {
-            oldmap: nil,
-            newmap: nil,
-            wants:  false,
-        },
-        {
-            oldmap: nil,
-            newmap: map[string]string{},
-            wants:  false,
-        },
-        {
-            oldmap: map[string]string{},
-            newmap: nil,
-            wants:  false,
-        },
-        {
-            oldmap: nil,
-            newmap: map[string]string{
-                "foo": "bar",
-            },
-            wants: true,
-        },
-        {
-            oldmap: map[string]string{},
-            newmap: map[string]string{
-                "foo": "bar",
-            },
-            wants: true,
-        },
-        {
-            oldmap: map[string]string{
-                "baz": "qax",
-            },
-            newmap: map[string]string{
-                "foo": "bar",
-            },
-            wants: true,
-        },
-        {
-            oldmap: map[string]string{
-                "baz": "qax",
-            },
-            newmap: nil,
-            wants:  true,
-        },
-        {
-            oldmap: map[string]string{
-                "baz": "qax",
-                "qwe": "iop",
-            },
-            newmap: map[string]string{
-                "foo": "bar",
-                "qwe": "iop",
-            },
-            wants: true,
-        },
-    } {
-        // do work here
-        actual := updateMetaMap(&tc.oldmap, tc.newmap)
-        if actual != tc.wants {
-            t.Fatalf("test case %d failed, expected %v but got %v instead", i, tc.wants, actual)
-        }
-        if len(tc.oldmap) != len(tc.newmap) || (len(tc.oldmap) > 0 && !reflect.DeepEqual(tc.oldmap, tc.newmap)) {
-            t.Fatalf("test case %d failed, expected %v but got %v instead", i, tc.newmap, tc.oldmap)
-        }
-    }
-}
diff --git a/contrib/mesos/pkg/executor/mock_test.go b/contrib/mesos/pkg/executor/mock_test.go
index a547bded681..fe70393a0b8 100644
--- a/contrib/mesos/pkg/executor/mock_test.go
+++ b/contrib/mesos/pkg/executor/mock_test.go
@@ -22,11 +22,18 @@ import (
     "github.com/mesos/mesos-go/mesosproto"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
-    "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
-    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 )

+type mockKubeAPI struct {
+    mock.Mock
+}
+
+func (m *mockKubeAPI) killPod(ns, name string) error {
+    args := m.Called(ns, name)
+    return args.Error(0)
+}
+
 type MockExecutorDriver struct {
     mock.Mock
 }
@@ -66,18 +73,16 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
     return args.Get(0).(mesosproto.Status), args.Error(1)
 }

-func NewTestKubernetesExecutor() (*Executor, chan kubetypes.PodUpdate) {
-    updates := make(chan kubetypes.PodUpdate, 1024)
+func NewTestKubernetesExecutor() *Executor {
     return New(Config{
-        Docker:  dockertools.ConnectToDockerOrDie("fake://"),
-        Updates: updates,
-        PodLW:   &NewMockPodsListWatch(api.PodList{}).ListWatch,
-    }), updates
+        Docker:   dockertools.ConnectToDockerOrDie("fake://"),
+        Registry: newFakeRegistry(),
+    })
 }

 func TestExecutorNew(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
-    executor, _ := NewTestKubernetesExecutor()
+    executor := NewTestKubernetesExecutor()

     executor.Init(mockDriver)

     assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization")
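// [reviewer note, not part of the diff] mockKubeAPI above wires testify's mock.Mock
// into the kubeAPI seam. The pattern the tests in this PR use looks like:
//
//	mockKubeAPI := &mockKubeAPI{}
//	executor.kubeAPI = mockKubeAPI
//	// expect a force-delete and simulate the resulting registry removal
//	mockKubeAPI.On("killPod", pod.Namespace, pod.Name).Return(nil).Run(func(_ mock.Arguments) {
//	    registry.Remove(podTask.ID)
//	})
//	// ... exercise the executor ...
//	mockKubeAPI.AssertExpectations(t)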
diff --git a/contrib/mesos/pkg/executor/observer.go b/contrib/mesos/pkg/executor/observer.go
deleted file mode 100644
index 8ef92dcb240..00000000000
--- a/contrib/mesos/pkg/executor/observer.go
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package executor
-
-import (
-    log "github.com/golang/glog"
-    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
-    "k8s.io/kubernetes/pkg/api"
-    "k8s.io/kubernetes/pkg/api/unversioned"
-    "k8s.io/kubernetes/pkg/client/cache"
-    "k8s.io/kubernetes/pkg/controller/framework"
-)
-
-// taskUpdateTx execute a task update transaction f for the task identified by
-// taskId. if no such task exists then f is not invoked and an error is
-// returned. if f is invoked then taskUpdateTx returns the bool result of f.
-type taskUpdateTx func(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error)
-
-// podObserver receives callbacks for every pod state change on the apiserver and
-// for each decides whether to execute a task update transaction.
-type podObserver struct {
-    podController *framework.Controller
-    terminate     <-chan struct{}
-    taskUpdateTx  taskUpdateTx
-}
-
-func newPodObserver(podLW cache.ListerWatcher, taskUpdateTx taskUpdateTx, terminate <-chan struct{}) *podObserver {
-    // watch pods from the given pod ListWatch
-    if podLW == nil {
-        // fail early to make debugging easier
-        panic("cannot create executor with nil PodLW")
-    }
-
-    p := &podObserver{
-        terminate:    terminate,
-        taskUpdateTx: taskUpdateTx,
-    }
-    _, p.podController = framework.NewInformer(podLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
-        AddFunc: func(obj interface{}) {
-            pod := obj.(*api.Pod)
-            log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
-            p.handleChangedApiserverPod(pod)
-        },
-        UpdateFunc: func(oldObj, newObj interface{}) {
-            pod := newObj.(*api.Pod)
-            log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
-            p.handleChangedApiserverPod(pod)
-        },
-        DeleteFunc: func(obj interface{}) {
-            pod := obj.(*api.Pod)
-            log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
-        },
-    })
-    return p
-}
-
-// run begins observing pod state changes; blocks until the terminate chan closes.
-func (p *podObserver) run() {
-    p.podController.Run(p.terminate)
-}
-
-// handleChangedApiserverPod is invoked for pod add/update state changes and decides whether
-// task updates are necessary. if so, a task update is executed via taskUpdateTx.
-func (p *podObserver) handleChangedApiserverPod(pod *api.Pod) {
-    // Don't do anything for pods without task anotation which means:
-    // - "pre-scheduled" pods which have a NodeName set to this node without being scheduled already.
-    // - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
-    // - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
-    taskId := pod.Annotations[meta.TaskIdKey]
-    if taskId == "" {
-        // There also could be a race between the overall launch-task process and this update, but here we
-        // will never be able to process such a stale update because the "update pod" that we're receiving
-        // in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
-        // update on the floor because we'll get another update later that, in addition to the changes that
-        // we're dropping now, will also include the changes from the binding process.
-        log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
-        return
-    }
-
-    _, err := p.taskUpdateTx(taskId, func(_ *kuberTask, relatedPod *api.Pod) (sendSnapshot bool) {
-        if relatedPod == nil {
-            // should never happen because:
-            // (a) the update pod record has already passed through the binding phase in launchTasks()
-            // (b) all remaining updates to executor.{pods,tasks} are sync'd in unison
-            log.Errorf("internal state error: pod not found for task %s", taskId)
-            return
-        }
-
-        // TODO(sttts): check whether we can and should send all "semantic" changes down to the kubelet
-        // see kubelet/config/config.go for semantic change detection
-
-        // check for updated labels/annotations: need to forward these for the downward API
-        sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Labels, pod.Labels)
-        sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Annotations, pod.Annotations)
-
-        // terminating pod?
-        if pod.Status.Phase == api.PodRunning {
-            timeModified := differentTime(relatedPod.DeletionTimestamp, pod.DeletionTimestamp)
-            graceModified := differentPeriod(relatedPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds)
-            if timeModified || graceModified {
-                log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet",
-                    pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
-
-                // modify the pod in our registry instead of sending the new pod. The later
-                // would allow that other changes bleed into the kubelet. For now we are
-                // very conservative changing this behaviour.
-                relatedPod.DeletionTimestamp = pod.DeletionTimestamp
-                relatedPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
-                sendSnapshot = true
-            }
-        }
-        return
-    })
-    if err != nil {
-        log.Errorf("failed to update pod %s/%s: %+v", pod.Namespace, pod.Name, err)
-    }
-}
-
-// updateMetaMap looks for differences between src and dest; if there are differences
-// then dest is changed (possibly to point to src) and this func returns true.
-func updateMetaMap(dest *map[string]string, src map[string]string) (changed bool) {
-    // check for things in dest that are missing in src
-    for k := range *dest {
-        if _, ok := src[k]; !ok {
-            changed = true
-            break
-        }
-    }
-    if !changed {
-        if len(*dest) == 0 {
-            if len(src) > 0 {
-                changed = true
-                goto finished
-            }
-            // no update needed
-            return
-        }
-        // check for things in src that are missing/different in dest
-        for k, v := range src {
-            if vv, ok := (*dest)[k]; !ok || vv != v {
-                changed = true
-                break
-            }
-        }
-    }
-finished:
-    *dest = src
-    return
-}
-
-func differentTime(a, b *unversioned.Time) bool {
-    return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
-}
-
-func differentPeriod(a, b *int64) bool {
-    return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
-}
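// [reviewer note, not part of the diff] The registry added below replaces the deleted
// pod observer. Consumers drain watch() and dispatch on the event type; roughly
// (hedged sketch, mirroring the watcher/filter usage in executor.go above):
//
//	for event := range registry.watch() {
//	    switch event.eventType {
//	    case PodEventBound:
//	        // task/pod binding recorded; TASK_STARTING already sent
//	    case PodEventUpdated:
//	        // possibly transition the task to TASK_RUNNING
//	    case PodEventDeleted:
//	        // pod-task lifecycle complete; report TASK_KILLED
//	    case PodEventIncompatibleUpdate:
//	        // kill the pod-task; a PodEventDeleted will follow
//	    }
//	}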
-func updateMetaMap(dest *map[string]string, src map[string]string) (changed bool) { - // check for things in dest that are missing in src - for k := range *dest { - if _, ok := src[k]; !ok { - changed = true - break - } - } - if !changed { - if len(*dest) == 0 { - if len(src) > 0 { - changed = true - goto finished - } - // no update needed - return - } - // check for things in src that are missing/different in dest - for k, v := range src { - if vv, ok := (*dest)[k]; !ok || vv != v { - changed = true - break - } - } - } -finished: - *dest = src - return -} - -func differentTime(a, b *unversioned.Time) bool { - return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b) -} - -func differentPeriod(a, b *int64) bool { - return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b) -} diff --git a/contrib/mesos/pkg/executor/registry.go b/contrib/mesos/pkg/executor/registry.go new file mode 100644 index 00000000000..d81f41e272a --- /dev/null +++ b/contrib/mesos/pkg/executor/registry.go @@ -0,0 +1,340 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package executor + +import ( + "encoding/json" + "errors" + "sync" + + "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + "k8s.io/kubernetes/pkg/api" + client "k8s.io/kubernetes/pkg/client/unversioned" + + log "github.com/golang/glog" +) + +type ( + podEventType int + + PodEvent struct { + pod *api.Pod + taskID string + eventType podEventType + } + + // Registry is a state store for pod task metadata. Clients are expected to watch() the + // event stream to observe changes over time. + Registry interface { + // Update modifies the registry's iternal representation of the pod; it may also + // modify the pod argument itself. An update may fail because either a pod isn't + // labeled with a task ID, the task ID is unknown, or the nature of the update may + // be incompatible with what's supported in kubernetes-mesos. + Update(pod *api.Pod) (*PodEvent, error) + + // Remove the task from this registry, returns an error if the taskID is unknown. + Remove(taskID string) error + + // bind associates a taskID with a pod, triggers the binding API on the k8s apiserver + // and stores the resulting pod-task metadata. + bind(taskID string, pod *api.Pod) error + + // watch returns the event stream of the registry. clients are expected to read this + // stream otherwise the event buffer will fill up and registry ops will block. 
+ watch() <-chan *PodEvent + + // return true if there are no tasks registered + empty() bool + + // return the api.Pod registered to the given taskID or else nil + pod(taskID string) *api.Pod + + // shutdown any related async processing and clear the internal state of the registry + shutdown() + } + + registryImpl struct { + client *client.Client + updates chan *PodEvent + m sync.RWMutex + boundTasks map[string]*api.Pod + } +) + +var ( + errCreateBindingFailed = errors.New(messages.CreateBindingFailure) + errAnnotationUpdateFailure = errors.New(messages.AnnotationUpdateFailure) + errUnknownTask = errors.New("unknown task ID") + errUnsupportedUpdate = errors.New("pod update allowed by k8s is incompatible with this version of k8s-mesos") +) + +const ( + PodEventBound podEventType = iota + PodEventUpdated + PodEventDeleted + PodEventIncompatibleUpdate + + updatesBacklogSize = 200 +) + +func IsUnsupportedUpdate(err error) bool { + return err == errUnsupportedUpdate +} + +func (rp *PodEvent) Task() string { + return rp.taskID +} + +func (rp *PodEvent) Pod() *api.Pod { + return rp.pod +} + +func (rp *PodEvent) FormatShort() string { + return "task '" + rp.taskID + "' pod '" + rp.pod.Namespace + "/" + rp.pod.Name + "'" +} + +func NewRegistry(client *client.Client) Registry { + r := ®istryImpl{ + client: client, + updates: make(chan *PodEvent, updatesBacklogSize), + boundTasks: make(map[string]*api.Pod), + } + return r +} + +func (r registryImpl) watch() <-chan *PodEvent { + return r.updates +} + +func taskIDFor(pod *api.Pod) (taskID string, err error) { + taskID = pod.Annotations[meta.TaskIdKey] + if taskID == "" { + err = errUnknownTask + } + return +} + +func (r registryImpl) shutdown() { + //TODO(jdef) flesh this out + r.m.Lock() + defer r.m.Unlock() + r.boundTasks = map[string]*api.Pod{} +} + +func (r registryImpl) empty() bool { + r.m.RLock() + defer r.m.RUnlock() + return len(r.boundTasks) == 0 +} + +func (r registryImpl) pod(taskID string) *api.Pod { + r.m.RLock() + defer r.m.RUnlock() + return r.boundTasks[taskID] +} + +func (r registryImpl) Remove(taskID string) error { + r.m.Lock() + defer r.m.Unlock() + pod, ok := r.boundTasks[taskID] + if !ok { + return errUnknownTask + } + + delete(r.boundTasks, taskID) + + r.updates <- &PodEvent{ + pod: pod, + taskID: taskID, + eventType: PodEventDeleted, + } + + log.V(1).Infof("unbound task %v from pod %v/%v", taskID, pod.Namespace, pod.Name) + return nil +} + +func (r registryImpl) Update(pod *api.Pod) (*PodEvent, error) { + // Don't do anything for pods without task anotation which means: + // - "pre-scheduled" pods which have a NodeName set to this node without being scheduled already. + // - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change. + // - all other pods that haven't passed through the launch-task-binding phase, which would set annotations. + taskID, err := taskIDFor(pod) + if err != nil { + // There also could be a race between the overall launch-task process and this update, but here we + // will never be able to process such a stale update because the "update pod" that we're receiving + // in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale + // update on the floor because we'll get another update later that, in addition to the changes that + // we're dropping now, will also include the changes from the binding process. 
+		log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
+		return nil, err
+	}
+
+	// be a good citizen: copy the arg before making any changes to it
+	clone, err := api.Scheme.DeepCopy(pod)
+	if err != nil {
+		return nil, err
+	}
+	pod = clone.(*api.Pod)
+
+	r.m.Lock()
+	defer r.m.Unlock()
+	oldPod, ok := r.boundTasks[taskID]
+	if !ok {
+		return nil, errUnknownTask
+	}
+
+	registeredPod := &PodEvent{
+		pod:       pod,
+		taskID:    taskID,
+		eventType: PodEventUpdated,
+	}
+
+	// TODO(jdef) would be nice to only execute this logic based on the presence of
+	// some particular annotation:
+	//   - preserve the original container port spec since the k8sm scheduler
+	//     has likely changed it.
+	if !copyPorts(pod, oldPod) {
+		// TODO(jdef) the state of "pod" is possibly inconsistent at this point.
+		// we don't care for the moment - we might later.
+		registeredPod.eventType = PodEventIncompatibleUpdate
+		r.updates <- registeredPod
+		log.Warningf("pod containers changed in an incompatible way; aborting update")
+		return registeredPod, errUnsupportedUpdate
+	}
+
+	// update our internal copy and broadcast the change
+	r.boundTasks[taskID] = pod
+	r.updates <- registeredPod
+
+	log.V(1).Infof("updated task %v pod %v/%v", taskID, pod.Namespace, pod.Name)
+	return registeredPod, nil
+}
+
+// copyPorts copies the container port specs from src to dest and returns
+// true if all ports (in both dest and src) are accounted for, otherwise
+// false. If it returns false then only a partial copy may have been
+// performed.
+func copyPorts(dest, src *api.Pod) bool {
+	containers := src.Spec.Containers
+	ctPorts := make(map[string][]api.ContainerPort, len(containers))
+	for i := range containers {
+		ctPorts[containers[i].Name] = containers[i].Ports
+	}
+	containers = dest.Spec.Containers
+	for i := range containers {
+		name := containers[i].Name
+		if ports, found := ctPorts[name]; found {
+			containers[i].Ports = ports
+			delete(ctPorts, name)
+		} else {
+			// old pod spec is missing this container?!
+			return false
+		}
+	}
+	if len(ctPorts) > 0 {
+		// new pod spec has containers that aren't in the old pod spec
+		return false
+	}
+	return true
+}
+
+func (r *registryImpl) bind(taskID string, pod *api.Pod) error {
+
+	// validate that taskID matches that of the annotation
+	annotatedTaskID, err := taskIDFor(pod)
+	if err != nil {
+		log.Warning("failed to bind: missing task ID annotation for pod ", pod.Namespace+"/"+pod.Name)
+		return errCreateBindingFailed
+	}
+	if annotatedTaskID != taskID {
+		log.Warningf("failed to bind: expected task-id %v instead of %v for pod %v/%v", taskID, annotatedTaskID, pod.Namespace, pod.Name)
+		return errCreateBindingFailed
+	}
+
+	// record this as a bound task for now so that we can avoid racing with the mesos pod source, which is
+	// watching the apiserver for pod updates and will verify pod-task validity with us upon receiving them
+	boundSuccessfully := false
+	defer func() {
+		if !boundSuccessfully {
+			r.m.Lock()
+			defer r.m.Unlock()
+			delete(r.boundTasks, taskID)
+		}
+	}()
+	func() {
+		r.m.Lock()
+		defer r.m.Unlock()
+		r.boundTasks[taskID] = pod
+	}()
+
+	if pod.Spec.NodeName == "" {
+		//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
+		binding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{
+				Namespace:   pod.Namespace,
+				Name:        pod.Name,
+				Annotations: make(map[string]string),
+			},
+			Target: api.ObjectReference{
+				Kind: "Node",
+				Name: pod.Annotations[meta.BindingHostKey],
+			},
+		}
+
+		// forward the annotations that the scheduler wants to apply
+		for k, v := range pod.Annotations {
+			binding.Annotations[k] = v
+		}
+
+		// create binding on apiserver
+		log.Infof("Binding task %v pod '%v/%v' to '%v' with annotations %+v...",
+			taskID, pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
+		ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
+		err := r.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
+		if err != nil {
+			log.Warningf("failed to bind task %v pod %v/%v: %v", taskID, pod.Namespace, pod.Name, err)
+			return errCreateBindingFailed
+		}
+	} else {
+		// post annotations update to apiserver
+		patch := struct {
+			Metadata struct {
+				Annotations map[string]string `json:"annotations"`
+			} `json:"metadata"`
+		}{}
+		patch.Metadata.Annotations = pod.Annotations
+		patchJson, _ := json.Marshal(patch)
+		log.V(4).Infof("Patching annotations %v of task %v pod %v/%v: %v", pod.Annotations, taskID, pod.Namespace, pod.Name, string(patchJson))
+		err := r.client.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error()
+		if err != nil {
+			log.Errorf("Error updating annotations of ready-to-launch task %v pod %v/%v: %v", taskID, pod.Namespace, pod.Name, err)
+			return errAnnotationUpdateFailure
+		}
+	}
+
+	boundSuccessfully = true
+
+	r.updates <- &PodEvent{
+		pod:       pod,
+		taskID:    taskID,
+		eventType: PodEventBound,
+	}
+
+	log.V(1).Infof("bound task %v to pod %v/%v", taskID, pod.Namespace, pod.Name)
+	return nil
+}
diff --git a/contrib/mesos/pkg/executor/service/podsource.go b/contrib/mesos/pkg/executor/service/podsource.go
index 1cb25488a22..5a546c8a0c7 100644
--- a/contrib/mesos/pkg/executor/service/podsource.go
+++ b/contrib/mesos/pkg/executor/service/podsource.go
@@ -17,12 +17,12 @@ limitations under the License.
 package service
 
 import (
-	"fmt"
-
-	log "github.com/golang/glog"
+	"k8s.io/kubernetes/contrib/mesos/pkg/executor"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+
+	log "github.com/golang/glog"
 )
 
 const (
@@ -32,97 +32,95 @@ const (
 	mesosSource = kubetypes.ApiserverSource
 )
 
-// sourceMesos merges pods from mesos, and mirror pods from the apiserver. why?
-// (a) can't have two sources with the same name;
-// (b) all sources, other than ApiserverSource are considered static/mirror
-//     sources, and;
-// (c) kubelet wants to see mirror pods reflected in a non-static source.
-//
-// Mesos pods must appear to come from apiserver due to (b), while reflected
-// static pods (mirror pods) must appear to come from apiserver due to (c).
-//
-// The only option I could think of was creating a source that merges the pod
-// streams. I don't like it. But I could think of anything else, other than
-// starting to hack up the kubelet's understanding of mirror/static pod
-// sources (ouch!)
-type sourceMesos struct {
-	sourceFinished chan struct{}              // sourceFinished closes when mergeAndForward exits
-	out            chan<- interface{}         // out is the sink for merged pod snapshots
-	mirrorPods     chan []*api.Pod            // mirrorPods communicates snapshots of the current set of mirror pods
-	execUpdates    <-chan kubetypes.PodUpdate // execUpdates receives snapshots of the current set of mesos pods
-}
+type (
+	podName struct {
+		namespace, name string
+	}
+
+	sourceMesos struct {
+		stop          <-chan struct{}
+		out           chan<- interface{} // never close this because pkg/util/config.mux doesn't handle that very well
+		registry      executor.Registry
+		priorPodNames map[podName]string // map podName to taskID
+	}
+)
 
-// newSourceMesos creates a pod config source that merges pod updates from
-// mesos (via execUpdates), and mirror pod updates from the apiserver (via
-// podWatch) writing the merged update stream to the out chan. It is expected
-// that execUpdates will only ever contain SET operations. The source takes
-// ownership of the sourceFinished chan, closing it when the source terminates.
-// Source termination happens when the execUpdates chan is closed and fully
-// drained of updates.
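+// newSourceMesos creates a pod config source that feeds the kubelet from the
+// apiserver pod watch: mirror pods pass straight through, while task-bound pod
+// updates are validated against (and possibly rewritten by) the executor
+// registry before being forwarded to the out chan. The source stops generating
+// events once the stop chan closes.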
 func newSourceMesos(
-	sourceFinished chan struct{},
-	execUpdates <-chan kubetypes.PodUpdate,
+	stop <-chan struct{},
 	out chan<- interface{},
 	podWatch *cache.ListWatch,
+	registry executor.Registry,
 ) {
 	source := &sourceMesos{
-		sourceFinished: sourceFinished,
-		mirrorPods:     make(chan []*api.Pod),
-		execUpdates:    execUpdates,
-		out:            out,
+		stop:          stop,
+		out:           out,
+		registry:      registry,
+		priorPodNames: make(map[podName]string),
 	}
-	// reflect changes from the watch into a chan, filtered to include only mirror pods (have an ConfigMirrorAnnotationKey attr)
-	cache.NewReflector(podWatch, &api.Pod{}, cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc), 0).RunUntil(sourceFinished)
 
-	go source.mergeAndForward()
+	// reflect changes from the watch into a chan, filtered to include only mirror pods
+	// (those that have a ConfigMirrorAnnotationKey attr)
+	cache.NewReflector(
+		podWatch,
+		&api.Pod{},
+		cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc),
+		0,
+	).RunUntil(stop)
 }
 
+// send is an update callback invoked by NewUndeltaStore
 func (source *sourceMesos) send(objs []interface{}) {
-	var mirrors []*api.Pod
+	var (
+		pods     = make([]*api.Pod, 0, len(objs))
+		podNames = make(map[podName]string, len(objs))
+	)
+
 	for _, o := range objs {
 		p := o.(*api.Pod)
+		addPod := false
 		if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok {
-			mirrors = append(mirrors, p)
+			// pass through all mirror pods
+			addPod = true
+		} else if rpod, err := source.registry.Update(p); err == nil {
+			// the pod is bound to a task and the update is compatible,
+			// so we'll allow it through
+			addPod = true
+			p = rpod.Pod() // use the (possibly) updated pod spec!
+			podNames[podName{p.Namespace, p.Name}] = rpod.Task()
+		} else if rpod != nil {
+			// we were able to ID the pod but the update still failed...
+			log.Warningf("failed to update registry for task %v pod %v/%v: %v",
+				rpod.Task(), p.Namespace, p.Name, err)
+		} else {
+			// unrecognized pod, skip!
+			log.V(2).Infof("skipping pod %v/%v", p.Namespace, p.Name)
 		}
+
+		if addPod {
+			pods = append(pods, p)
+		}
+	}
+
+	// detect when pods are deleted and notify the registry
+	for k, taskID := range source.priorPodNames {
+		if _, found := podNames[k]; !found {
+			source.registry.Remove(taskID)
+		}
+	}
+
+	source.priorPodNames = podNames
+
+	u := kubetypes.PodUpdate{
+		Op:     kubetypes.SET,
+		Pods:   pods,
+		Source: mesosSource,
 	}
 	select {
-	case <-source.sourceFinished:
-	case source.mirrorPods <- mirrors:
-	}
-}
-
-func (source *sourceMesos) mergeAndForward() {
-	// execUpdates will be closed by the executor on shutdown
-	defer close(source.sourceFinished)
-	var (
-		mirrors = []*api.Pod{}
-		pods    = []*api.Pod{}
-	)
-eventLoop:
-	for {
+	case <-source.stop:
+	default:
 		select {
-		case m := <-source.mirrorPods:
-			mirrors = m[:]
-			u := kubetypes.PodUpdate{
-				Op:     kubetypes.SET,
-				Pods:   append(m, pods...),
-				Source: mesosSource,
-			}
-			log.V(3).Infof("mirror update, sending snapshot of size %d", len(u.Pods))
-			source.out <- u
-		case u, ok := <-source.execUpdates:
-			if !ok {
-				break eventLoop
-			}
-			if u.Op != kubetypes.SET {
-				panic(fmt.Sprintf("unexpected Op type: %v", u.Op))
-			}
-
-			pods = u.Pods[:]
-			u.Pods = append(u.Pods, mirrors...)
-			u.Source = mesosSource
-			log.V(3).Infof("pods update, sending snapshot of size %d", len(u.Pods))
-			source.out <- u
+		case <-source.stop:
+		case source.out <- u:
 		}
 	}
-	log.V(2).Infoln("mesos pod source terminating normally")
+	log.V(2).Infof("sent %d pod updates", len(pods))
 }
diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go
index 048b59afde5..1523e19c239 100644
--- a/contrib/mesos/pkg/executor/service/service.go
+++ b/contrib/mesos/pkg/executor/service/service.go
@@ -46,11 +46,6 @@ type KubeletExecutorServer struct {
 	*options.KubeletServer
 	SuicideTimeout    time.Duration
 	LaunchGracePeriod time.Duration
-
-	// TODO(sttts): remove necessity to access the kubelet from the executor
-
-	klet      *kubelet.Kubelet // once set, immutable
-	kletReady chan struct{}    // once closed, klet is guaranteed to be valid and concurrently readable
 }
 
 func NewKubeletExecutorServer() *KubeletExecutorServer {
@@ -58,7 +53,6 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
 		KubeletServer:     options.NewKubeletServer(),
 		SuicideTimeout:    config.DefaultSuicideTimeout,
 		LaunchGracePeriod: config.DefaultLaunchGracePeriod,
-		kletReady:         make(chan struct{}),
 	}
 	if pwd, err := os.Getwd(); err != nil {
 		log.Warningf("failed to determine current directory: %v", err)
 	}
@@ -77,43 +71,20 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
 }
 
 func (s *KubeletExecutorServer) runExecutor(
-	execUpdates chan<- kubetypes.PodUpdate,
 	nodeInfos chan<- executor.NodeInfo,
 	kubeletFinished <-chan struct{},
 	staticPodsConfigPath string,
 	apiclient *client.Client,
-	podLW *cache.ListWatch,
-) error {
+	registry executor.Registry,
+) (<-chan struct{}, error) {
 	exec := executor.New(executor.Config{
-		Updates:         execUpdates,
-		APIClient:       apiclient,
-		Docker:          dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
-		SuicideTimeout:  s.SuicideTimeout,
-		KubeletFinished: kubeletFinished,
-		ExitFunc:        os.Exit,
-		PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
-			select {
-			case <-s.kletReady:
-			default:
-				return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
-			}
-
-			status, err := s.klet.GetRuntime().GetAPIPodStatus(pod)
-			if err != nil {
-				return nil, err
-			}
-
-			status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
-			hostIP, err := s.klet.GetHostIP()
-			if err != nil {
-				log.Errorf("Cannot get host IP: %v", err)
-			} else {
-				status.HostIP = hostIP.String()
-			}
-			return status, nil
-		},
+		Registry:             registry,
+		APIClient:            apiclient,
+		Docker:               dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
+		SuicideTimeout:       s.SuicideTimeout,
+		KubeletFinished:      kubeletFinished,
+		ExitFunc:             os.Exit,
 		StaticPodsConfigPath: staticPodsConfigPath,
-		PodLW:                podLW,
 		NodeInfos:            nodeInfos,
 	})
 
@@ -125,7 +96,7 @@ func (s *KubeletExecutorServer) runExecutor(
 	}
 	driver, err := bindings.NewMesosExecutorDriver(dconfig)
 	if err != nil {
-		return fmt.Errorf("failed to create executor driver: %v", err)
+		return nil, fmt.Errorf("failed to create executor driver: %v", err)
 	}
 	log.V(2).Infof("Initialize executor driver...")
 	exec.Init(driver)
@@ -138,16 +109,17 @@ func (s *KubeletExecutorServer) runExecutor(
 		log.Info("executor Run completed")
 	}()
 
-	return nil
+	return exec.Done(), nil
 }
 
 func (s *KubeletExecutorServer) runKubelet(
-	execUpdates <-chan kubetypes.PodUpdate,
 	nodeInfos <-chan executor.NodeInfo,
 	kubeletDone chan<- struct{},
 	staticPodsConfigPath string,
 	apiclient *client.Client,
 	podLW *cache.ListWatch,
+	registry executor.Registry,
+	executorDone <-chan struct{},
 ) (err error) {
 	defer func() {
 		if err != nil {
@@ -163,19 +135,15 @@ func (s *KubeletExecutorServer) runKubelet(
 	}
 
 	// apply Mesos specific settings
-	executorDone := make(chan struct{})
 	kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
 		k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
 		if err != nil {
 			return k, pc, err
 		}
-		s.klet = k.(*kubelet.Kubelet)
-		close(s.kletReady) // intentionally crash if this is called more than once
-
 		// decorate kubelet such that it shuts down when the executor is done
 		decorated := &executorKubelet{
-			Kubelet:      s.klet,
+			Kubelet:      k.(*kubelet.Kubelet),
 			kubeletDone:  kubeletDone,
 			executorDone: executorDone,
 		}
@@ -230,21 +198,20 @@ func (s *KubeletExecutorServer) runKubelet(
 		}
 	}()
 
-	// create main pod source, it will close executorDone when the executor updates stop flowing
-	newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW)
+	// create the main pod source; it stops generating events once executorDone is closed
+	newSourceMesos(executorDone, kcfg.PodConfig.Channel(mesosSource), podLW, registry)
 
 	// create static-pods directory file source
 	log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
 	fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
 	kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
 
-	// run the kubelet, until execUpdates is closed
+	// run the kubelet
 	// NOTE: because kcfg != nil holds, the upstream Run function will not
 	//       initialize the cloud provider. We explicitly wouldn't want
 	//       that because then every kubelet instance would query the master
 	//       state.json which does not scale.
 	err = kubeletapp.Run(s.KubeletServer, kcfg)
-
 	return
 }
 
@@ -252,7 +219,6 @@ func (s *KubeletExecutorServer) runKubelet(
 func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
 	// create shared channels
 	kubeletFinished := make(chan struct{})
-	execUpdates := make(chan kubetypes.PodUpdate, 1)
 	nodeInfos := make(chan executor.NodeInfo, 1)
 
 	// create static pods directory
@@ -273,18 +239,22 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
 		return fmt.Errorf("cannot create API client: %v", err)
 	}
 
-	pw := cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
-		fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
+	var (
+		pw = cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
+			fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
+		)
+		reg = executor.NewRegistry(apiclient)
 	)
 
 	// start executor
-	err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw)
+	var executorDone <-chan struct{}
+	executorDone, err = s.runExecutor(nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, reg)
 	if err != nil {
 		return err
 	}
 
 	// start kubelet, blocking
-	return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw)
+	return s.runKubelet(nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw, reg, executorDone)
 }
 
 func defaultBindingAddress() string {
diff --git a/contrib/mesos/pkg/executor/suicide_test.go b/contrib/mesos/pkg/executor/suicide_test.go
index 5426433f671..55ad5ccb0c1 100644
--- a/contrib/mesos/pkg/executor/suicide_test.go
+++ b/contrib/mesos/pkg/executor/suicide_test.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/golang/glog"
 	bindings "github.com/mesos/mesos-go/executor"
+	"k8s.io/kubernetes/pkg/api"
 )
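+// suicideTracker wraps a suicideWatcher, recording how the executor drives it
+// (e.g. counting timer jumps) so that the tests below can assert on
+// suicide-watch behavior.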
 type suicideTracker struct {
@@ -67,7 +68,7 @@ func (t *suicideTracker) makeJumper(_ jumper) jumper {
 func TestSuicide_zeroTimeout(t *testing.T) {
 	defer glog.Flush()
 
-	k, _ := NewTestKubernetesExecutor()
+	k := NewTestKubernetesExecutor()
 	tracker := &suicideTracker{suicideWatcher: k.suicideWatch}
 	k.suicideWatch = tracker
 
@@ -92,14 +93,14 @@ func TestSuicide_zeroTimeout(t *testing.T) {
 func TestSuicide_WithTasks(t *testing.T) {
 	defer glog.Flush()
 
-	k, _ := NewTestKubernetesExecutor()
+	k := NewTestKubernetesExecutor()
 	k.suicideTimeout = 50 * time.Millisecond
 
 	jumps := uint32(0)
 	tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}
 	k.suicideWatch = tracker
 
-	k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding
+	k.registry.bind("foo", &api.Pod{}) // prevent suicide attempts from succeeding
 
 	// call reset with a nil timer
 	glog.Infoln("Resetting suicide watch with 1 task")
@@ -119,7 +120,7 @@ func TestSuicide_WithTasks(t *testing.T) {
 		t.Fatalf("initial suicide watch setup failed")
 	}
 
-	delete(k.tasks, "foo") // zero remaining tasks
+	k.registry.Remove("foo") // zero remaining tasks
 	k.suicideTimeout = 1500 * time.Millisecond
 
 	suicideStart := time.Now()
@@ -142,7 +143,7 @@ func TestSuicide_WithTasks(t *testing.T) {
 	}
 
 	k.lock.Lock()
-	k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding
+	k.registry.bind("foo", &api.Pod{}) // prevent suicide attempts from succeeding
 	k.lock.Unlock()
 
 	// reset the suicide watch, which should stop the existing timer
@@ -164,7 +165,7 @@ func TestSuicide_WithTasks(t *testing.T) {
 	}
 
 	k.lock.Lock()
-	delete(k.tasks, "foo") // allow suicide attempts to schedule
+	k.registry.Remove("foo") // allow suicide attempts to schedule
 	k.lock.Unlock()
 
 	// reset the suicide watch, which should reset a stopped timer
diff --git a/contrib/mesos/pkg/executor/watcher.go b/contrib/mesos/pkg/executor/watcher.go
new file mode 100644
index 00000000000..081e85375ba
--- /dev/null
+++ b/contrib/mesos/pkg/executor/watcher.go
@@ -0,0 +1,150 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package executor
+
+import (
+	"sync"
+	"time"
+
+	log "github.com/golang/glog"
+)
+
+type (
+	// watchFilter filters registration events; return false to abort further processing of the event
+	watchFilter func(pod *PodEvent) (accept bool)
+
+	watchExpiration struct {
+		// timeout fires when the handler has expired; it delivers at most one Time.
+		timeout <-chan time.Time
+
+		// onEvent is an optional callback that is invoked if/when the timeout chan
+		// fires
+		onEvent func(taskID string)
+	}
+
+	watchHandler struct {
+		// prevent callbacks from being invoked simultaneously
+		sync.Mutex
+
+		// onEvent handles registration events; return true to indicate the handler
+		// should be de-registered upon completion. If pod is nil then the associated
+		// handler has expired.
+		onEvent func(pod *PodEvent) (done bool, err error)
+
+		// expiration is an optional configuration that indicates when a handler should
+		// be considered to have expired, and what action to take when it does
+		expiration watchExpiration
+	}
+
+	// watcher observes PodEvents and conditionally executes the handlers that
+	// have been associated with the taskID of each event.
+	watcher struct {
+		updates  <-chan *PodEvent
+		rw       sync.RWMutex
+		handlers map[string]*watchHandler // stored by pointer so the embedded Mutex is shared, never copied
+		filters  []watchFilter
+		runOnce  chan struct{}
+	}
+)
+
+func newWatcher(updates <-chan *PodEvent) *watcher {
+	return &watcher{
+		updates:  updates,
+		handlers: make(map[string]*watchHandler),
+		runOnce:  make(chan struct{}),
+	}
+}
+
+func (pw *watcher) run() {
+	select {
+	case <-pw.runOnce:
+		log.Error("run() has already been invoked for this pod-watcher")
+		return
+	default:
+		close(pw.runOnce)
+	}
+updateLoop:
+	for u := range pw.updates {
+		log.V(2).Info("filtering " + u.FormatShort())
+		for _, f := range pw.filters {
+			if !f(u) {
+				continue updateLoop
+			}
+		}
+		log.V(1).Info("handling " + u.FormatShort())
+		h, ok := func() (h *watchHandler, ok bool) {
+			pw.rw.RLock()
+			defer pw.rw.RUnlock()
+			h, ok = pw.handlers[u.taskID]
+			return
+		}()
+		if ok {
+			log.V(1).Info("executing action for " + u.FormatShort())
+			done, err := func() (bool, error) {
+				h.Lock()
+				defer h.Unlock()
+				return h.onEvent(u)
+			}()
+			if err != nil {
+				log.Error(err)
+			}
+			if done {
+				// de-register handler upon successful completion of action
+				log.V(1).Info("de-registering handler for " + u.FormatShort())
+				func() {
+					pw.rw.Lock()
+					delete(pw.handlers, u.taskID)
+					pw.rw.Unlock()
+				}()
+			}
+		}
+	}
+}
+
+func (pw *watcher) addFilter(f watchFilter) {
+	select {
+	case <-pw.runOnce:
+		log.Errorf("failed to add filter because pod-watcher is already running")
+	default:
+		pw.filters = append(pw.filters, f)
+	}
+}
+
+// forTask associates a handler `h` with the given taskID.
+func (pw *watcher) forTask(taskID string, h watchHandler) {
+	hp := &h // take a pointer so that run() and the expiration goroutine share h's Mutex
+	pw.rw.Lock()
+	pw.handlers[taskID] = hp
+	pw.rw.Unlock()
+
+	if exp := h.expiration; exp.timeout != nil {
+		go func() {
+			<-exp.timeout
+			log.V(1).Infof("expiring handler for task %v", taskID)
+
+			// de-register handler upon expiration
+			pw.rw.Lock()
+			delete(pw.handlers, taskID)
+			pw.rw.Unlock()
+
+			if exp.onEvent != nil {
+				hp.Lock()
+				defer hp.Unlock()
+				exp.onEvent(taskID)
+			}
+		}()
+	}
+}
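For illustration, a caller inside the executor package might combine forTask with a watchExpiration roughly as follows. This is a minimal sketch, not part of the patch: awaitTaskEvent and its parameters are hypothetical, while watcher, watchHandler, watchExpiration, and PodEvent are the types introduced above.

// awaitTaskEvent registers a one-shot handler for the next PodEvent of taskID
// and arranges for onExpired to run if no event arrives within timeout.
// (hypothetical helper, shown only to illustrate the forTask/expiration API)
func awaitTaskEvent(pw *watcher, taskID string, timeout time.Duration, onExpired func(taskID string)) {
	pw.forTask(taskID, watchHandler{
		onEvent: func(ev *PodEvent) (done bool, err error) {
			log.V(1).Info("observed " + ev.FormatShort())
			return true, nil // true => de-register this handler after the first event
		},
		expiration: watchExpiration{
			timeout: time.After(timeout), // delivers at most one Time, then the handler expires
			onEvent: onExpired,           // invoked with taskID upon expiration
		},
	})
}

Because onEvent returns true, the watcher de-registers the handler after the first matching event; if the timeout fires first, the expiration goroutine removes the handler and invokes onExpired instead.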