From 6af86cbaadfb6c1c9cd3c58cd44deafb8fedc28e Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 23 Sep 2015 16:37:25 +0200 Subject: [PATCH] Avoid panics during executor shutdown due to write to closed channel --- contrib/mesos/pkg/executor/executor.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 254e0b219e5..02b17cddee3 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -107,7 +107,7 @@ type KubernetesExecutor struct { state stateType tasks map[string]*kuberTask pods map[string]*api.Pod - lock sync.RWMutex + lock sync.Mutex sourcename string client *client.Client done chan struct{} // signals shutdown @@ -269,6 +269,11 @@ func (k *KubernetesExecutor) onInitialRegistration() { defer close(k.initialRegComplete) // emit an empty update to allow the mesos "source" to be marked as seen + k.lock.Lock() + defer k.lock.Unlock() + if k.isDone() { + return + } k.updateChan <- kubetypes.PodUpdate{ Pods: []*api.Pod{}, Op: kubetypes.SET, @@ -397,6 +402,9 @@ func (k *KubernetesExecutor) handleChangedApiserverPod(pod *api.Pod) { oldPod.DeletionTimestamp = pod.DeletionTimestamp oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds + if k.isDone() { + return + } update := kubetypes.PodUpdate{ Op: kubetypes.UPDATE, Pods: []*api.Pod{oldPod}, @@ -570,6 +578,9 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s k.pods[podFullName] = pod // send the new pod to the kubelet which will spin it up + if k.isDone() { + return + } update := kubetypes.PodUpdate{ Op: kubetypes.ADD, Pods: []*api.Pod{pod}, @@ -775,6 +786,9 @@ func (k *KubernetesExecutor) removePodTask(driver bindings.ExecutorDriver, tid, delete(k.pods, pid) // tell the kubelet to remove the pod + if k.isDone() { + return + } update := kubetypes.PodUpdate{ Op: kubetypes.REMOVE, Pods: []*api.Pod{pod},