replace pod-observer and check-for-lost-pod polling with a single pod watcher and task registry

This commit is contained in:
James DeFelice 2016-01-14 02:05:11 +00:00
parent cfd046f73f
commit b2013cb1ba
10 changed files with 991 additions and 955 deletions
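The new task registry and pod watcher replace the kubelet update channel and the lost-pod polling loops. The Registry type itself is defined in one of the other changed files and is not shown on this page; the following is a rough sketch of its surface as inferred from the call sites in this diff (method names and exact signatures are assumptions, not the committed definition):

type Registry interface {
	// Update applies a pod update observed via the pod watch and returns the
	// resulting event, or an error such as errUnknownTask.
	Update(pod *api.Pod) (*PodEvent, error)

	// Remove drops the task/pod association for a terminated task.
	Remove(taskID string) error

	bind(taskID string, pod *api.Pod) error // record a task-to-pod binding after LaunchTask
	pod(taskID string) *api.Pod             // bound pod for a task, nil if unknown
	empty() bool                            // true when no tasks are registered
	watch() <-chan *PodEvent                // stream of bound/updated/deleted/incompatible-update events
	shutdown()                              // clear all registrations on executor shutdown
}

The executor's watcher consumes registry.watch() and dispatches each PodEvent to per-task handlers and global filters.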

View File

@ -0,0 +1,45 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
type kubeAPI interface {
killPod(ns, name string) error
}
type nodeAPI interface {
createOrUpdate(hostname string, slaveAttrLabels, annotations map[string]string) (*api.Node, error)
}
// clientAPIWrapper implements the kubeAPI and nodeAPI interfaces, which serve to isolate external
// dependencies so that they're easier to mock in unit tests.
type clientAPIWrapper struct {
client *client.Client
}
func (cw *clientAPIWrapper) killPod(ns, name string) error {
return cw.client.Pods(ns).Delete(name, api.NewDeleteOptions(0))
}
func (cw *clientAPIWrapper) createOrUpdate(hostname string, slaveAttrLabels, annotations map[string]string) (*api.Node, error) {
return node.CreateOrUpdate(cw.client, hostname, slaveAttrLabels, annotations)
}
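Because the executor now talks to the apiserver only through these narrow interfaces, tests can substitute a double for the real client. A minimal hand-rolled stub (hypothetical; the tests later in this commit use a testify-based mockKubeAPI instead) could look like:

// stubKubeAPI satisfies the kubeAPI interface above and records which pods it
// was asked to kill, instead of calling the apiserver.
type stubKubeAPI struct {
	killed []string // namespace/name of every pod passed to killPod
}

func (s *stubKubeAPI) killPod(ns, name string) error {
	s.killed = append(s.killed, ns+"/"+name)
	return nil
}

A test would assign it before exercising the executor, e.g. executor.kubeAPI = &stubKubeAPI{}, mirroring the executor.kubeAPI = mockKubeAPI pattern used later in this commit.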

View File

@@ -39,11 +39,9 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/client/cache"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
-	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/util"
 )
@@ -88,24 +86,12 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {
 	}
 }
 
-type kuberTask struct {
-	mesosTaskInfo *mesos.TaskInfo
-	launchTimer   *time.Timer // launchTimer expires when the launch-task process duration exceeds launchGracePeriod
-	podName       string      // empty until the pod is sent to the kubelet and registered in KubernetesExecutor.pods
-}
-
-type podStatusFunc func() (*api.PodStatus, error)
-
 // KubernetesExecutor is a mesos executor that runs pods
 // in a minion machine.
 type Executor struct {
-	updateChan           chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown
 	state                stateType
-	tasks                map[string]*kuberTask
-	pods                 map[string]*api.Pod
 	lock                 sync.Mutex
-	client               *client.Client
-	terminate            chan struct{} // signals that the executor should shut down
+	terminate            chan struct{} // signals that the executor is shutting down
 	outgoing             chan func() (mesos.Status, error) // outgoing queue to the mesos driver
 	dockerClient         dockertools.DockerInterface
 	suicideWatch         suicideWatcher
@@ -113,26 +99,27 @@ type Executor struct {
 	shutdownAlert        func()          // invoked just prior to executor shutdown
 	kubeletFinished      <-chan struct{} // signals that kubelet Run() died
 	exitFunc             func(int)
-	podStatusFunc        func(*api.Pod) (*api.PodStatus, error)
 	staticPodsConfigPath string
 	launchGracePeriod    time.Duration
 	nodeInfos            chan<- NodeInfo
 	initCompleted        chan struct{} // closes upon completion of Init()
+	registry             Registry
+	watcher              *watcher
+	kubeAPI              kubeAPI
+	nodeAPI              nodeAPI
 }
 type Config struct {
-	Updates              chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet
 	APIClient            *client.Client
 	Docker               dockertools.DockerInterface
 	ShutdownAlert        func()
 	SuicideTimeout       time.Duration
 	KubeletFinished      <-chan struct{} // signals that kubelet Run() died
 	ExitFunc             func(int)
-	PodStatusFunc        func(*api.Pod) (*api.PodStatus, error)
 	StaticPodsConfigPath string
-	PodLW                cache.ListerWatcher // mandatory, otherwise initialization will panic
 	LaunchGracePeriod    time.Duration
 	NodeInfos            chan<- NodeInfo
+	Registry             Registry
 }
 
 func (k *Executor) isConnected() bool {
@@ -149,11 +136,7 @@ func New(config Config) *Executor {
 		launchGracePeriod = time.Duration(math.MaxInt64)
 	}
 	k := &Executor{
-		updateChan:   config.Updates,
 		state:        disconnectedState,
-		tasks:        make(map[string]*kuberTask),
-		pods:         make(map[string]*api.Pod),
-		client:       config.APIClient,
 		terminate:    make(chan struct{}),
 		outgoing:     make(chan func() (mesos.Status, error), 1024),
 		dockerClient: config.Docker,
@@ -162,24 +145,52 @@ func New(config Config) *Executor {
 		suicideWatch:         &suicideTimer{},
 		shutdownAlert:        config.ShutdownAlert,
 		exitFunc:             config.ExitFunc,
-		podStatusFunc:        config.PodStatusFunc,
 		staticPodsConfigPath: config.StaticPodsConfigPath,
 		launchGracePeriod:    launchGracePeriod,
 		nodeInfos:            config.NodeInfos,
 		initCompleted:        make(chan struct{}),
+		registry:             config.Registry,
+		kubeAPI:              &clientAPIWrapper{config.APIClient},
+		nodeAPI:              &clientAPIWrapper{config.APIClient},
 	}
 	runtime.On(k.initCompleted, k.runSendLoop)
-	po := newPodObserver(config.PodLW, k.updateTask, k.terminate)
-	runtime.On(k.initCompleted, po.run)
+	k.watcher = newWatcher(k.registry.watch())
+	runtime.On(k.initCompleted, k.watcher.run)
 	return k
 }
+// Done returns a chan that closes when the executor is shutting down
+func (k *Executor) Done() <-chan struct{} {
+	return k.terminate
+}
+
 func (k *Executor) Init(driver bindings.ExecutorDriver) {
 	defer close(k.initCompleted)
 
 	k.killKubeletContainers()
 	k.resetSuicideWatch(driver)
 
+	k.watcher.addFilter(func(podEvent *PodEvent) bool {
+		switch podEvent.eventType {
+		case PodEventIncompatibleUpdate:
+			log.Warningf("killing %s because of an incompatible update", podEvent.FormatShort())
+			k.killPodTask(driver, podEvent.taskID)
+			// halt processing of this event; when the pod is deleted we'll receive another
+			// event for that.
+			return false
+
+		case PodEventDeleted:
+			// an active pod-task was deleted, alert mesos:
+			// send back a TASK_KILLED status, we completed the pod-task lifecycle normally.
+			k.resetSuicideWatch(driver)
+			k.sendStatus(driver, newStatus(mutil.NewTaskID(podEvent.taskID), mesos.TaskState_TASK_KILLED, "pod-deleted"))
+		}
+		return true
+	})
+
 	//TODO(jdef) monitor kubeletFinished and shutdown if it happens
 }
@@ -192,23 +203,6 @@ func (k *Executor) isDone() bool {
 	}
 }
 
-// sendPodsSnapshot assumes that caller is holding state lock; returns true when update is sent otherwise false
-func (k *Executor) sendPodsSnapshot() bool {
-	if k.isDone() {
-		return false
-	}
-	snapshot := make([]*api.Pod, 0, len(k.pods))
-	for _, v := range k.pods {
-		snapshot = append(snapshot, v)
-	}
-	u := &kubetypes.PodUpdate{
-		Op:   kubetypes.SET,
-		Pods: snapshot,
-	}
-	k.updateChan <- *u
-	return true
-}
-
 // Registered is called when the executor is successfully registered with the slave.
 func (k *Executor) Registered(
 	driver bindings.ExecutorDriver,
@@ -245,8 +239,7 @@ func (k *Executor) Registered(
 	}
 
 	if slaveInfo != nil {
-		_, err := node.CreateOrUpdate(
-			k.client,
+		_, err := k.nodeAPI.createOrUpdate(
 			slaveInfo.GetHostname(),
 			node.SlaveAttributesToLabels(slaveInfo.Attributes),
 			annotations,
@@ -257,10 +250,8 @@ func (k *Executor) Registered(
 		}
 	}
 
-	// emit an empty update to allow the mesos "source" to be marked as seen
 	k.lock.Lock()
 	defer k.lock.Unlock()
-	k.sendPodsSnapshot()
 
 	if slaveInfo != nil && k.nodeInfos != nil {
 		k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics
@@ -279,8 +270,7 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
 	}
 
 	if slaveInfo != nil {
-		_, err := node.CreateOrUpdate(
-			k.client,
+		_, err := k.nodeAPI.createOrUpdate(
 			slaveInfo.GetHostname(),
 			node.SlaveAttributesToLabels(slaveInfo.Attributes),
 			nil, // don't change annotations
@@ -335,8 +325,17 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
 	if k.isDone() {
 		return
 	}
 
 	log.Infof("Launch task %v\n", taskInfo)
 
+	taskID := taskInfo.GetTaskId().GetValue()
+	if p := k.registry.pod(taskID); p != nil {
+		log.Warningf("task %v already launched", taskID)
+		// Don't send back TASK_RUNNING or TASK_FAILED here because
+		// this may be a duplicated message.
+		return
+	}
+
 	if !k.isConnected() {
 		log.Errorf("Ignore launch task because the executor is disconnected\n")
 		k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED,
@@ -359,27 +358,10 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
 		return
 	}
 
-	taskId := taskInfo.GetTaskId().GetValue()
-
-	k.lock.Lock()
-	defer k.lock.Unlock()
-
-	if _, found := k.tasks[taskId]; found {
-		log.Errorf("task already launched\n")
-		// Don't send back TASK_RUNNING here because this may be a
-		// duplicated message or a duplicated task id.
-		return
-	}
-
-	// remember this task so that:
-	// (a) we ignore future launches for it
-	// (b) we have a record of it so that we can kill it if needed
-	// (c) we're leaving podName == "" for now, which indicates we don't need to delete containers
-	k.tasks[taskId] = &kuberTask{
-		mesosTaskInfo: taskInfo,
-		launchTimer:   time.NewTimer(k.launchGracePeriod),
-	}
 	k.resetSuicideWatch(driver)
 
-	go k.launchTask(driver, taskId, pod)
+	// run the next step async because it calls out to the apiserver and we don't want to block here
+	go k.bindAndWatchTask(driver, taskInfo, time.NewTimer(k.launchGracePeriod), pod)
 }
 // determine whether we need to start a suicide countdown. if so, then start
@@ -398,7 +380,7 @@ func (k *Executor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan stru
 		}
 
 		if k.suicideWatch != nil {
-			if len(k.tasks) > 0 {
+			if !k.registry.empty() {
 				k.suicideWatch.Stop()
 				return
 			}
@@ -430,12 +412,8 @@ func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan s
 	}
 
 	// fail-safe, will abort kamikaze attempts if there are tasks
-	if len(k.tasks) > 0 {
-		ids := []string{}
-		for taskid := range k.tasks {
-			ids = append(ids, taskid)
-		}
-		log.Errorf("suicide attempt failed, there are still running tasks: %v", ids)
+	if !k.registry.empty() {
+		log.Errorf("suicide attempt failed, there are still running tasks")
 		return
 	}
@ -447,308 +425,126 @@ func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan s
} }
} }
// async continuation of LaunchTask func podStatusData(pod *api.Pod, status api.PodStatus) ([]byte, string, error) {
func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) {
deleteTask := func() {
k.lock.Lock()
defer k.lock.Unlock()
delete(k.tasks, taskId)
k.resetSuicideWatch(driver)
}
// TODO(k8s): use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
if pod.Spec.NodeName == "" {
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
binding := &api.Binding{
ObjectMeta: api.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
Annotations: make(map[string]string),
},
Target: api.ObjectReference{
Kind: "Node",
Name: pod.Annotations[meta.BindingHostKey],
},
}
// forward the annotations that the scheduler wants to apply
for k, v := range pod.Annotations {
binding.Annotations[k] = v
}
// create binding on apiserver
log.Infof("Binding '%v/%v' to '%v' with annotations %+v...", pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
err := k.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
if err != nil {
deleteTask()
k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
messages.CreateBindingFailure))
return
}
} else {
// post annotations update to apiserver
patch := struct {
Metadata struct {
Annotations map[string]string `json:"annotations"`
} `json:"metadata"`
}{}
patch.Metadata.Annotations = pod.Annotations
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching annotations %v of pod %v/%v: %v", pod.Annotations, pod.Namespace, pod.Name, string(patchJson))
err := k.client.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error()
if err != nil {
log.Errorf("Error updating annotations of ready-to-launch pod %v/%v: %v", pod.Namespace, pod.Name, err)
deleteTask()
k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
messages.AnnotationUpdateFailure))
return
}
}
podFullName := container.GetPodFullName(pod) podFullName := container.GetPodFullName(pod)
// allow a recently failed-over scheduler the chance to recover the task/pod binding:
// it may have failed and recovered before the apiserver is able to report the updated
// binding information. replays of this status event will signal to the scheduler that
// the apiserver should be up-to-date.
data, err := json.Marshal(api.PodStatusResult{ data, err := json.Marshal(api.PodStatusResult{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: podFullName, Name: podFullName,
SelfLink: "/podstatusresult", SelfLink: "/podstatusresult",
}, },
Status: status,
}) })
if err != nil { return data, podFullName, err
deleteTask()
log.Errorf("failed to marshal pod status result: %v", err)
k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
err.Error()))
return
}
k.lock.Lock()
defer k.lock.Unlock()
// find task
task, found := k.tasks[taskId]
if !found {
log.V(1).Infof("task %v not found, probably killed: aborting launch, reporting lost", taskId)
k.reportLostTask(driver, taskId, messages.LaunchTaskFailed)
return
}
//TODO(jdef) check for duplicate pod name, if found send TASK_ERROR
// send the new pod to the kubelet which will spin it up
task.podName = podFullName
k.pods[podFullName] = pod
ok := k.sendPodsSnapshot()
if !ok {
task.podName = ""
delete(k.pods, podFullName)
return // executor is terminating, cancel launch
}
// From here on, we need to delete containers associated with the task upon
// it going into a terminal state.
// report task is starting to scheduler
statusUpdate := &mesos.TaskStatus{
TaskId: mutil.NewTaskID(taskId),
State: mesos.TaskState_TASK_STARTING.Enum(),
Message: proto.String(messages.CreateBindingSuccess),
Data: data,
}
k.sendStatus(driver, statusUpdate)
// Delay reporting 'task running' until container is up.
psf := podStatusFunc(func() (*api.PodStatus, error) {
return k.podStatusFunc(pod)
})
go k._launchTask(driver, taskId, podFullName, psf, task.launchTimer.C)
} }
func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc, expired <-chan time.Time) { // async continuation of LaunchTask
getMarshalledInfo := func() (data []byte, cancel bool) { func (k *Executor) bindAndWatchTask(driver bindings.ExecutorDriver, task *mesos.TaskInfo, launchTimer *time.Timer, pod *api.Pod) {
// potentially long call.. success := false
if podStatus, err := psf(); err == nil && podStatus != nil { defer func() {
select { if !success {
case <-expired: k.killPodTask(driver, task.TaskId.GetValue())
cancel = true k.resetSuicideWatch(driver)
default:
k.lock.Lock()
defer k.lock.Unlock()
if _, found := k.tasks[taskId]; !found {
// don't bother with the pod status if the task is already gone
cancel = true
break
} else if podStatus.Phase != api.PodRunning {
// avoid sending back a running status before it's really running
break
}
log.V(2).Infof("Found pod status: '%v'", podStatus)
result := api.PodStatusResult{
ObjectMeta: api.ObjectMeta{
Name: podFullName,
SelfLink: "/podstatusresult",
},
Status: *podStatus,
}
if data, err = json.Marshal(result); err != nil {
log.Errorf("failed to marshal pod status result: %v", err)
}
}
} }
return
}
waitForRunningPod:
for {
select {
case <-expired:
log.Warningf("Launch expired grace period of '%v'", k.launchGracePeriod)
break waitForRunningPod
case <-time.After(containerPollTime):
if data, cancel := getMarshalledInfo(); cancel {
break waitForRunningPod
} else if data == nil {
continue waitForRunningPod
} else {
k.lock.Lock()
defer k.lock.Unlock()
task, found := k.tasks[taskId]
if !found {
goto reportLost
}
statusUpdate := &mesos.TaskStatus{
TaskId: mutil.NewTaskID(taskId),
State: mesos.TaskState_TASK_RUNNING.Enum(),
Message: proto.String(fmt.Sprintf("pod-running:%s", podFullName)),
Data: data,
}
k.sendStatus(driver, statusUpdate)
task.launchTimer.Stop()
// continue to monitor the health of the pod
go k.__launchTask(driver, taskId, podFullName, psf)
return
}
}
}
k.lock.Lock()
defer k.lock.Unlock()
reportLost:
k.reportLostTask(driver, taskId, messages.KubeletPodLaunchFailed)
}
func (k *Executor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) {
// TODO(nnielsen): Monitor health of pod and report if lost.
// Should we also allow this to fail a couple of times before reporting lost?
// What if the docker daemon is restarting and we can't connect, but it's
// going to bring the pods back online as soon as it restarts?
knownPod := func() bool {
_, err := psf()
return err == nil
}
// Wait for the pod to go away and stop monitoring once it does
// TODO (jdefelice) replace with an /events watch?
for {
time.Sleep(lostPodPollTime)
if k.checkForLostPodTask(driver, taskId, knownPod) {
return
}
}
}
// Intended to be executed as part of the pod monitoring loop, this fn (ultimately) checks with Docker
// whether the pod is running. It will only return false if the task is still registered and the pod is
// registered in Docker. Otherwise it returns true. If there's still a task record on file, but no pod
// in Docker, then we'll also send a TASK_LOST event.
func (k *Executor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool {
// TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks)
// isKnownPod() can block so we take special precaution to avoid locking this mutex while calling it
knownTask := func() (ok bool) {
k.lock.Lock()
defer k.lock.Unlock()
_, ok = k.tasks[taskId]
return
}() }()
// TODO(jdef) we should really consider k.pods here, along with what docker is reporting, since the // allow a recently failed-over scheduler the chance to recover the task/pod binding:
// kubelet may constantly attempt to instantiate a pod as long as it's in the pod state that we're // it may have failed and recovered before the apiserver is able to report the updated
// handing to it. otherwise, we're probably reporting a TASK_LOST prematurely. Should probably // binding information. replays of this status event will signal to the scheduler that
// consult RestartPolicy to determine appropriate behavior. Should probably also gracefully handle // the apiserver should be up-to-date.
// docker daemon restarts. startingData, _, err := podStatusData(pod, api.PodStatus{})
if knownTask { if err != nil {
if isKnownPod() { log.Errorf("failed to generate pod-task starting data for task %v pod %v/%v: %v",
return false task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
} else { k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, err.Error()))
log.Warningf("Detected lost pod, reporting lost task %v", taskId) return
k.lock.Lock()
defer k.lock.Unlock()
k.reportLostTask(driver, taskId, messages.ContainersDisappeared)
}
} else {
log.V(2).Infof("Task %v no longer registered, stop monitoring for lost pods", taskId)
} }
return true
err = k.registry.bind(task.TaskId.GetValue(), pod)
if err != nil {
log.Errorf("failed to bind task %v pod %v/%v: %v",
task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, err.Error()))
return
}
// send TASK_STARTING
k.sendStatus(driver, &mesos.TaskStatus{
TaskId: task.TaskId,
State: mesos.TaskState_TASK_STARTING.Enum(),
Message: proto.String(messages.CreateBindingSuccess),
Data: startingData,
})
// within the launch timeout window we should see a pod-task update via the registry.
// if we see a Running update then we need to generate a TASK_RUNNING status update for mesos.
handlerFinished := false
handler := watchHandler{
expiration: watchExpiration{
timeout: launchTimer.C,
onEvent: func(taskID string) {
if !handlerFinished {
// launch timeout expired
k.killPodTask(driver, task.TaskId.GetValue())
}
},
},
onEvent: func(podEvent *PodEvent) (bool, error) {
switch podEvent.eventType {
case PodEventUpdated:
log.V(2).Infof("Found status: '%v' for %s", podEvent.pod.Status, podEvent.FormatShort())
if podEvent.pod.Status.Phase != api.PodRunning {
// still waiting for pod to transition to a running state, so
// we're not done monitoring yet; check back later..
break
}
data, podFullName, err := podStatusData(podEvent.pod, podEvent.pod.Status)
if err != nil {
return false, fmt.Errorf("failed to marshal pod status result: %v", err)
}
defer k.sendStatus(driver, &mesos.TaskStatus{
TaskId: task.TaskId,
State: mesos.TaskState_TASK_RUNNING.Enum(),
Message: proto.String("pod-running:" + podFullName),
Data: data,
})
fallthrough
case PodEventDeleted:
// we're done monitoring because pod has been deleted
handlerFinished = true
launchTimer.Stop()
}
return handlerFinished, nil
},
}
k.watcher.forTask(task.TaskId.GetValue(), handler)
success = true
} }
 // KillTask is called when the executor receives a request to kill a task.
 func (k *Executor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) {
-	if k.isDone() {
-		return
-	}
-	log.Infof("Kill task %v\n", taskId)
-
-	if !k.isConnected() {
-		//TODO(jdefelice) sent TASK_LOST here?
-		log.Warningf("Ignore kill task because the executor is disconnected\n")
-		return
-	}
-
-	k.lock.Lock()
-	defer k.lock.Unlock()
-	k.removePodTask(driver, taskId.GetValue(), messages.TaskKilled, mesos.TaskState_TASK_KILLED)
+	k.killPodTask(driver, taskId.GetValue())
 }
 
-// Reports a lost task to the slave and updates internal task and pod tracking state.
-// Assumes that the caller is locking around pod and task state.
-func (k *Executor) reportLostTask(driver bindings.ExecutorDriver, tid, reason string) {
-	k.removePodTask(driver, tid, reason, mesos.TaskState_TASK_LOST)
-}
-
-// deletes the pod and task associated with the task identified by tid and sends a task
+// deletes the pod and task associated with the task identified by taskID and sends a task
 // status update to mesos. also attempts to reset the suicide watch.
-// Assumes that the caller is locking around pod and task state.
-func (k *Executor) removePodTask(driver bindings.ExecutorDriver, tid, reason string, state mesos.TaskState) {
-	task, ok := k.tasks[tid]
-	if !ok {
-		log.V(1).Infof("Failed to remove task, unknown task %v\n", tid)
-		return
-	}
-	delete(k.tasks, tid)
-	k.resetSuicideWatch(driver)
-
-	pid := task.podName
-	_, found := k.pods[pid]
-	if !found {
-		log.Warningf("Cannot remove unknown pod %v for task %v", pid, tid)
-	} else {
-		log.V(2).Infof("deleting pod %v for task %v", pid, tid)
-		delete(k.pods, pid)
-
-		// tell the kubelet to remove the pod
-		k.sendPodsSnapshot()
-	}
-	// TODO(jdef): ensure that the update propagates, perhaps return a signal chan?
-	k.sendStatus(driver, newStatus(mutil.NewTaskID(tid), state, reason))
+func (k *Executor) killPodTask(driver bindings.ExecutorDriver, taskID string) {
+	pod := k.registry.pod(taskID)
+	if pod == nil {
+		log.V(1).Infof("Failed to remove task, unknown task %v\n", taskID)
+		k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskID}, mesos.TaskState_TASK_LOST, "kill-pod-task"))
+		return
+	}
+
+	// force-delete the pod from the API server
+	// TODO(jdef) possibly re-use eviction code from stock k8s once it lands?
+	err := k.kubeAPI.killPod(pod.Namespace, pod.Name)
+	if err != nil {
+		log.V(1).Infof("failed to delete task %v pod %v/%v from apiserver: %+v", taskID, pod.Namespace, pod.Name, err)
+	}
 }
 // FrameworkMessage is called when the framework sends some message to the executor
@@ -766,11 +562,15 @@ func (k *Executor) FrameworkMessage(driver bindings.ExecutorDriver, message stri
 	if strings.HasPrefix(message, messages.TaskLost+":") {
 		taskId := message[len(messages.TaskLost)+1:]
 		if taskId != "" {
+			// TODO(jdef) would it make more sense to check the status of the task and
+			// just replay the last non-terminal message that we sent if the task is
+			// still active?
+
 			// clean up pod state
-			k.lock.Lock()
-			defer k.lock.Unlock()
-			k.reportLostTask(driver, taskId, messages.TaskLostAck)
+			k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskId}, mesos.TaskState_TASK_LOST, messages.TaskLostAck))
+			k.killPodTask(driver, taskId)
 		}
+		return
 	}
 
 	switch message {
@@ -799,7 +599,6 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {
 	// signal to all listeners that this KubeletExecutor is done!
 	close(k.terminate)
-	close(k.updateChan)
 	close(k.nodeInfos)
 
 	if k.shutdownAlert != nil {
@@ -819,7 +618,7 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {
 	// according to docs, mesos will generate TASK_LOST updates for us
 	// if needed, so don't take extra time to do that here.
-	k.tasks = map[string]*kuberTask{}
+	k.registry.shutdown()
 
 	select {
 	// the main Run() func may still be running... wait for it to finish: it will
@ -940,36 +739,3 @@ func annotationsFor(ei *mesos.ExecutorInfo) (annotations map[string]string, err
	return
}

// updateTask executes some mutating operation for the given task/pod, blocking until the update is either
// attempted or discarded. it uses the executor state lock to synchronize concurrent invocations. it returns true
// only if the specified update operation was attempted and itself returned true; a true result also indicates that
// changes have been sent upstream to the kubelet.
func (k *Executor) updateTask(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error) {
k.lock.Lock()
defer k.lock.Unlock()
// exclude tasks which are already deleted from our task registry
task := k.tasks[taskId]
if task == nil {
// the pod has completed the launch-task-binding phase because it's been annotated with
// the task-id, but we don't have a record of it; it's best to let the scheduler reconcile.
// it's also possible that our update queue is backed up and hasn't caught up with the state
// of the world yet.
// TODO(jdef) should we hint to the scheduler (via TASK_FAILED, reason=PodRefersToUnknownTask)?
err = fmt.Errorf("task %s not found", taskId)
return
}
oldPod := k.pods[task.podName]
changed = f(task, oldPod)
// TODO(jdef) this abstraction isn't perfect since changes that only impact the task struct,
// and not the pod, don't require a new pod snapshot sent back to the kubelet.
if changed {
k.sendPodsSnapshot()
}
return
}

View File

@@ -19,12 +19,10 @@ package executor
 import (
 	"fmt"
 	"io/ioutil"
-	"net"
 	"net/http"
 	"net/http/httptest"
 	"os"
 	"path/filepath"
-	"reflect"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -39,10 +37,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/testapi"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/client/cache"
-	client "k8s.io/kubernetes/pkg/client/unversioned"
-	"k8s.io/kubernetes/pkg/kubelet"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
-	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/watch"
@@ -56,27 +51,11 @@ import (
 // after Register is called.
 func TestExecutorRegister(t *testing.T) {
 	mockDriver := &MockExecutorDriver{}
-	executor, updates := NewTestKubernetesExecutor()
+	executor := NewTestKubernetesExecutor()
 
 	executor.Init(mockDriver)
 	executor.Registered(mockDriver, nil, nil, nil)
 
-	initialPodUpdate := kubetypes.PodUpdate{
-		Pods: []*api.Pod{},
-		Op:   kubetypes.SET,
-	}
-	receivedInitialPodUpdate := false
-	select {
-	case update := <-updates:
-		if reflect.DeepEqual(initialPodUpdate, update) {
-			receivedInitialPodUpdate = true
-		}
-	case <-time.After(util.ForeverTestTimeout):
-	}
-	assert.Equal(t, true, receivedInitialPodUpdate,
-		"executor should have sent an initial PodUpdate "+
-			"to the updates chan upon registration")
-
 	assert.Equal(t, true, executor.isConnected(), "executor should be connected")
 	mockDriver.AssertExpectations(t)
 }
@@ -85,7 +64,7 @@ func TestExecutorRegister(t *testing.T) {
 // connected after a call to Disconnected has occurred.
 func TestExecutorDisconnect(t *testing.T) {
 	mockDriver := &MockExecutorDriver{}
-	executor, _ := NewTestKubernetesExecutor()
+	executor := NewTestKubernetesExecutor()
 
 	executor.Init(mockDriver)
 	executor.Registered(mockDriver, nil, nil, nil)
@@ -100,7 +79,7 @@ func TestExecutorDisconnect(t *testing.T) {
 // after a connection problem happens, followed by a call to Reregistered.
 func TestExecutorReregister(t *testing.T) {
 	mockDriver := &MockExecutorDriver{}
-	executor, _ := NewTestKubernetesExecutor()
+	executor := NewTestKubernetesExecutor()
 
 	executor.Init(mockDriver)
 	executor.Registered(mockDriver, nil, nil, nil)
@@ -111,65 +90,109 @@ func TestExecutorReregister(t *testing.T) {
 	mockDriver.AssertExpectations(t)
 }
type fakeKubelet struct { type fakeRegistry struct {
*kubelet.Kubelet sync.Mutex
hostIP net.IP boundTasks map[string]*api.Pod
updates chan *PodEvent
} }
func (kl *fakeKubelet) GetHostIP() (net.IP, error) { func newFakeRegistry() *fakeRegistry {
return kl.hostIP, nil return &fakeRegistry{boundTasks: map[string]*api.Pod{}, updates: make(chan *PodEvent, 100)}
} }
// TestExecutorLaunchAndKillTask ensures that the executor is able to launch func (r *fakeRegistry) empty() bool {
// and kill tasks while properly bookkeeping its tasks. r.Lock()
func TestExecutorLaunchAndKillTask(t *testing.T) { defer r.Unlock()
// create a fake pod watch. We use that below to submit new pods to the scheduler return len(r.boundTasks) == 0
podListWatch := NewMockPodsListWatch(api.PodList{}) }
// create fake apiserver func (r *fakeRegistry) pod(taskID string) *api.Pod {
testApiServer := NewTestServer(t, api.NamespaceDefault, &podListWatch.list) r.Lock()
// TODO: Uncomment when fix #19254 defer r.Unlock()
// defer testApiServer.server.Close() return r.boundTasks[taskID]
}
mockDriver := &MockExecutorDriver{} func (r *fakeRegistry) watch() <-chan *PodEvent { return r.updates }
updates := make(chan kubetypes.PodUpdate, 1024)
config := Config{ func (r *fakeRegistry) shutdown() {
Docker: dockertools.ConnectToDockerOrDie("fake://"), r.Lock()
Updates: updates, defer r.Unlock()
NodeInfos: make(chan NodeInfo, 1), r.boundTasks = map[string]*api.Pod{}
APIClient: client.NewOrDie(&client.Config{ }
Host: testApiServer.server.URL,
GroupVersion: testapi.Default.GroupVersion(), func (r *fakeRegistry) bind(taskID string, pod *api.Pod) error {
}), r.Lock()
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { defer r.Unlock()
return &api.PodStatus{ pod.Annotations = map[string]string{
ContainerStatuses: []api.ContainerStatus{ "k8s.mesosphere.io/taskId": taskID,
{
Name: "foo",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
},
},
Phase: api.PodRunning,
HostIP: "127.0.0.1",
}, nil
},
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
} }
executor := New(config) r.boundTasks[taskID] = pod
// the normal registry sends a bind..
r.updates <- &PodEvent{pod: pod, taskID: taskID, eventType: PodEventBound}
return nil
}
func (r *fakeRegistry) Update(pod *api.Pod) (*PodEvent, error) {
r.Lock()
defer r.Unlock()
taskID, err := taskIDFor(pod)
if err != nil {
return nil, err
}
if _, ok := r.boundTasks[taskID]; !ok {
return nil, errUnknownTask
}
rp := &PodEvent{pod: pod, taskID: taskID, eventType: PodEventUpdated}
r.updates <- rp
return rp, nil
}
func (r *fakeRegistry) Remove(taskID string) error {
r.Lock()
defer r.Unlock()
pod, ok := r.boundTasks[taskID]
if !ok {
return errUnknownTask
}
delete(r.boundTasks, taskID)
r.updates <- &PodEvent{pod: pod, taskID: taskID, eventType: PodEventDeleted}
return nil
}
// phaseChange simulates a pod source update; normally this update is generated from a watch
func (r *fakeRegistry) phaseChange(pod *api.Pod, phase api.PodPhase) error {
clone, err := api.Scheme.DeepCopy(pod)
if err != nil {
return err
}
phasedPod := clone.(*api.Pod)
phasedPod.Status.Phase = phase
_, err = r.Update(phasedPod)
return err
}
// TestExecutorLaunchAndKillTask ensures that the executor is able to launch tasks and generates
// appropriate status messages for mesos. It then kills the task and validates that appropriate
// actions are taken by the executor.
func TestExecutorLaunchAndKillTask(t *testing.T) {
var (
mockDriver = &MockExecutorDriver{}
registry = newFakeRegistry()
executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
NodeInfos: make(chan NodeInfo, 1),
Registry: registry,
})
mockKubeAPI = &mockKubeAPI{}
pod = NewTestPod(1)
executorinfo = &mesosproto.ExecutorInfo{}
)
executor.kubeAPI = mockKubeAPI
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)
select {
case <-updates:
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("Executor should send an initial update on Registration")
}
pod := NewTestPod(1)
executorinfo := &mesosproto.ExecutorInfo{}
podTask, err := podtask.New( podTask, err := podtask.New(
api.NewDefaultContext(), api.NewDefaultContext(),
"", "",
@ -178,22 +201,24 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
nil, nil,
nil, nil,
) )
assert.Equal(t, nil, err, "must be able to create a task from a pod") assert.Equal(t, nil, err, "must be able to create a task from a pod")
podTask.Spec = &podtask.Spec{ pod.Annotations = map[string]string{
Executor: executorinfo, "k8s.mesosphere.io/taskId": podTask.ID,
} }
podTask.Spec = &podtask.Spec{Executor: executorinfo}
taskInfo, err := podTask.BuildTaskInfo() taskInfo, err := podTask.BuildTaskInfo()
assert.Equal(t, nil, err, "must be able to build task info") assert.Equal(t, nil, err, "must be able to build task info")
data, err := testapi.Default.Codec().Encode(pod) data, err := testapi.Default.Codec().Encode(pod)
assert.Equal(t, nil, err, "must be able to encode a pod's spec data") assert.Equal(t, nil, err, "must be able to encode a pod's spec data")
taskInfo.Data = data taskInfo.Data = data
var statusUpdateCalls sync.WaitGroup var statusUpdateCalls sync.WaitGroup
statusUpdateCalls.Add(1)
statusUpdateDone := func(_ mock.Arguments) { statusUpdateCalls.Done() } statusUpdateDone := func(_ mock.Arguments) { statusUpdateCalls.Done() }
statusUpdateCalls.Add(1)
mockDriver.On( mockDriver.On(
"SendStatusUpdate", "SendStatusUpdate",
mesosproto.TaskState_TASK_STARTING, mesosproto.TaskState_TASK_STARTING,
@ -210,20 +235,16 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock() executor.lock.Lock()
defer executor.lock.Unlock() defer executor.lock.Unlock()
return len(executor.tasks) == 1 && len(executor.pods) == 1 return !registry.empty()
}, "executor must be able to create a task and a pod") }, "executor must be able to create a task and a pod")
gotPodUpdate := false // simulate a pod source update; normally this update is generated when binding a pod
select { err = registry.phaseChange(pod, api.PodPending)
case update := <-updates: assert.NoError(t, err)
if len(update.Pods) == 1 {
gotPodUpdate = true // simulate a pod source update; normally this update is generated by the kubelet once the pod is healthy
} err = registry.phaseChange(pod, api.PodRunning)
case <-time.After(util.ForeverTestTimeout): assert.NoError(t, err)
}
assert.Equal(t, true, gotPodUpdate,
"the executor should send an update about a new pod to "+
"the updates chan when creating a new one.")
// Allow some time for asynchronous requests to the driver. // Allow some time for asynchronous requests to the driver.
finished := kmruntime.After(statusUpdateCalls.Wait) finished := kmruntime.After(statusUpdateCalls.Wait)
@ -239,12 +260,16 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
mesosproto.TaskState_TASK_KILLED, mesosproto.TaskState_TASK_KILLED,
).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(statusUpdateDone).Once() ).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(statusUpdateDone).Once()
executor.KillTask(mockDriver, taskInfo.TaskId) // simulate what happens when the apiserver is told to delete a pod
mockKubeAPI.On("killPod", pod.Namespace, pod.Name).Return(nil).Run(func(_ mock.Arguments) {
registry.Remove(podTask.ID)
})
executor.KillTask(mockDriver, taskInfo.TaskId)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock() executor.lock.Lock()
defer executor.lock.Unlock() defer executor.lock.Unlock()
return len(executor.tasks) == 0 && len(executor.pods) == 0 return registry.empty()
}, "executor must be able to kill a created task and pod") }, "executor must be able to kill a created task and pod")
// Allow some time for asynchronous requests to the driver. // Allow some time for asynchronous requests to the driver.
@ -254,7 +279,9 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
case <-time.After(util.ForeverTestTimeout): case <-time.After(util.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish") t.Fatalf("timed out waiting for status update calls to finish")
} }
mockDriver.AssertExpectations(t) mockDriver.AssertExpectations(t)
mockKubeAPI.AssertExpectations(t)
} }
// TestExecutorStaticPods test that the ExecutorInfo.data is parsed // TestExecutorStaticPods test that the ExecutorInfo.data is parsed
@ -340,52 +367,30 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) {
// its state. When a Kamikaze message is received, the executor should // its state. When a Kamikaze message is received, the executor should
// attempt suicide. // attempt suicide.
func TestExecutorFrameworkMessage(t *testing.T) { func TestExecutorFrameworkMessage(t *testing.T) {
// create fake apiserver
podListWatch := NewMockPodsListWatch(api.PodList{})
testApiServer := NewTestServer(t, api.NamespaceDefault, &podListWatch.list)
// TODO: Uncomment when fix #19254
// defer testApiServer.server.Close()
// create and start executor // create and start executor
mockDriver := &MockExecutorDriver{} var (
kubeletFinished := make(chan struct{}) mockDriver = &MockExecutorDriver{}
config := Config{ kubeletFinished = make(chan struct{})
Docker: dockertools.ConnectToDockerOrDie("fake://"), registry = newFakeRegistry()
Updates: make(chan kubetypes.PodUpdate, 1024), executor = New(Config{
NodeInfos: make(chan NodeInfo, 1), Docker: dockertools.ConnectToDockerOrDie("fake://"),
APIClient: client.NewOrDie(&client.Config{ NodeInfos: make(chan NodeInfo, 1),
Host: testApiServer.server.URL, ShutdownAlert: func() {
GroupVersion: testapi.Default.GroupVersion(), close(kubeletFinished)
}), },
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { KubeletFinished: kubeletFinished,
return &api.PodStatus{ Registry: registry,
ContainerStatuses: []api.ContainerStatus{ })
{ pod = NewTestPod(1)
Name: "foo", mockKubeAPI = &mockKubeAPI{}
State: api.ContainerState{ )
Running: &api.ContainerStateRunning{},
},
},
},
Phase: api.PodRunning,
HostIP: "127.0.0.1",
}, nil
},
ShutdownAlert: func() {
close(kubeletFinished)
},
KubeletFinished: kubeletFinished,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
}
executor := New(config)
executor.kubeAPI = mockKubeAPI
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)
executor.FrameworkMessage(mockDriver, "test framework message") executor.FrameworkMessage(mockDriver, "test framework message")
// set up a pod to then lose // set up a pod to then lose
pod := NewTestPod(1)
executorinfo := &mesosproto.ExecutorInfo{} executorinfo := &mesosproto.ExecutorInfo{}
podTask, _ := podtask.New( podTask, _ := podtask.New(
api.NewDefaultContext(), api.NewDefaultContext(),
@ -395,10 +400,13 @@ func TestExecutorFrameworkMessage(t *testing.T) {
nil, nil,
nil, nil,
) )
pod.Annotations = map[string]string{
"k8s.mesosphere.io/taskId": podTask.ID,
}
podTask.Spec = &podtask.Spec{ podTask.Spec = &podtask.Spec{
Executor: executorinfo, Executor: executorinfo,
} }
taskInfo, err := podTask.BuildTaskInfo() taskInfo, err := podTask.BuildTaskInfo()
assert.Equal(t, nil, err, "must be able to build task info") assert.Equal(t, nil, err, "must be able to build task info")
@ -418,9 +426,21 @@ func TestExecutorFrameworkMessage(t *testing.T) {
executor.LaunchTask(mockDriver, taskInfo) executor.LaunchTask(mockDriver, taskInfo)
// must wait for this otherwise phase changes may not apply
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return !registry.empty()
}, "executor must be able to create a task and a pod")
err = registry.phaseChange(pod, api.PodPending)
assert.NoError(t, err)
err = registry.phaseChange(pod, api.PodRunning)
assert.NoError(t, err)
// waiting until the pod is really running b/c otherwise a TASK_FAILED could be // waiting until the pod is really running b/c otherwise a TASK_FAILED could be
// triggered by the asynchronously running _launchTask, __launchTask methods // triggered by the asynchronously running executor methods when removing the task
// when removing the task from k.tasks through the "task-lost:foo" message below. // from k.tasks through the "task-lost:foo" message below.
select { select {
case <-called: case <-called:
case <-time.After(util.ForeverTestTimeout): case <-time.After(util.ForeverTestTimeout):
@ -434,11 +454,17 @@ func TestExecutorFrameworkMessage(t *testing.T) {
mesosproto.TaskState_TASK_LOST, mesosproto.TaskState_TASK_LOST,
).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once() ).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once()
// simulate what happens when the apiserver is told to delete a pod
mockKubeAPI.On("killPod", pod.Namespace, pod.Name).Return(nil).Run(func(_ mock.Arguments) {
registry.Remove(podTask.ID)
})
executor.FrameworkMessage(mockDriver, "task-lost:foo") executor.FrameworkMessage(mockDriver, "task-lost:foo")
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
executor.lock.Lock() executor.lock.Lock()
defer executor.lock.Unlock() defer executor.lock.Unlock()
return len(executor.tasks) == 0 && len(executor.pods) == 0 return registry.empty()
}, "executor must be able to kill a created task and pod") }, "executor must be able to kill a created task and pod")
select { select {
@ -454,6 +480,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
"executor should have shut down after receiving a Kamikaze message") "executor should have shut down after receiving a Kamikaze message")
mockDriver.AssertExpectations(t) mockDriver.AssertExpectations(t)
mockKubeAPI.AssertExpectations(t)
} }
// Create a pod with a given index, requiring one port // Create a pod with a given index, requiring one port
@@ -504,7 +531,7 @@ type TestServer struct {
 	lock  sync.Mutex
 }
 
-func NewTestServer(t *testing.T, namespace string, pods *api.PodList) *TestServer {
+func NewTestServer(t *testing.T, namespace string) *TestServer {
 	ts := TestServer{
 		Stats: map[string]uint{},
 	}
@ -537,62 +564,41 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch {
// TestExecutorShutdown ensures that the executor properly shuts down // TestExecutorShutdown ensures that the executor properly shuts down
// when Shutdown is called. // when Shutdown is called.
func TestExecutorShutdown(t *testing.T) { func TestExecutorShutdown(t *testing.T) {
mockDriver := &MockExecutorDriver{} var (
kubeletFinished := make(chan struct{}) mockDriver = &MockExecutorDriver{}
var exitCalled int32 = 0 kubeletFinished = make(chan struct{})
updates := make(chan kubetypes.PodUpdate, 1024) exitCalled = int32(0)
config := Config{ executor = New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"), Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates, NodeInfos: make(chan NodeInfo, 1),
NodeInfos: make(chan NodeInfo, 1), ShutdownAlert: func() {
ShutdownAlert: func() { close(kubeletFinished)
close(kubeletFinished) },
}, KubeletFinished: kubeletFinished,
KubeletFinished: kubeletFinished, ExitFunc: func(_ int) {
ExitFunc: func(_ int) { atomic.AddInt32(&exitCalled, 1)
atomic.AddInt32(&exitCalled, 1) },
}, Registry: newFakeRegistry(),
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch, })
} )
executor := New(config)
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)
mockDriver.On("Stop").Return(mesosproto.Status_DRIVER_STOPPED, nil).Once() mockDriver.On("Stop").Return(mesosproto.Status_DRIVER_STOPPED, nil).Once()
executor.Shutdown(mockDriver) executor.Shutdown(mockDriver)
assert.Equal(t, false, executor.isConnected(), assert.Equal(t, false, executor.isConnected(),
"executor should not be connected after Shutdown") "executor should not be connected after Shutdown")
assert.Equal(t, true, executor.isDone(), assert.Equal(t, true, executor.isDone(),
"executor should be in Done state after Shutdown") "executor should be in Done state after Shutdown")
// channel should be closed now, only a constant number of updates left
num := len(updates)
drainLoop:
for {
select {
case _, ok := <-updates:
if !ok {
break drainLoop
}
num -= 1
default:
t.Fatal("Updates chan should be closed after Shutdown")
}
}
assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown")
assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0, assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0,
"the executor should call its ExitFunc when it is ready to close down") "the executor should call its ExitFunc when it is ready to close down")
mockDriver.AssertExpectations(t) mockDriver.AssertExpectations(t)
} }
 func TestExecutorsendFrameworkMessage(t *testing.T) {
 	mockDriver := &MockExecutorDriver{}
-	executor, _ := NewTestKubernetesExecutor()
+	executor := NewTestKubernetesExecutor()
 
 	executor.Init(mockDriver)
 	executor.Registered(mockDriver, nil, nil, nil)
@@ -612,77 +618,3 @@ func TestExecutorsendFrameworkMessage(t *testing.T) {
 	}
 	mockDriver.AssertExpectations(t)
 }
func TestExecutor_updateMetaMap(t *testing.T) {
for i, tc := range []struct {
oldmap map[string]string
newmap map[string]string
wants bool
}{
{
oldmap: nil,
newmap: nil,
wants: false,
},
{
oldmap: nil,
newmap: map[string]string{},
wants: false,
},
{
oldmap: map[string]string{},
newmap: nil,
wants: false,
},
{
oldmap: nil,
newmap: map[string]string{
"foo": "bar",
},
wants: true,
},
{
oldmap: map[string]string{},
newmap: map[string]string{
"foo": "bar",
},
wants: true,
},
{
oldmap: map[string]string{
"baz": "qax",
},
newmap: map[string]string{
"foo": "bar",
},
wants: true,
},
{
oldmap: map[string]string{
"baz": "qax",
},
newmap: nil,
wants: true,
},
{
oldmap: map[string]string{
"baz": "qax",
"qwe": "iop",
},
newmap: map[string]string{
"foo": "bar",
"qwe": "iop",
},
wants: true,
},
} {
// do work here
actual := updateMetaMap(&tc.oldmap, tc.newmap)
if actual != tc.wants {
t.Fatalf("test case %d failed, expected %v but got %v instead", i, tc.wants, actual)
}
if len(tc.oldmap) != len(tc.newmap) || (len(tc.oldmap) > 0 && !reflect.DeepEqual(tc.oldmap, tc.newmap)) {
t.Fatalf("test case %d failed, expected %v but got %v instead", i, tc.newmap, tc.oldmap)
}
}
}

View File

@@ -22,11 +22,18 @@ import (
 	"github.com/mesos/mesos-go/mesosproto"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
-	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
-	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 )
 
+type mockKubeAPI struct {
+	mock.Mock
+}
+
+func (m *mockKubeAPI) killPod(ns, name string) error {
+	args := m.Called(ns, name)
+	return args.Error(0)
+}
+
 type MockExecutorDriver struct {
 	mock.Mock
 }
@@ -66,18 +73,16 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
 	return args.Get(0).(mesosproto.Status), args.Error(1)
 }
 
-func NewTestKubernetesExecutor() (*Executor, chan kubetypes.PodUpdate) {
-	updates := make(chan kubetypes.PodUpdate, 1024)
+func NewTestKubernetesExecutor() *Executor {
 	return New(Config{
 		Docker:   dockertools.ConnectToDockerOrDie("fake://"),
-		Updates:  updates,
-		PodLW:    &NewMockPodsListWatch(api.PodList{}).ListWatch,
-	}), updates
+		Registry: newFakeRegistry(),
+	})
 }
 
 func TestExecutorNew(t *testing.T) {
 	mockDriver := &MockExecutorDriver{}
-	executor, _ := NewTestKubernetesExecutor()
+	executor := NewTestKubernetesExecutor()
 	executor.Init(mockDriver)
 
 	assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization")

View File

@ -1,171 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
)
// taskUpdateTx executes a task update transaction f for the task identified by
// taskId. if no such task exists then f is not invoked and an error is
// returned. if f is invoked then taskUpdateTx returns the bool result of f.
type taskUpdateTx func(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error)
// podObserver receives callbacks for every pod state change on the apiserver and
// for each decides whether to execute a task update transaction.
type podObserver struct {
podController *framework.Controller
terminate <-chan struct{}
taskUpdateTx taskUpdateTx
}
func newPodObserver(podLW cache.ListerWatcher, taskUpdateTx taskUpdateTx, terminate <-chan struct{}) *podObserver {
// watch pods from the given pod ListWatch
if podLW == nil {
// fail early to make debugging easier
panic("cannot create executor with nil PodLW")
}
p := &podObserver{
terminate: terminate,
taskUpdateTx: taskUpdateTx,
}
_, p.podController = framework.NewInformer(podLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
p.handleChangedApiserverPod(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*api.Pod)
log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
p.handleChangedApiserverPod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
},
})
return p
}
// run begins observing pod state changes; blocks until the terminate chan closes.
func (p *podObserver) run() {
p.podController.Run(p.terminate)
}
// handleChangedApiserverPod is invoked for pod add/update state changes and decides whether
// task updates are necessary. if so, a task update is executed via taskUpdateTx.
func (p *podObserver) handleChangedApiserverPod(pod *api.Pod) {
// Don't do anything for pods without a task annotation, which means:
// - "pre-scheduled" pods which have a NodeName set to this node without already having been scheduled.
// - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
// - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
taskId := pod.Annotations[meta.TaskIdKey]
if taskId == "" {
// There also could be a race between the overall launch-task process and this update, but here we
// will never be able to process such a stale update because the "update pod" that we're receiving
// in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
// update on the floor because we'll get another update later that, in addition to the changes that
// we're dropping now, will also include the changes from the binding process.
log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
return
}
_, err := p.taskUpdateTx(taskId, func(_ *kuberTask, relatedPod *api.Pod) (sendSnapshot bool) {
if relatedPod == nil {
// should never happen because:
// (a) the update pod record has already passed through the binding phase in launchTasks()
// (b) all remaining updates to executor.{pods,tasks} are sync'd in unison
log.Errorf("internal state error: pod not found for task %s", taskId)
return
}
// TODO(sttts): check whether we can and should send all "semantic" changes down to the kubelet
// see kubelet/config/config.go for semantic change detection
// check for updated labels/annotations: need to forward these for the downward API
sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Labels, pod.Labels)
sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Annotations, pod.Annotations)
// terminating pod?
if pod.Status.Phase == api.PodRunning {
timeModified := differentTime(relatedPod.DeletionTimestamp, pod.DeletionTimestamp)
graceModified := differentPeriod(relatedPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds)
if timeModified || graceModified {
log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet",
pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
// modify the pod in our registry instead of sending the new pod. The latter
// would allow other changes to bleed into the kubelet. For now we are
// very conservative about changing this behaviour.
relatedPod.DeletionTimestamp = pod.DeletionTimestamp
relatedPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
sendSnapshot = true
}
}
return
})
if err != nil {
log.Errorf("failed to update pod %s/%s: %+v", pod.Namespace, pod.Name, err)
}
}
// updateMetaMap looks for differences between src and dest; if there are differences
// then dest is changed (possibly to point to src) and this func returns true.
func updateMetaMap(dest *map[string]string, src map[string]string) (changed bool) {
// check for things in dest that are missing in src
for k := range *dest {
if _, ok := src[k]; !ok {
changed = true
break
}
}
if !changed {
if len(*dest) == 0 {
if len(src) > 0 {
changed = true
goto finished
}
// no update needed
return
}
// check for things in src that are missing/different in dest
for k, v := range src {
if vv, ok := (*dest)[k]; !ok || vv != v {
changed = true
break
}
}
}
finished:
*dest = src
return
}
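// Illustrative example (editorial sketch, not part of the original source): given
//
//	dest := map[string]string{"a": "1"}
//	src := map[string]string{"a": "1", "b": "2"}
//
// updateMetaMap(&dest, src) detects the missing "b" key, points dest at src and
// returns true; a second call with the now-identical maps returns false.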
func differentTime(a, b *unversioned.Time) bool {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}
func differentPeriod(a, b *int64) bool {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}

View File

@ -0,0 +1,340 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"encoding/json"
"errors"
"sync"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
log "github.com/golang/glog"
)
type (
podEventType int
PodEvent struct {
pod *api.Pod
taskID string
eventType podEventType
}
// Registry is a state store for pod task metadata. Clients are expected to watch() the
// event stream to observe changes over time.
Registry interface {
// Update modifies the registry's internal representation of the pod; it may also
// modify the pod argument itself. An update may fail because the pod isn't
// annotated with a task ID, the task ID is unknown, or the nature of the update is
// incompatible with what's supported in kubernetes-mesos.
Update(pod *api.Pod) (*PodEvent, error)
// Remove the task from this registry, returns an error if the taskID is unknown.
Remove(taskID string) error
// bind associates a taskID with a pod, triggers the binding API on the k8s apiserver
// and stores the resulting pod-task metadata.
bind(taskID string, pod *api.Pod) error
// watch returns the event stream of the registry. Clients are expected to read this
// stream, otherwise the event buffer will fill up and registry ops will block.
watch() <-chan *PodEvent
// return true if there are no tasks registered
empty() bool
// return the api.Pod registered to the given taskID or else nil
pod(taskID string) *api.Pod
// shutdown any related async processing and clear the internal state of the registry
shutdown()
}
registryImpl struct {
client *client.Client
updates chan *PodEvent
m sync.RWMutex
boundTasks map[string]*api.Pod
}
)
var (
errCreateBindingFailed = errors.New(messages.CreateBindingFailure)
errAnnotationUpdateFailure = errors.New(messages.AnnotationUpdateFailure)
errUnknownTask = errors.New("unknown task ID")
errUnsupportedUpdate = errors.New("pod update allowed by k8s is incompatible with this version of k8s-mesos")
)
const (
PodEventBound podEventType = iota
PodEventUpdated
PodEventDeleted
PodEventIncompatibleUpdate
updatesBacklogSize = 200
)
func IsUnsupportedUpdate(err error) bool {
return err == errUnsupportedUpdate
}
func (rp *PodEvent) Task() string {
return rp.taskID
}
func (rp *PodEvent) Pod() *api.Pod {
return rp.pod
}
func (rp *PodEvent) FormatShort() string {
return "task '" + rp.taskID + "' pod '" + rp.pod.Namespace + "/" + rp.pod.Name + "'"
}
func NewRegistry(client *client.Client) Registry {
r := &registryImpl{
client: client,
updates: make(chan *PodEvent, updatesBacklogSize),
boundTasks: make(map[string]*api.Pod),
}
return r
}
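// Illustrative example (editorial sketch, not part of this change): the registry's
// event stream must be drained by a consumer such as the pod watcher; handleEvent
// below is a hypothetical callback.
//
//	reg := NewRegistry(apiclient)
//	go func() {
//		for ev := range reg.watch() {
//			handleEvent(ev) // hypothetical; the real consumer is watcher.run()
//		}
//	}()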
func (r *registryImpl) watch() <-chan *PodEvent {
return r.updates
}
func taskIDFor(pod *api.Pod) (taskID string, err error) {
taskID = pod.Annotations[meta.TaskIdKey]
if taskID == "" {
err = errUnknownTask
}
return
}
func (r *registryImpl) shutdown() {
//TODO(jdef) flesh this out
r.m.Lock()
defer r.m.Unlock()
r.boundTasks = map[string]*api.Pod{}
}
func (r *registryImpl) empty() bool {
r.m.RLock()
defer r.m.RUnlock()
return len(r.boundTasks) == 0
}
func (r *registryImpl) pod(taskID string) *api.Pod {
r.m.RLock()
defer r.m.RUnlock()
return r.boundTasks[taskID]
}
func (r *registryImpl) Remove(taskID string) error {
r.m.Lock()
defer r.m.Unlock()
pod, ok := r.boundTasks[taskID]
if !ok {
return errUnknownTask
}
delete(r.boundTasks, taskID)
r.updates <- &PodEvent{
pod: pod,
taskID: taskID,
eventType: PodEventDeleted,
}
log.V(1).Infof("unbound task %v from pod %v/%v", taskID, pod.Namespace, pod.Name)
return nil
}
func (r *registryImpl) Update(pod *api.Pod) (*PodEvent, error) {
// Don't do anything for pods without a task annotation, which means:
// - "pre-scheduled" pods which have a NodeName set to this node without having been scheduled yet.
// - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
// - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
taskID, err := taskIDFor(pod)
if err != nil {
// There also could be a race between the overall launch-task process and this update, but here we
// will never be able to process such a stale update because the "update pod" that we're receiving
// in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
// update on the floor because we'll get another update later that, in addition to the changes that
// we're dropping now, will also include the changes from the binding process.
log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
return nil, err
}
// be a good citizen: copy the arg before making any changes to it
clone, err := api.Scheme.DeepCopy(pod)
if err != nil {
return nil, err
}
pod = clone.(*api.Pod)
r.m.Lock()
defer r.m.Unlock()
oldPod, ok := r.boundTasks[taskID]
if !ok {
return nil, errUnknownTask
}
registeredPod := &PodEvent{
pod: pod,
taskID: taskID,
eventType: PodEventUpdated,
}
// TODO(jdef) would be nice to only execute this logic based on the presence of
// some particular annotation:
// - preserve the original container port spec since the k8sm scheduler
// has likely changed it.
if !copyPorts(pod, oldPod) {
// TODO(jdef) the state of "pod" is possibly inconsistent at this point.
// we don't care for the moment - we might later.
registeredPod.eventType = PodEventIncompatibleUpdate
r.updates <- registeredPod
log.Warningf("pod containers changed in an incompatible way; aborting update")
return registeredPod, errUnsupportedUpdate
}
// update our internal copy and broadcast the change
r.boundTasks[taskID] = pod
r.updates <- registeredPod
log.V(1).Infof("updated task %v pod %v/%v", taskID, pod.Namespace, pod.Name)
return registeredPod, nil
}
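// Illustrative example (editorial sketch, not part of this change): callers can
// distinguish incompatible updates from other failures via IsUnsupportedUpdate:
//
//	if _, err := reg.Update(pod); IsUnsupportedUpdate(err) {
//		// the pod spec changed in a way k8s-mesos cannot apply; drop the update
//	} else if err != nil {
//		// missing task annotation, unknown task ID, or deep-copy failure
//	}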
// copyPorts copies the container port specs from src to dest and returns
// true if every container (in both dest and src) is accounted for, otherwise
// false. If it returns false then it's possible that only a partial copy
// has been performed.
func copyPorts(dest, src *api.Pod) bool {
containers := src.Spec.Containers
ctPorts := make(map[string][]api.ContainerPort, len(containers))
for i := range containers {
ctPorts[containers[i].Name] = containers[i].Ports
}
containers = dest.Spec.Containers
for i := range containers {
name := containers[i].Name
if ports, found := ctPorts[name]; found {
containers[i].Ports = ports
delete(ctPorts, name)
} else {
// old pod spec is missing this container?!
return false
}
}
if len(ctPorts) > 0 {
// old pod spec has containers that aren't in the new pod spec
return false
}
return true
}
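// Illustrative example (editorial sketch, not part of this change): if the old
// (src) pod declares container "web" with hostPort 31000 and the incoming (dest)
// pod resets that port, copyPorts restores the original port spec on dest and
// returns true; if the container sets differ between src and dest it returns
// false and Update above rejects the change as unsupported.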
func (r *registryImpl) bind(taskID string, pod *api.Pod) error {
// validate taskID matches that of the annotation
annotatedTaskID, err := taskIDFor(pod)
if err != nil {
log.Warning("failed to bind: missing task ID annotation for pod ", pod.Namespace+"/"+pod.Name)
return errCreateBindingFailed
}
if annotatedTaskID != taskID {
log.Warningf("failed to bind: expected task-id %v instead of %v for pod %v/%v", taskID, annotatedTaskID, pod.Namespace, pod.Name)
return errCreateBindingFailed
}
// record this as a bound task for now so that we can avoid racing with the mesos pod source, which is
// watching the apiserver for pod updates and will verify pod-task validity with us upon receiving them
boundSuccessfully := false
defer func() {
if !boundSuccessfully {
r.m.Lock()
defer r.m.Unlock()
delete(r.boundTasks, taskID)
}
}()
func() {
r.m.Lock()
defer r.m.Unlock()
r.boundTasks[taskID] = pod
}()
if pod.Spec.NodeName == "" {
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
binding := &api.Binding{
ObjectMeta: api.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
Annotations: make(map[string]string),
},
Target: api.ObjectReference{
Kind: "Node",
Name: pod.Annotations[meta.BindingHostKey],
},
}
// forward the annotations that the scheduler wants to apply
for k, v := range pod.Annotations {
binding.Annotations[k] = v
}
// create binding on apiserver
log.Infof("Binding task %v pod '%v/%v' to '%v' with annotations %+v...",
taskID, pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
err := r.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
if err != nil {
log.Warningf("failed to bind task %v pod %v/%v: %v", taskID, pod.Namespace, pod.Name, err)
return errCreateBindingFailed
}
} else {
// post annotations update to apiserver
patch := struct {
Metadata struct {
Annotations map[string]string `json:"annotations"`
} `json:"metadata"`
}{}
patch.Metadata.Annotations = pod.Annotations
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching annotations %v of task %v pod %v/%v: %v", pod.Annotations, taskID, pod.Namespace, pod.Name, string(patchJson))
err := r.client.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error()
if err != nil {
log.Errorf("Error updating annotations of ready-to-launch task %v pod %v/%v: %v", taskID, pod.Namespace, pod.Name, err)
return errAnnotationUpdateFailure
}
}
boundSuccessfully = true
r.updates <- &PodEvent{
pod: pod,
taskID: taskID,
eventType: PodEventBound,
}
log.V(1).Infof("bound task %v to pod %v/%v", taskID, pod.Namespace, pod.Name)
return nil
}
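// Illustrative example (editorial sketch, not part of this change): for a pod that
// already has a NodeName, bind sends a merge patch shaped roughly like
//
//	{"metadata":{"annotations":{<meta.TaskIdKey>:"...", <meta.BindingHostKey>:"...", ...}}}
//
// whereas an unbound pod results in an api.Binding object POSTed to the
// "bindings" resource instead.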

View File

@ -17,12 +17,12 @@ limitations under the License.
package service package service
import ( import (
"fmt" "k8s.io/kubernetes/contrib/mesos/pkg/executor"
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
log "github.com/golang/glog"
) )
const ( const (
@ -32,97 +32,95 @@ const (
mesosSource = kubetypes.ApiserverSource mesosSource = kubetypes.ApiserverSource
) )
// sourceMesos merges pods from mesos, and mirror pods from the apiserver. why? type (
// (a) can't have two sources with the same name; podName struct {
// (b) all sources, other than ApiserverSource are considered static/mirror namespace, name string
// sources, and; }
// (c) kubelet wants to see mirror pods reflected in a non-static source.
// sourceMesos struct {
// Mesos pods must appear to come from apiserver due to (b), while reflected stop <-chan struct{}
// static pods (mirror pods) must appear to come from apiserver due to (c). out chan<- interface{} // never close this because pkg/util/config.mux doesn't handle that very well
// registry executor.Registry
// The only option I could think of was creating a source that merges the pod priorPodNames map[podName]string // map podName to taskID
// streams. I don't like it. But I couldn't think of anything else, other than }
// starting to hack up the kubelet's understanding of mirror/static pod )
// sources (ouch!)
type sourceMesos struct {
sourceFinished chan struct{} // sourceFinished closes when mergeAndForward exits
out chan<- interface{} // out is the sink for merged pod snapshots
mirrorPods chan []*api.Pod // mirrorPods communicates snapshots of the current set of mirror pods
execUpdates <-chan kubetypes.PodUpdate // execUpdates receives snapshots of the current set of mesos pods
}
// newSourceMesos creates a pod config source that merges pod updates from
// mesos (via execUpdates), and mirror pod updates from the apiserver (via
// podWatch) writing the merged update stream to the out chan. It is expected
// that execUpdates will only ever contain SET operations. The source takes
// ownership of the sourceFinished chan, closing it when the source terminates.
// Source termination happens when the execUpdates chan is closed and fully
// drained of updates.
func newSourceMesos( func newSourceMesos(
sourceFinished chan struct{}, stop <-chan struct{},
execUpdates <-chan kubetypes.PodUpdate,
out chan<- interface{}, out chan<- interface{},
podWatch *cache.ListWatch, podWatch *cache.ListWatch,
registry executor.Registry,
) { ) {
source := &sourceMesos{ source := &sourceMesos{
sourceFinished: sourceFinished, stop: stop,
mirrorPods: make(chan []*api.Pod), out: out,
execUpdates: execUpdates, registry: registry,
out: out, priorPodNames: make(map[podName]string),
} }
// reflect changes from the watch into a chan, filtered to include only mirror pods (have a ConfigMirrorAnnotationKey attr) // reflect changes from the watch into a chan, filtered to include only mirror pods
cache.NewReflector(podWatch, &api.Pod{}, cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc), 0).RunUntil(sourceFinished) // (have a ConfigMirrorAnnotationKey attr)
go source.mergeAndForward() cache.NewReflector(
podWatch,
&api.Pod{},
cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc),
0,
).RunUntil(stop)
} }
// send is an update callback invoked by NewUndeltaStore
func (source *sourceMesos) send(objs []interface{}) { func (source *sourceMesos) send(objs []interface{}) {
var mirrors []*api.Pod var (
pods = make([]*api.Pod, 0, len(objs))
podNames = make(map[podName]string, len(objs))
)
for _, o := range objs { for _, o := range objs {
p := o.(*api.Pod) p := o.(*api.Pod)
addPod := false
if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok { if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok {
mirrors = append(mirrors, p) // pass through all mirror pods
addPod = true
} else if rpod, err := source.registry.Update(p); err == nil {
// pod is bound to a task, and the update is compatible
// so we'll allow it through
addPod = true
p = rpod.Pod() // use the (possibly) updated pod spec!
podNames[podName{p.Namespace, p.Name}] = rpod.Task()
} else if rpod != nil {
// we were able to ID the pod but the update still failed...
log.Warningf("failed to update registry for task %v pod %v/%v: %v",
rpod.Task(), p.Namespace, p.Name, err)
} else {
// unrecognized pod, skip!
log.V(2).Infof("skipping pod %v/%v", p.Namespace, p.Name)
} }
if addPod {
pods = append(pods, p)
}
}
// detect when pods are deleted and notify the registry
for k, taskID := range source.priorPodNames {
if _, found := podNames[k]; !found {
source.registry.Remove(taskID)
}
}
source.priorPodNames = podNames
u := kubetypes.PodUpdate{
Op: kubetypes.SET,
Pods: pods,
Source: mesosSource,
} }
select { select {
case <-source.sourceFinished: case <-source.stop:
case source.mirrorPods <- mirrors: default:
}
}
func (source *sourceMesos) mergeAndForward() {
// execUpdates will be closed by the executor on shutdown
defer close(source.sourceFinished)
var (
mirrors = []*api.Pod{}
pods = []*api.Pod{}
)
eventLoop:
for {
select { select {
case m := <-source.mirrorPods: case <-source.stop:
mirrors = m[:] case source.out <- u:
u := kubetypes.PodUpdate{
Op: kubetypes.SET,
Pods: append(m, pods...),
Source: mesosSource,
}
log.V(3).Infof("mirror update, sending snapshot of size %d", len(u.Pods))
source.out <- u
case u, ok := <-source.execUpdates:
if !ok {
break eventLoop
}
if u.Op != kubetypes.SET {
panic(fmt.Sprintf("unexpected Op type: %v", u.Op))
}
pods = u.Pods[:]
u.Pods = append(u.Pods, mirrors...)
u.Source = mesosSource
log.V(3).Infof("pods update, sending snapshot of size %d", len(u.Pods))
source.out <- u
} }
} }
log.V(2).Infoln("mesos pod source terminating normally") log.V(2).Infof("sent %d pod updates", len(pods))
} }

View File

@ -46,11 +46,6 @@ type KubeletExecutorServer struct {
*options.KubeletServer *options.KubeletServer
SuicideTimeout time.Duration SuicideTimeout time.Duration
LaunchGracePeriod time.Duration LaunchGracePeriod time.Duration
// TODO(sttts): remove necessity to access the kubelet from the executor
klet *kubelet.Kubelet // once set, immutable
kletReady chan struct{} // once closed, klet is guaranteed to be valid and concurrently readable
} }
func NewKubeletExecutorServer() *KubeletExecutorServer { func NewKubeletExecutorServer() *KubeletExecutorServer {
@ -58,7 +53,6 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
KubeletServer: options.NewKubeletServer(), KubeletServer: options.NewKubeletServer(),
SuicideTimeout: config.DefaultSuicideTimeout, SuicideTimeout: config.DefaultSuicideTimeout,
LaunchGracePeriod: config.DefaultLaunchGracePeriod, LaunchGracePeriod: config.DefaultLaunchGracePeriod,
kletReady: make(chan struct{}),
} }
if pwd, err := os.Getwd(); err != nil { if pwd, err := os.Getwd(); err != nil {
log.Warningf("failed to determine current directory: %v", err) log.Warningf("failed to determine current directory: %v", err)
@ -77,43 +71,20 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
} }
func (s *KubeletExecutorServer) runExecutor( func (s *KubeletExecutorServer) runExecutor(
execUpdates chan<- kubetypes.PodUpdate,
nodeInfos chan<- executor.NodeInfo, nodeInfos chan<- executor.NodeInfo,
kubeletFinished <-chan struct{}, kubeletFinished <-chan struct{},
staticPodsConfigPath string, staticPodsConfigPath string,
apiclient *client.Client, apiclient *client.Client,
podLW *cache.ListWatch, registry executor.Registry,
) error { ) (<-chan struct{}, error) {
exec := executor.New(executor.Config{ exec := executor.New(executor.Config{
Updates: execUpdates, Registry: registry,
APIClient: apiclient, APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
SuicideTimeout: s.SuicideTimeout, SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished, KubeletFinished: kubeletFinished,
ExitFunc: os.Exit, ExitFunc: os.Exit,
PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
select {
case <-s.kletReady:
default:
return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
}
status, err := s.klet.GetRuntime().GetAPIPodStatus(pod)
if err != nil {
return nil, err
}
status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
hostIP, err := s.klet.GetHostIP()
if err != nil {
log.Errorf("Cannot get host IP: %v", err)
} else {
status.HostIP = hostIP.String()
}
return status, nil
},
StaticPodsConfigPath: staticPodsConfigPath, StaticPodsConfigPath: staticPodsConfigPath,
PodLW: podLW,
NodeInfos: nodeInfos, NodeInfos: nodeInfos,
}) })
@ -125,7 +96,7 @@ func (s *KubeletExecutorServer) runExecutor(
} }
driver, err := bindings.NewMesosExecutorDriver(dconfig) driver, err := bindings.NewMesosExecutorDriver(dconfig)
if err != nil { if err != nil {
return fmt.Errorf("failed to create executor driver: %v", err) return nil, fmt.Errorf("failed to create executor driver: %v", err)
} }
log.V(2).Infof("Initialize executor driver...") log.V(2).Infof("Initialize executor driver...")
exec.Init(driver) exec.Init(driver)
@ -138,16 +109,17 @@ func (s *KubeletExecutorServer) runExecutor(
log.Info("executor Run completed") log.Info("executor Run completed")
}() }()
return nil return exec.Done(), nil
} }
func (s *KubeletExecutorServer) runKubelet( func (s *KubeletExecutorServer) runKubelet(
execUpdates <-chan kubetypes.PodUpdate,
nodeInfos <-chan executor.NodeInfo, nodeInfos <-chan executor.NodeInfo,
kubeletDone chan<- struct{}, kubeletDone chan<- struct{},
staticPodsConfigPath string, staticPodsConfigPath string,
apiclient *client.Client, apiclient *client.Client,
podLW *cache.ListWatch, podLW *cache.ListWatch,
registry executor.Registry,
executorDone <-chan struct{},
) (err error) { ) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
@ -163,19 +135,15 @@ func (s *KubeletExecutorServer) runKubelet(
} }
// apply Mesos specific settings // apply Mesos specific settings
executorDone := make(chan struct{})
kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) { kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
k, pc, err := kubeletapp.CreateAndInitKubelet(kc) k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
if err != nil { if err != nil {
return k, pc, err return k, pc, err
} }
s.klet = k.(*kubelet.Kubelet)
close(s.kletReady) // intentionally crash if this is called more than once
// decorate kubelet such that it shuts down when the executor is // decorate kubelet such that it shuts down when the executor is
decorated := &executorKubelet{ decorated := &executorKubelet{
Kubelet: s.klet, Kubelet: k.(*kubelet.Kubelet),
kubeletDone: kubeletDone, kubeletDone: kubeletDone,
executorDone: executorDone, executorDone: executorDone,
} }
@ -230,21 +198,20 @@ func (s *KubeletExecutorServer) runKubelet(
} }
}() }()
// create main pod source; it will close executorDone when the executor updates stop flowing // create main pod source; it will stop generating events once executorDone is closed
newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW) newSourceMesos(executorDone, kcfg.PodConfig.Channel(mesosSource), podLW, registry)
// create static-pods directory file source // create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
// run the kubelet, until execUpdates is closed // run the kubelet
// NOTE: because kcfg != nil holds, the upstream Run function will not // NOTE: because kcfg != nil holds, the upstream Run function will not
// initialize the cloud provider. We explicitly wouldn't want // initialize the cloud provider. We explicitly wouldn't want
// that because then every kubelet instance would query the master // that because then every kubelet instance would query the master
// state.json which does not scale. // state.json which does not scale.
err = kubeletapp.Run(s.KubeletServer, kcfg) err = kubeletapp.Run(s.KubeletServer, kcfg)
return return
} }
@ -252,7 +219,6 @@ func (s *KubeletExecutorServer) runKubelet(
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
// create shared channels // create shared channels
kubeletFinished := make(chan struct{}) kubeletFinished := make(chan struct{})
execUpdates := make(chan kubetypes.PodUpdate, 1)
nodeInfos := make(chan executor.NodeInfo, 1) nodeInfos := make(chan executor.NodeInfo, 1)
// create static pods directory // create static pods directory
@ -273,18 +239,22 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
return fmt.Errorf("cannot create API client: %v", err) return fmt.Errorf("cannot create API client: %v", err)
} }
pw := cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, var (
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), pw = cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
)
reg = executor.NewRegistry(apiclient)
) )
// start executor // start executor
err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw) var executorDone <-chan struct{}
executorDone, err = s.runExecutor(nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, reg)
if err != nil { if err != nil {
return err return err
} }
// start kubelet, blocking // start kubelet, blocking
return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw) return s.runKubelet(nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient, pw, reg, executorDone)
} }
func defaultBindingAddress() string { func defaultBindingAddress() string {

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
bindings "github.com/mesos/mesos-go/executor" bindings "github.com/mesos/mesos-go/executor"
"k8s.io/kubernetes/pkg/api"
) )
type suicideTracker struct { type suicideTracker struct {
@ -67,7 +68,7 @@ func (t *suicideTracker) makeJumper(_ jumper) jumper {
func TestSuicide_zeroTimeout(t *testing.T) { func TestSuicide_zeroTimeout(t *testing.T) {
defer glog.Flush() defer glog.Flush()
k, _ := NewTestKubernetesExecutor() k := NewTestKubernetesExecutor()
tracker := &suicideTracker{suicideWatcher: k.suicideWatch} tracker := &suicideTracker{suicideWatcher: k.suicideWatch}
k.suicideWatch = tracker k.suicideWatch = tracker
@ -92,14 +93,14 @@ func TestSuicide_zeroTimeout(t *testing.T) {
func TestSuicide_WithTasks(t *testing.T) { func TestSuicide_WithTasks(t *testing.T) {
defer glog.Flush() defer glog.Flush()
k, _ := NewTestKubernetesExecutor() k := NewTestKubernetesExecutor()
k.suicideTimeout = 50 * time.Millisecond k.suicideTimeout = 50 * time.Millisecond
jumps := uint32(0) jumps := uint32(0)
tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps} tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}
k.suicideWatch = tracker k.suicideWatch = tracker
k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding k.registry.bind("foo", &api.Pod{}) // prevent suicide attempts from succeeding
// call reset with a nil timer // call reset with a nil timer
glog.Infoln("Resetting suicide watch with 1 task") glog.Infoln("Resetting suicide watch with 1 task")
@ -119,7 +120,7 @@ func TestSuicide_WithTasks(t *testing.T) {
t.Fatalf("initial suicide watch setup failed") t.Fatalf("initial suicide watch setup failed")
} }
delete(k.tasks, "foo") // zero remaining tasks k.registry.Remove("foo") // zero remaining tasks
k.suicideTimeout = 1500 * time.Millisecond k.suicideTimeout = 1500 * time.Millisecond
suicideStart := time.Now() suicideStart := time.Now()
@ -142,7 +143,7 @@ func TestSuicide_WithTasks(t *testing.T) {
} }
k.lock.Lock() k.lock.Lock()
k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding k.registry.bind("foo", &api.Pod{}) // prevent suicide attempts from succeeding
k.lock.Unlock() k.lock.Unlock()
// reset the suicide watch, which should stop the existing timer // reset the suicide watch, which should stop the existing timer
@ -164,7 +165,7 @@ func TestSuicide_WithTasks(t *testing.T) {
} }
k.lock.Lock() k.lock.Lock()
delete(k.tasks, "foo") // allow suicide attempts to schedule k.registry.Remove("foo") // allow suicide attempts to schedule
k.lock.Unlock() k.lock.Unlock()
// reset the suicide watch, which should reset a stopped timer // reset the suicide watch, which should reset a stopped timer

View File

@ -0,0 +1,150 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"sync"
"time"
log "github.com/golang/glog"
)
type (
// filter registration events, return false to abort further processing of the event
watchFilter func(pod *PodEvent) (accept bool)
watchExpiration struct {
// timeout fires when the handler has expired; it delivers at most one Time.
timeout <-chan time.Time
// onEvent is an optional callback that is invoked if/when the expired chan
// closes
onEvent func(taskID string)
}
watchHandler struct {
// prevent callbacks from being invoked simultaneously
sync.Mutex
// handle registration events, return true to indicate the handler should be
// de-registered upon completion. If pod is nil then the associated handler
// has expired.
onEvent func(pod *PodEvent) (done bool, err error)
// expiration is an optional configuration that indicates when a handler should
// be considered to have expired, and what action to take upon such
expiration watchExpiration
}
// watcher observes PodEvent events and conditionally executes handlers that
// have been associated with the taskID of the PodEvent.
watcher struct {
updates <-chan *PodEvent
rw sync.RWMutex
handlers map[string]watchHandler
filters []watchFilter
runOnce chan struct{}
}
)
func newWatcher(updates <-chan *PodEvent) *watcher {
return &watcher{
updates: updates,
handlers: make(map[string]watchHandler),
runOnce: make(chan struct{}),
}
}
func (pw *watcher) run() {
select {
case <-pw.runOnce:
log.Error("run() has already been invoked for this pod-watcher")
return
default:
close(pw.runOnce)
}
updateLoop:
for u := range pw.updates {
log.V(2).Info("filtering " + u.FormatShort())
for _, f := range pw.filters {
if !f(u) {
continue updateLoop
}
}
log.V(1).Info("handling " + u.FormatShort())
h, ok := func() (h watchHandler, ok bool) {
pw.rw.RLock()
defer pw.rw.RUnlock()
h, ok = pw.handlers[u.taskID]
return
}()
if ok {
log.V(1).Info("executing action for " + u.FormatShort())
done, err := func() (bool, error) {
h.Lock()
defer h.Unlock()
return h.onEvent(u)
}()
if err != nil {
log.Error(err)
}
if done {
// de-register handler upon successful completion of action
log.V(1).Info("de-registering handler for " + u.FormatShort())
func() {
pw.rw.Lock()
delete(pw.handlers, u.taskID)
pw.rw.Unlock()
}()
}
}
}
}
func (pw *watcher) addFilter(f watchFilter) {
select {
case <-pw.runOnce:
log.Errorf("failed to add filter because pod-watcher is already running")
default:
pw.filters = append(pw.filters, f)
}
}
// forTask associates a handler `h` with the given taskID.
func (pw *watcher) forTask(taskID string, h watchHandler) {
pw.rw.Lock()
pw.handlers[taskID] = h
pw.rw.Unlock()
if exp := h.expiration; exp.timeout != nil {
go func() {
<-exp.timeout
log.V(1).Infof("expiring handler for task %v", taskID)
// de-register handler upon expiration
pw.rw.Lock()
delete(pw.handlers, taskID)
pw.rw.Unlock()
if exp.onEvent != nil {
h.Lock()
defer h.Unlock()
exp.onEvent(taskID)
}
}()
}
}
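// Illustrative example (editorial sketch, not part of this change): the executor is
// expected to wire the registry and watcher together roughly as follows; the
// launchGracePeriod duration and the filter shown here are placeholders.
//
//	reg := NewRegistry(apiclient)
//	pw := newWatcher(reg.watch())
//	pw.addFilter(func(ev *PodEvent) bool {
//		return ev.eventType != PodEventIncompatibleUpdate // hypothetical filter
//	})
//	go pw.run()
//	pw.forTask(taskID, watchHandler{
//		onEvent: func(ev *PodEvent) (bool, error) {
//			// react to the bound/updated/deleted pod; true de-registers the handler
//			return ev.eventType == PodEventDeleted, nil
//		},
//		expiration: watchExpiration{
//			timeout: time.After(launchGracePeriod),
//			onEvent: func(taskID string) { /* e.g. report TASK_LOST */ },
//		},
//	})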