diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 6aec2e97033..0c3be6bcf2c 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -35,7 +35,9 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" @@ -46,6 +48,7 @@ import ( const ( containerPollTime = 1 * time.Second launchGracePeriod = 5 * time.Minute + podRelistPeriod = 5 * time.Minute ) type stateType int32 @@ -120,6 +123,7 @@ type KubernetesExecutor struct { staticPodsConfig []byte staticPodsConfigPath string initialRegComplete chan struct{} + podController *framework.Controller } type Config struct { @@ -135,6 +139,7 @@ type Config struct { ExitFunc func(int) PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) StaticPodsConfigPath string + PodLW cache.ListerWatcher } func (k *KubernetesExecutor) isConnected() bool { @@ -163,6 +168,7 @@ func New(config Config) *KubernetesExecutor { initialRegComplete: make(chan struct{}), staticPodsConfigPath: config.StaticPodsConfigPath, } + //TODO(jdef) do something real with these events.. if config.Watch != nil { events := config.Watch.ResultChan() @@ -176,12 +182,33 @@ func New(config Config) *KubernetesExecutor { k.events = events } } + + // watch pods from the given pod ListWatch + _, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*api.Pod) + log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name) + k.handleChangedApiserverPod(pod) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pod := newObj.(*api.Pod) + log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name) + k.handleChangedApiserverPod(pod) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*api.Pod) + log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name) + }, + }) + return k } func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) { k.killKubeletContainers() k.resetSuicideWatch(driver) + + go k.podController.Run(k.done) go k.sendLoop() //TODO(jdef) monitor kubeletFinished and shutdown if it happens } @@ -329,6 +356,45 @@ func (k *KubernetesExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo go k.launchTask(driver, taskId, pod) } +func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) { + // exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already + taskId := pod.Annotations[meta.TaskIdKey] + if taskId == "" { + log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey) + return + } + + k.lock.Lock() + defer k.lock.Unlock() + + // exclude tasks which are already deleted from our task registry + task := k.tasks[taskId] + if task == nil { + log.Warningf("task %s for pod %s/%s not found", taskId, pod.Namespace, pod.Name) + return + } + + oldPod := k.pods[task.podName] + + // terminating pod? + if oldPod != nil && oldPod.DeletionTimestamp == nil && + pod.DeletionTimestamp != nil && pod.Status.Phase == api.PodRunning && + pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds > 0 { + + log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet", pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds) + + // modify pod in our registry to avoid that other changed bleed into the kubelet + oldPod.DeletionTimestamp = pod.DeletionTimestamp + oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds + + update := kubelet.PodUpdate{ + Op: kubelet.UPDATE, + Pods: []*api.Pod{oldPod}, + } + k.updateChan <- update + } +} + // determine whether we need to start a suicide countdown. if so, then start // a timer that, upon expiration, causes this executor to commit suicide. // this implementation runs asynchronously. callers that wish to wait for the diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 5037a7f661e..035c6dc2aa2 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -36,8 +36,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/contrib/mesos/pkg/redirfd" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/credentialprovider" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -372,6 +374,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( return klet.GetRuntime().GetPodStatus(pod) }, StaticPodsConfigPath: staticPodsConfigPath, + PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)), }) go exec.InitializeStaticPodsSource(func() {