Merge pull request #18110 from mesosphere/cherrypick_jdef_fix_executor_state_locks

Auto commit by PR queue bot
k8s-merge-robot 2015-12-02 15:44:19 -08:00
commit 5a5fa60dd3
2 changed files with 24 additions and 16 deletions

View File

@@ -728,19 +728,28 @@ func (k *Executor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullN
 // in Docker, then we'll also send a TASK_LOST event.
 func (k *Executor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool {
     // TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks)
-    k.lock.Lock()
-    defer k.lock.Unlock()
+
+    // isKnownPod() can block so we take special precaution to avoid locking this mutex while calling it
+    knownTask := func() (ok bool) {
+        k.lock.Lock()
+        defer k.lock.Unlock()
+        _, ok = k.tasks[taskId]
+        return
+    }()
 
     // TODO(jdef) we should really consider k.pods here, along with what docker is reporting, since the
     // kubelet may constantly attempt to instantiate a pod as long as it's in the pod state that we're
     // handing to it. otherwise, we're probably reporting a TASK_LOST prematurely. Should probably
     // consult RestartPolicy to determine appropriate behavior. Should probably also gracefully handle
     // docker daemon restarts.
-    if _, ok := k.tasks[taskId]; ok {
+    if knownTask {
         if isKnownPod() {
             return false
         } else {
             log.Warningf("Detected lost pod, reporting lost task %v", taskId)
+
+            k.lock.Lock()
+            defer k.lock.Unlock()
             k.reportLostTask(driver, taskId, messages.ContainersDisappeared)
         }
     } else {
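
The pattern in this hunk, as a minimal standalone sketch with hypothetical names (registry, checkLost; not the executor's real types): the mutex is taken inside an immediately-invoked closure so it is released before the potentially blocking isKnownPod callback runs.

package main

import (
    "fmt"
    "sync"
    "time"
)

// registry is a stand-in for the executor's task bookkeeping.
type registry struct {
    mu    sync.Mutex
    tasks map[string]struct{}
}

// checkLost reports whether taskID is registered but its pod has disappeared.
// The lock only guards the map lookup; it is never held while isKnownPod runs.
func (r *registry) checkLost(taskID string, isKnownPod func() bool) bool {
    known := func() (ok bool) {
        r.mu.Lock()
        defer r.mu.Unlock()
        _, ok = r.tasks[taskID]
        return
    }()
    if !known {
        return false
    }
    return !isKnownPod() // lost if the task is known but the pod is not
}

func main() {
    r := &registry{tasks: map[string]struct{}{"task-1": {}}}
    slowLookup := func() bool { time.Sleep(10 * time.Millisecond); return false } // pod gone
    fmt.Println(r.checkLost("task-1", slowLookup)) // true
}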

View File

@@ -21,7 +21,6 @@ import (
     "net"
     "os"
     "path/filepath"
-    "sync"
     "time"
 
     log "github.com/golang/glog"
@@ -47,8 +46,10 @@ type KubeletExecutorServer struct {
     SuicideTimeout    time.Duration
     LaunchGracePeriod time.Duration
 
-    kletLock sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor
-    klet     *kubelet.Kubelet
+    // TODO(sttts): remove necessity to access the kubelet from the executor
+    klet      *kubelet.Kubelet // once set, immutable
+    kletReady chan struct{}    // once closed, klet is guaranteed to be valid and concurrently readable
 }
 
 func NewKubeletExecutorServer() *KubeletExecutorServer {
@@ -56,6 +57,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
         KubeletServer:     kubeletapp.NewKubeletServer(),
         SuicideTimeout:    config.DefaultSuicideTimeout,
         LaunchGracePeriod: config.DefaultLaunchGracePeriod,
+        kletReady:         make(chan struct{}),
     }
     if pwd, err := os.Getwd(); err != nil {
         log.Warningf("failed to determine current directory: %v", err)
@@ -89,10 +91,9 @@ func (s *KubeletExecutorServer) runExecutor(
         KubeletFinished: kubeletFinished,
         ExitFunc:        os.Exit,
         PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
-            s.kletLock.Lock()
-            defer s.kletLock.Unlock()
-
-            if s.klet == nil {
+            select {
+            case <-s.kletReady:
+            default:
                 return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
             }
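
The select/default above is a non-blocking readiness check: receiving from a closed channel succeeds immediately, while an unclosed channel falls through to default. A minimal sketch of the same idea with hypothetical names (holder, get; a string standing in for the kubelet):

package main

import (
    "errors"
    "fmt"
)

// holder publishes a value exactly once via a ready channel.
type holder struct {
    ready chan struct{} // closed once value is set
    value string        // immutable after ready is closed
}

// get never blocks: it returns the value only if initialization has finished.
func (h *holder) get() (string, error) {
    select {
    case <-h.ready: // closed channel: receive returns immediately
        return h.value, nil
    default:
        return "", errors.New("called before initialization")
    }
}

func main() {
    h := &holder{ready: make(chan struct{})}
    if _, err := h.get(); err != nil {
        fmt.Println(err) // called before initialization
    }
    h.value = "kubelet"
    close(h.ready) // the write above happens-before any receive that observes the close
    v, _ := h.get()
    fmt.Println(v) // kubelet
}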
@@ -149,22 +150,20 @@ func (s *KubeletExecutorServer) runKubelet(
 ) error {
     kcfg, err := s.UnsecuredKubeletConfig()
     if err == nil {
-        // apply Messo specific settings
+        // apply Mesos specific settings
         executorDone := make(chan struct{})
         kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
             k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
             if err != nil {
                 return k, pc, err
             }
 
-            klet := k.(*kubelet.Kubelet)
-            s.kletLock.Lock()
-            s.klet = klet
-            s.kletLock.Unlock()
+            s.klet = k.(*kubelet.Kubelet)
+            close(s.kletReady) // intentionally crash if this is called more than once
 
             // decorate kubelet such that it shuts down when the executor is
             decorated := &executorKubelet{
-                Kubelet:      klet,
+                Kubelet:      s.klet,
                 kubeletDone:  kubeletDone,
                 executorDone: executorDone,
             }
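
The builder's publish sequence above (assign s.klet, then close s.kletReady) relies on two channel properties: the assignment happens before the close, so any reader that observes the closed channel also sees the pointer, and closing an already-closed channel panics, which is the "intentionally crash" guard against the builder running twice. A small sketch under those assumptions, with hypothetical names (publisher, publish):

package main

import (
    "fmt"
    "sync"
)

// publisher sets a value once and signals readiness by closing a channel.
type publisher struct {
    value *string
    ready chan struct{}
}

// publish stores v and closes ready; a second call panics on the double close.
func (p *publisher) publish(v *string) {
    p.value = v    // happens-before the close below
    close(p.ready) // readers that see the close also see value; panics if called twice
}

func main() {
    p := &publisher{ready: make(chan struct{})}

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        <-p.ready // blocks until publish has run
        fmt.Println(*p.value)
    }()

    v := "kubelet-instance"
    p.publish(&v)
    wg.Wait()
}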