diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 123ea71919c..caa3062fc91 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -152,7 +152,9 @@ func (s *podStorage) Merge(source string, change interface{}) error { s.updateLock.Lock() defer s.updateLock.Unlock() + seenBefore := s.sourcesSeen.Has(source) adds, updates, deletes := s.merge(source, change) + firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications switch s.mode { @@ -160,7 +162,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { if len(deletes.Pods) > 0 { s.updates <- *deletes } - if len(adds.Pods) > 0 { + if len(adds.Pods) > 0 || firstSet { s.updates <- *adds } if len(updates.Pods) > 0 { @@ -168,15 +170,15 @@ func (s *podStorage) Merge(source string, change interface{}) error { } case PodConfigNotificationSnapshotAndUpdates: + if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { + s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} + } if len(updates.Pods) > 0 { s.updates <- *updates } - if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} - } case PodConfigNotificationSnapshot: - if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { + if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} } diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 00f9007db99..362a488aca7 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -267,6 +267,31 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } +func TestInitialEmptySet(t *testing.T) { + for _, test := range []struct { + mode PodConfigNotificationMode + op kubetypes.PodOperation + }{ + {PodConfigNotificationIncremental, kubetypes.ADD}, + {PodConfigNotificationSnapshot, kubetypes.SET}, + {PodConfigNotificationSnapshotAndUpdates, kubetypes.SET}, + } { + channel, ch, _ := createPodConfigTester(test.mode) + + // should register an empty PodUpdate operation + podUpdate := CreatePodUpdate(kubetypes.SET, TestSource) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource)) + + // should ignore following empty sets + podUpdate = CreatePodUpdate(kubetypes.SET, TestSource) + channel <- podUpdate + podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new"))) + } +} + func TestPodUpdateAnnotations(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)