From 02a11f7a2e8b6d3dcf14a6764c57cc9329f2c8df Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Wed, 2 Dec 2015 16:58:55 -0500 Subject: [PATCH] Merge pull request #53 from mesosphere/jdef_fix_executor_state_locks MESOS: avoid mutex locking around blocking calls in kubelet-executor Conflicts: contrib/mesos/pkg/executor/service/service.go --- contrib/mesos/pkg/executor/executor.go | 15 ++++++++--- contrib/mesos/pkg/executor/service/service.go | 25 +++++++++---------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 1a219fe9ba4..b92e3e12a43 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -728,19 +728,28 @@ func (k *Executor) __launchTask(driver bindings.ExecutorDriver, taskId, podFullN // in Docker, then we'll also send a TASK_LOST event. func (k *Executor) checkForLostPodTask(driver bindings.ExecutorDriver, taskId string, isKnownPod func() bool) bool { // TODO (jdefelice) don't send false alarms for deleted pods (KILLED tasks) - k.lock.Lock() - defer k.lock.Unlock() + + // isKnownPod() can block so we take special precaution to avoid locking this mutex while calling it + knownTask := func() (ok bool) { + k.lock.Lock() + defer k.lock.Unlock() + _, ok = k.tasks[taskId] + return + }() // TODO(jdef) we should really consider k.pods here, along with what docker is reporting, since the // kubelet may constantly attempt to instantiate a pod as long as it's in the pod state that we're // handing to it. otherwise, we're probably reporting a TASK_LOST prematurely. Should probably // consult RestartPolicy to determine appropriate behavior. Should probably also gracefully handle // docker daemon restarts. - if _, ok := k.tasks[taskId]; ok { + if knownTask { if isKnownPod() { return false } else { log.Warningf("Detected lost pod, reporting lost task %v", taskId) + + k.lock.Lock() + defer k.lock.Unlock() k.reportLostTask(driver, taskId, messages.ContainersDisappeared) } } else { diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 1d3f858dd56..4b3028e771c 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -21,7 +21,6 @@ import ( "net" "os" "path/filepath" - "sync" "time" log "github.com/golang/glog" @@ -47,8 +46,10 @@ type KubeletExecutorServer struct { SuicideTimeout time.Duration LaunchGracePeriod time.Duration - kletLock sync.Mutex // TODO(sttts): remove necessity to access the kubelet from the executor - klet *kubelet.Kubelet + // TODO(sttts): remove necessity to access the kubelet from the executor + + klet *kubelet.Kubelet // once set, immutable + kletReady chan struct{} // once closed, klet is guaranteed to be valid and concurrently readable } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -56,6 +57,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer { KubeletServer: kubeletapp.NewKubeletServer(), SuicideTimeout: config.DefaultSuicideTimeout, LaunchGracePeriod: config.DefaultLaunchGracePeriod, + kletReady: make(chan struct{}), } if pwd, err := os.Getwd(); err != nil { log.Warningf("failed to determine current directory: %v", err) @@ -89,10 +91,9 @@ func (s *KubeletExecutorServer) runExecutor( KubeletFinished: kubeletFinished, ExitFunc: os.Exit, PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) { - s.kletLock.Lock() - defer s.kletLock.Unlock() - - if s.klet == nil { + select { + case <-s.kletReady: + default: return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized") } @@ -149,22 +150,20 @@ func (s *KubeletExecutorServer) runKubelet( ) error { kcfg, err := s.UnsecuredKubeletConfig() if err == nil { - // apply Messo specific settings + // apply Mesos specific settings executorDone := make(chan struct{}) kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) { k, pc, err := kubeletapp.CreateAndInitKubelet(kc) if err != nil { return k, pc, err } - klet := k.(*kubelet.Kubelet) - s.kletLock.Lock() - s.klet = klet - s.kletLock.Unlock() + s.klet = k.(*kubelet.Kubelet) + close(s.kletReady) // intentionally crash if this is called more than once // decorate kubelet such that it shuts down when the executor is decorated := &executorKubelet{ - Kubelet: klet, + Kubelet: s.klet, kubeletDone: kubeletDone, executorDone: executorDone, }