From e4dcd97ac302b0e47c2df02a739ef18d384945e2 Mon Sep 17 00:00:00 2001
From: "Dr. Stefan Schimanski"
Date: Thu, 17 Sep 2015 13:50:32 +0200
Subject: [PATCH 1/2] Dequeue pods in scheduler which are terminating

---
Review notes (below the "---" marker, so not part of the commit message):
- v2: added a doc comment to the exported InGracefulTermination method.
- Line breaks and indentation of this patch were re-laid out by hand from the
  hunk line counts; re-check context whitespace against the tree before
  applying. The blob ids on the "index" lines still refer to the original
  revision of this patch.

 contrib/mesos/pkg/scheduler/plugin.go | 7 ++++++-
 contrib/mesos/pkg/scheduler/pod.go    | 7 +++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index 4bb7ac770e5..3283597421b 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -438,7 +438,12 @@ func (q *queuer) Run(done <-chan struct{}) {
 
 			pod := p.(*Pod)
 			if recoverAssignedSlave(pod.Pod) != "" {
-				log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name)
+				log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
+				q.dequeue(pod.GetUID())
+			} else if pod.InGracefulTermination() {
+				// pods which are pre-scheduled (i.e. NodeName is set) may be gracefully deleted,
+				// even though they are not running yet.
+				log.V(3).Infof("dequeuing graceful deleted pre-scheduled pod for scheduling: %v", pod.Pod.Name)
 				q.dequeue(pod.GetUID())
 			} else {
 				// use ReplaceExisting because we are always pushing the latest state
diff --git a/contrib/mesos/pkg/scheduler/pod.go b/contrib/mesos/pkg/scheduler/pod.go
index bf70100bcf9..b3b2ec1c503 100644
--- a/contrib/mesos/pkg/scheduler/pod.go
+++ b/contrib/mesos/pkg/scheduler/pod.go
@@ -78,3 +78,10 @@ func (p *Pod) String() string {
 	}
 	return fmt.Sprintf("{pod:%v, deadline:%v, delay:%v}", p.Pod.Name, displayDeadline, p.GetDelay())
 }
+
+// InGracefulTermination reports whether the pod has a deletion timestamp set
+// together with a deletion grace period that is greater than zero seconds.
+func (p *Pod) InGracefulTermination() bool {
+	return p.Pod.DeletionTimestamp != nil &&
+		p.Pod.DeletionGracePeriodSeconds != nil && *p.Pod.DeletionGracePeriodSeconds > 0
+}
From be57b2871e74d6a16b042e36abcb1a70270472cd Mon Sep 17 00:00:00 2001
From: "Dr. Stefan Schimanski"
Date: Thu, 17 Sep 2015 13:51:38 +0200
Subject: [PATCH 2/2] Watch pods in executor and tell kubelet about graceful
 terminations

- instantiate framework.Controller for pods in the executor using
  framework.NewInformer, in order to watch pod updates for pods on that host
- forward updates like graceful termination to the kubelet. This might also be
  the preparation for other updates which are supported by the kubelet.
- tolerate cache.DeletedFinalStateUnknown tombstones in the pod delete handler
  instead of panicking on the type assertion.
---
Review notes (below the "---" marker, so not part of the commit message):
- v2: the informer's DeleteFunc no longer assumes obj is always a *api.Pod.
- v2: documented handleChangedApiserverPod, fixed a comment typo and left a
  NOTE(review) about sending on updateChan while k.lock is held.
- Line breaks and indentation of this patch were re-laid out by hand from the
  hunk line counts; re-check context whitespace against the tree before
  applying. The blob ids on the "index" lines still refer to the original
  revision of this patch.

 contrib/mesos/pkg/executor/executor.go        | 90 +++++++++++++++++++
 contrib/mesos/pkg/executor/service/service.go |  3 +
 2 files changed, 93 insertions(+)

diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go
index 6aec2e97033..0c3be6bcf2c 100644
--- a/contrib/mesos/pkg/executor/executor.go
+++ b/contrib/mesos/pkg/executor/executor.go
@@ -35,7 +35,9 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/kubelet"
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
@@ -46,6 +48,7 @@ import (
 const (
 	containerPollTime = 1 * time.Second
 	launchGracePeriod = 5 * time.Minute
+	podRelistPeriod   = 5 * time.Minute
 )
 
 type stateType int32
@@ -120,6 +123,7 @@ type KubernetesExecutor struct {
 	staticPodsConfig     []byte
 	staticPodsConfigPath string
 	initialRegComplete   chan struct{}
+	podController        *framework.Controller
 }
 
 type Config struct {
@@ -135,6 +139,7 @@ type Config struct {
 	ExitFunc             func(int)
 	PodStatusFunc        func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
 	StaticPodsConfigPath string
+	PodLW                cache.ListerWatcher
 }
 
 func (k *KubernetesExecutor) isConnected() bool {
@@ -163,6 +168,7 @@ func New(config Config) *KubernetesExecutor {
 		initialRegComplete:   make(chan struct{}),
 		staticPodsConfigPath: config.StaticPodsConfigPath,
 	}
+
 	//TODO(jdef) do something real with these events..
 	if config.Watch != nil {
 		events := config.Watch.ResultChan()
@@ -176,12 +182,48 @@ func New(config Config) *KubernetesExecutor {
 			k.events = events
 		}
 	}
+
+	// watch pods from the given pod ListWatch
+	_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			pod := obj.(*api.Pod)
+			log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
+			k.handleChangedApiserverPod(pod)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			pod := newObj.(*api.Pod)
+			log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
+			k.handleChangedApiserverPod(pod)
+		},
+		DeleteFunc: func(obj interface{}) {
+			pod, ok := obj.(*api.Pod)
+			if !ok {
+				// after a missed delete event the informer hands us a
+				// cache.DeletedFinalStateUnknown tombstone instead of the
+				// pod; unwrap it instead of panicking on the type assertion.
+				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
+				if !isTombstone {
+					log.Warningf("ignoring pod delete event with unexpected object type %T", obj)
+					return
+				}
+				pod, ok = tombstone.Obj.(*api.Pod)
+				if !ok {
+					log.Warningf("ignoring pod tombstone %q with unexpected object type %T", tombstone.Key, tombstone.Obj)
+					return
+				}
+			}
+			log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
+		},
+	})
+
 	return k
 }
 
 func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
 	k.killKubeletContainers()
 	k.resetSuicideWatch(driver)
+
+	go k.podController.Run(k.done)
 	go k.sendLoop()
 	//TODO(jdef) monitor kubeletFinished and shutdown if it happens
 }
@@ -329,6 +371,54 @@ func (k *KubernetesExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo
 	go k.launchTask(driver, taskId, pod)
 }
 
+// handleChangedApiserverPod reacts to a pod that was added or updated on the
+// apiserver. Pods without a task id annotation and pods whose task is not in
+// k.tasks are ignored. When a running pod that is present in k.pods without a
+// deletion timestamp shows up with a deletion timestamp and a positive grace
+// period, both values are copied onto the pod held in k.pods, and that pod is
+// pushed onto k.updateChan as a kubelet.UPDATE.
+func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) {
+	// exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already
+	taskId := pod.Annotations[meta.TaskIdKey]
+	if taskId == "" {
+		log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
+		return
+	}
+
+	k.lock.Lock()
+	defer k.lock.Unlock()
+
+	// exclude tasks which are already deleted from our task registry
+	task := k.tasks[taskId]
+	if task == nil {
+		log.Warningf("task %s for pod %s/%s not found", taskId, pod.Namespace, pod.Name)
+		return
+	}
+
+	oldPod := k.pods[task.podName]
+
+	// terminating pod?
+	if oldPod != nil && oldPod.DeletionTimestamp == nil &&
+		pod.DeletionTimestamp != nil && pod.Status.Phase == api.PodRunning &&
+		pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds > 0 {
+
+		log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet", pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
+
+		// modify pod in our registry to avoid that other changes bleed into the kubelet
+		oldPod.DeletionTimestamp = pod.DeletionTimestamp
+		oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
+
+		update := kubelet.PodUpdate{
+			Op:   kubelet.UPDATE,
+			Pods: []*api.Pod{oldPod},
+		}
+		// NOTE(review): this send happens while k.lock is held; if updateChan
+		// is unbuffered and its consumer stalls, every other user of k.lock
+		// waits as well. Confirm the channel's buffering and consumer.
+		k.updateChan <- update
+	}
+}
+
 // determine whether we need to start a suicide countdown. if so, then start
 // a timer that, upon expiration, causes this executor to commit suicide.
 // this implementation runs asynchronously. callers that wish to wait for the
diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go
index 5037a7f661e..035c6dc2aa2 100644
--- a/contrib/mesos/pkg/executor/service/service.go
+++ b/contrib/mesos/pkg/executor/service/service.go
@@ -36,8 +36,10 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
 	"k8s.io/kubernetes/contrib/mesos/pkg/redirfd"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/credentialprovider"
+	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/healthz"
 	"k8s.io/kubernetes/pkg/kubelet"
 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@@ -372,6 +374,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
 			return klet.GetRuntime().GetPodStatus(pod)
 		},
 		StaticPodsConfigPath: staticPodsConfigPath,
+		PodLW:                cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
 	})
 
 	go exec.InitializeStaticPodsSource(func() {