Fix possible data race in prober

This commit is contained in:
Tim St. Clair 2016-02-17 18:15:11 -08:00
parent 3042f1d1c8
commit 15d44d182a
2 changed files with 32 additions and 11 deletions

View File

@ -109,8 +109,6 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager) Mana
// This method normalizes the status before comparing so as to make sure that meaningless // This method normalizes the status before comparing so as to make sure that meaningless
// changes will be ignored. // changes will be ignored.
func isStatusEqual(oldStatus, status *api.PodStatus) bool { func isStatusEqual(oldStatus, status *api.PodStatus) bool {
normalizeStatus(oldStatus)
normalizeStatus(status)
return api.Semantic.DeepEqual(status, oldStatus) return api.Semantic.DeepEqual(status, oldStatus)
} }
@ -146,7 +144,11 @@ func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) { func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
m.podStatusesLock.Lock() m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock() defer m.podStatusesLock.Unlock()
// Make sure we're caching a deep copy.
status, err := copyStatus(&status)
if err != nil {
return
}
m.updateStatusInternal(pod, status) m.updateStatusInternal(pod, status)
} }
@ -188,12 +190,10 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
} }
// Make sure we're not updating the cached version. // Make sure we're not updating the cached version.
clone, err := api.Scheme.DeepCopy(&oldStatus.status) status, err := copyStatus(&oldStatus.status)
if err != nil { if err != nil {
glog.Errorf("Failed to clone status %+v: %v", oldStatus.status, err)
return return
} }
status := *clone.(*api.PodStatus)
status.ContainerStatuses[containerIndex].Ready = ready status.ContainerStatuses[containerIndex].Ready = ready
// Update pod condition. // Update pod condition.
@ -267,6 +267,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
status.StartTime = &now status.StartTime = &now
} }
normalizeStatus(&status)
// The intent here is to prevent concurrent updates to a pod's status from // The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically. // clobbering each other so the phase of a pod progresses monotonically.
if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil { if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
@ -435,13 +436,19 @@ func (m *manager) needsReconcile(uid types.UID, status api.PodStatus) bool {
pod = mirrorPod pod = mirrorPod
} }
if isStatusEqual(&pod.Status, &status) { podStatus, err := copyStatus(&pod.Status)
if err != nil {
return false
}
normalizeStatus(&podStatus)
if isStatusEqual(&podStatus, &status) {
// If the status from the source is the same with the cached status, // If the status from the source is the same with the cached status,
// reconcile is not needed. Just return. // reconcile is not needed. Just return.
return false return false
} }
glog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %+v", format.Pod(pod), glog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %+v", format.Pod(pod),
util.ObjectDiff(pod.Status, status)) util.ObjectDiff(podStatus, status))
return true return true
} }
@ -495,3 +502,13 @@ func notRunning(statuses []api.ContainerStatus) bool {
} }
return true return true
} }
func copyStatus(source *api.PodStatus) (api.PodStatus, error) {
clone, err := api.Scheme.DeepCopy(source)
if err != nil {
glog.Errorf("Failed to clone status %+v: %v", source, err)
return api.PodStatus{}, err
}
status := *clone.(*api.PodStatus)
return status, nil
}

View File

@ -205,8 +205,9 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
if finalStatus.StartTime.IsZero() { if finalStatus.StartTime.IsZero() {
t.Errorf("StartTime should not be zero") t.Errorf("StartTime should not be zero")
} }
if !finalStatus.StartTime.Time.Equal(now.Time) { expected := now.Rfc3339Copy()
t.Errorf("Expected %v, but got %v", now.Time, finalStatus.StartTime.Time) if !finalStatus.StartTime.Equal(expected) {
t.Errorf("Expected %v, but got %v", expected, finalStatus.StartTime)
} }
} }
@ -464,8 +465,10 @@ func TestStatusEquality(t *testing.T) {
oldPodStatus := api.PodStatus{ oldPodStatus := api.PodStatus{
ContainerStatuses: shuffle(podStatus.ContainerStatuses), ContainerStatuses: shuffle(podStatus.ContainerStatuses),
} }
normalizeStatus(&oldPodStatus)
normalizeStatus(&podStatus)
if !isStatusEqual(&oldPodStatus, &podStatus) { if !isStatusEqual(&oldPodStatus, &podStatus) {
t.Fatalf("Order of container statuses should not affect equality.") t.Fatalf("Order of container statuses should not affect normalized equality.")
} }
} }
} }
@ -494,6 +497,7 @@ func TestStaticPodStatus(t *testing.T) {
m.SetPodStatus(staticPod, status) m.SetPodStatus(staticPod, status)
retrievedStatus := expectPodStatus(t, m, staticPod) retrievedStatus := expectPodStatus(t, m, staticPod)
normalizeStatus(&status)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)