Merge pull request #13966 from mesosphere/mesos-graceful-termination

MESOS: Send graceful termination update from executor to kubelet
This commit is contained in:
Marek Grabowski 2015-09-23 09:51:29 +02:00
commit 28585bc699
4 changed files with 80 additions and 1 deletions

View File

@ -35,7 +35,9 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
@ -46,6 +48,7 @@ import (
// Timing parameters for the executor.
const (
	// containerPollTime is a one-second interval.
	containerPollTime = time.Second
	// launchGracePeriod is a five-minute window.
	launchGracePeriod = 5 * time.Minute
	// podRelistPeriod is the period handed to the pod informer when the
	// executor starts watching pods on the apiserver.
	podRelistPeriod = 5 * time.Minute
)
// stateType is an int32-based type; its values are declared outside this view.
// NOTE(review): presumably enumerates executor lifecycle states — confirm against its constants.
type stateType int32
@ -120,6 +123,7 @@ type KubernetesExecutor struct {
staticPodsConfig []byte
staticPodsConfigPath string
initialRegComplete chan struct{}
podController *framework.Controller
}
type Config struct {
@ -135,6 +139,7 @@ type Config struct {
ExitFunc func(int)
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
StaticPodsConfigPath string
PodLW cache.ListerWatcher
}
func (k *KubernetesExecutor) isConnected() bool {
@ -163,6 +168,7 @@ func New(config Config) *KubernetesExecutor {
initialRegComplete: make(chan struct{}),
staticPodsConfigPath: config.StaticPodsConfigPath,
}
//TODO(jdef) do something real with these events..
if config.Watch != nil {
events := config.Watch.ResultChan()
@ -176,12 +182,33 @@ func New(config Config) *KubernetesExecutor {
k.events = events
}
}
// watch pods from the given pod ListWatch
_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
k.handleChangedApiserverPod(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*api.Pod)
log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
k.handleChangedApiserverPod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
},
})
return k
}
// Init prepares the executor for use with the given driver. It calls
// killKubeletContainers and resetSuicideWatch synchronously, then starts the
// pod controller and sendLoop, each in its own goroutine.
func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
k.killKubeletContainers()
k.resetSuicideWatch(driver)
// k.done is handed to the pod controller's Run.
// NOTE(review): presumably closing k.done stops the controller — confirm.
go k.podController.Run(k.done)
go k.sendLoop()
//TODO(jdef) monitor kubeletFinished and shutdown if it happens
}
@ -329,6 +356,45 @@ func (k *KubernetesExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo
go k.launchTask(driver, taskId, pod)
}
// handleChangedApiserverPod inspects a pod reported by the apiserver and, if
// that pod has just entered graceful termination, forwards the termination to
// the kubelet.
//
// Only the deletion timestamp and the grace period are copied onto the
// executor's registered copy of the pod; that copy is then sent on updateChan
// as a kubelet UPDATE. Pods without a task id annotation, pods whose task is
// unknown to the registry, and pods that are not newly terminating are ignored.
func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) {
	// "pre-scheduled" pods have NodeName set to this node without having been
	// scheduled; they carry no task id annotation and are skipped
	taskID := pod.Annotations[meta.TaskIdKey]
	if len(taskID) == 0 {
		log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
		return
	}

	k.lock.Lock()
	defer k.lock.Unlock()

	// the task may already have been removed from our task registry
	entry := k.tasks[taskID]
	if entry == nil {
		log.Warningf("task %s for pod %s/%s not found", taskID, pod.Namespace, pod.Name)
		return
	}

	// the registered pod must exist and must not be marked as terminating yet
	known := k.pods[entry.podName]
	if known == nil || known.DeletionTimestamp != nil {
		return
	}

	// the apiserver pod must be running and marked for deletion ...
	if pod.DeletionTimestamp == nil || pod.Status.Phase != api.PodRunning {
		return
	}

	// ... with a positive grace period
	grace := pod.DeletionGracePeriodSeconds
	if grace == nil || *grace <= 0 {
		return
	}

	log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet", pod.Namespace, pod.Name, *pod.DeletionTimestamp, *grace)

	// modify the pod in our registry rather than passing on the apiserver
	// object, so that no other changes bleed into the kubelet
	known.DeletionTimestamp = pod.DeletionTimestamp
	known.DeletionGracePeriodSeconds = grace

	k.updateChan <- kubelet.PodUpdate{
		Op:   kubelet.UPDATE,
		Pods: []*api.Pod{known},
	}
}
// determine whether we need to start a suicide countdown. if so, then start
// a timer that, upon expiration, causes this executor to commit suicide.
// this implementation runs asynchronously. callers that wish to wait for the

View File

@ -36,8 +36,10 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
"k8s.io/kubernetes/contrib/mesos/pkg/redirfd"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@ -372,6 +374,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
return klet.GetRuntime().GetPodStatus(pod)
},
StaticPodsConfigPath: staticPodsConfigPath,
PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
})
go exec.InitializeStaticPodsSource(func() {

View File

@ -438,7 +438,12 @@ func (q *queuer) Run(done <-chan struct{}) {
pod := p.(*Pod)
if recoverAssignedSlave(pod.Pod) != "" {
log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name)
log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
q.dequeue(pod.GetUID())
} else if pod.InGracefulTermination() {
// pods which are pre-scheduled (i.e. NodeName is set) may be gracefully deleted,
// even though they are not running yet.
log.V(3).Infof("dequeuing graceful deleted pre-scheduled pod for scheduling: %v", pod.Pod.Name)
q.dequeue(pod.GetUID())
} else {
// use ReplaceExisting because we are always pushing the latest state

View File

@ -78,3 +78,8 @@ func (p *Pod) String() string {
}
return fmt.Sprintf("{pod:%v, deadline:%v, delay:%v}", p.Pod.Name, displayDeadline, p.GetDelay())
}
// InGracefulTermination reports whether the pod carries a deletion timestamp
// together with a deletion grace period greater than zero seconds.
func (p *Pod) InGracefulTermination() bool {
	if p.Pod.DeletionTimestamp == nil {
		return false
	}
	grace := p.Pod.DeletionGracePeriodSeconds
	return grace != nil && *grace > 0
}