mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
Watch pods in executor and tell kubelet about graceful terminations
- instantiate framework.Controller for pods in the executor using framework.NewInformer, in order to watch pod updates for pods on that host - forwards updates like graceful termination to the kubelet. This might also be the preparation for other updates which are supported by the kubelet.
This commit is contained in:
parent
e4dcd97ac3
commit
be57b2871e
@ -35,7 +35,9 @@ import (
|
|||||||
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
|
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
"k8s.io/kubernetes/pkg/kubelet"
|
"k8s.io/kubernetes/pkg/kubelet"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/container"
|
"k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||||
@ -46,6 +48,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
containerPollTime = 1 * time.Second
|
containerPollTime = 1 * time.Second
|
||||||
launchGracePeriod = 5 * time.Minute
|
launchGracePeriod = 5 * time.Minute
|
||||||
|
podRelistPeriod = 5 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
type stateType int32
|
type stateType int32
|
||||||
@ -120,6 +123,7 @@ type KubernetesExecutor struct {
|
|||||||
staticPodsConfig []byte
|
staticPodsConfig []byte
|
||||||
staticPodsConfigPath string
|
staticPodsConfigPath string
|
||||||
initialRegComplete chan struct{}
|
initialRegComplete chan struct{}
|
||||||
|
podController *framework.Controller
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -135,6 +139,7 @@ type Config struct {
|
|||||||
ExitFunc func(int)
|
ExitFunc func(int)
|
||||||
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
|
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
|
||||||
StaticPodsConfigPath string
|
StaticPodsConfigPath string
|
||||||
|
PodLW cache.ListerWatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KubernetesExecutor) isConnected() bool {
|
func (k *KubernetesExecutor) isConnected() bool {
|
||||||
@ -163,6 +168,7 @@ func New(config Config) *KubernetesExecutor {
|
|||||||
initialRegComplete: make(chan struct{}),
|
initialRegComplete: make(chan struct{}),
|
||||||
staticPodsConfigPath: config.StaticPodsConfigPath,
|
staticPodsConfigPath: config.StaticPodsConfigPath,
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO(jdef) do something real with these events..
|
//TODO(jdef) do something real with these events..
|
||||||
if config.Watch != nil {
|
if config.Watch != nil {
|
||||||
events := config.Watch.ResultChan()
|
events := config.Watch.ResultChan()
|
||||||
@ -176,12 +182,33 @@ func New(config Config) *KubernetesExecutor {
|
|||||||
k.events = events
|
k.events = events
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// watch pods from the given pod ListWatch
|
||||||
|
_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) {
|
||||||
|
pod := obj.(*api.Pod)
|
||||||
|
log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
|
||||||
|
k.handleChangedApiserverPod(pod)
|
||||||
|
},
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
pod := newObj.(*api.Pod)
|
||||||
|
log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
|
||||||
|
k.handleChangedApiserverPod(pod)
|
||||||
|
},
|
||||||
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
pod := obj.(*api.Pod)
|
||||||
|
log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
return k
|
return k
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
|
func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
|
||||||
k.killKubeletContainers()
|
k.killKubeletContainers()
|
||||||
k.resetSuicideWatch(driver)
|
k.resetSuicideWatch(driver)
|
||||||
|
|
||||||
|
go k.podController.Run(k.done)
|
||||||
go k.sendLoop()
|
go k.sendLoop()
|
||||||
//TODO(jdef) monitor kubeletFinished and shutdown if it happens
|
//TODO(jdef) monitor kubeletFinished and shutdown if it happens
|
||||||
}
|
}
|
||||||
@ -329,6 +356,45 @@ func (k *KubernetesExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo
|
|||||||
go k.launchTask(driver, taskId, pod)
|
go k.launchTask(driver, taskId, pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) {
|
||||||
|
// exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already
|
||||||
|
taskId := pod.Annotations[meta.TaskIdKey]
|
||||||
|
if taskId == "" {
|
||||||
|
log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
k.lock.Lock()
|
||||||
|
defer k.lock.Unlock()
|
||||||
|
|
||||||
|
// exclude tasks which are already deleted from our task registry
|
||||||
|
task := k.tasks[taskId]
|
||||||
|
if task == nil {
|
||||||
|
log.Warningf("task %s for pod %s/%s not found", taskId, pod.Namespace, pod.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
oldPod := k.pods[task.podName]
|
||||||
|
|
||||||
|
// terminating pod?
|
||||||
|
if oldPod != nil && oldPod.DeletionTimestamp == nil &&
|
||||||
|
pod.DeletionTimestamp != nil && pod.Status.Phase == api.PodRunning &&
|
||||||
|
pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds > 0 {
|
||||||
|
|
||||||
|
log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet", pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
|
||||||
|
|
||||||
|
// modify pod in our registry to avoid that other changed bleed into the kubelet
|
||||||
|
oldPod.DeletionTimestamp = pod.DeletionTimestamp
|
||||||
|
oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
|
||||||
|
|
||||||
|
update := kubelet.PodUpdate{
|
||||||
|
Op: kubelet.UPDATE,
|
||||||
|
Pods: []*api.Pod{oldPod},
|
||||||
|
}
|
||||||
|
k.updateChan <- update
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// determine whether we need to start a suicide countdown. if so, then start
|
// determine whether we need to start a suicide countdown. if so, then start
|
||||||
// a timer that, upon expiration, causes this executor to commit suicide.
|
// a timer that, upon expiration, causes this executor to commit suicide.
|
||||||
// this implementation runs asynchronously. callers that wish to wait for the
|
// this implementation runs asynchronously. callers that wish to wait for the
|
||||||
|
@ -36,8 +36,10 @@ import (
|
|||||||
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
|
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/redirfd"
|
"k8s.io/kubernetes/contrib/mesos/pkg/redirfd"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/credentialprovider"
|
"k8s.io/kubernetes/pkg/credentialprovider"
|
||||||
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/healthz"
|
"k8s.io/kubernetes/pkg/healthz"
|
||||||
"k8s.io/kubernetes/pkg/kubelet"
|
"k8s.io/kubernetes/pkg/kubelet"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||||
@ -372,6 +374,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
|
|||||||
return klet.GetRuntime().GetPodStatus(pod)
|
return klet.GetRuntime().GetPodStatus(pod)
|
||||||
},
|
},
|
||||||
StaticPodsConfigPath: staticPodsConfigPath,
|
StaticPodsConfigPath: staticPodsConfigPath,
|
||||||
|
PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
|
||||||
})
|
})
|
||||||
|
|
||||||
go exec.InitializeStaticPodsSource(func() {
|
go exec.InitializeStaticPodsSource(func() {
|
||||||
|
Loading…
Reference in New Issue
Block a user