From 5cfeb53057b879c8ed9e3c402a7802f50a90ef05 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 14 Oct 2015 14:22:13 +0200 Subject: [PATCH 1/3] Enforce an initial empty SET PodConfig In PodConfigNotificationIncremental PodConfig mode, when no pods are available for a source, the Merge function correctly concluded that neither ADD, UPDATE nor REMOVE updates are to be sent to the kubelet. But as a consequence the kubelet will not mark that source as seen. This is usually not a problem for the apiserver source. But it is a problem for an empty "file" source, e.g. by passing an empty directory to the kubelet for static pods. Then the file source will never be seen and the kubelet will stay in its special not-all-source-seen mode. --- pkg/kubelet/config/config.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 4791a4ce72f..efe86e2eebb 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -152,11 +152,16 @@ func (s *podStorage) Merge(source string, change interface{}) error { s.updateLock.Lock() defer s.updateLock.Unlock() + seenBefore := s.sourcesSeen.Has(source) adds, updates, deletes := s.merge(source, change) + firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications switch s.mode { case PodConfigNotificationIncremental: + if firstSet { + s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} + } if len(deletes.Pods) > 0 { s.updates <- *deletes } @@ -168,15 +173,15 @@ func (s *podStorage) Merge(source string, change interface{}) error { } case PodConfigNotificationSnapshotAndUpdates: + if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { + s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} + } if len(updates.Pods) > 0 { s.updates <- *updates } - if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} - } case PodConfigNotificationSnapshot: - if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { + if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} } From 7dddec679957199abcacaa8a4dbf654ca8abc02c Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 14 Oct 2015 15:38:44 +0200 Subject: [PATCH 2/3] Switch to empty ADD PodUpdate for PodConfigNotificationIncremental mode --- pkg/kubelet/config/config.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index efe86e2eebb..f486124d0e2 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -159,13 +159,10 @@ func (s *podStorage) Merge(source string, change interface{}) error { // deliver update notifications switch s.mode { case PodConfigNotificationIncremental: - if firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} - } if len(deletes.Pods) > 0 { s.updates <- *deletes } - if len(adds.Pods) > 0 { + if len(adds.Pods) > 0 || firstSet { s.updates <- *adds } if len(updates.Pods) > 0 { From 651f02aec25f1b8c1be3d24ec346bd4bf318e365 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 14 Oct 2015 16:35:25 +0200 Subject: [PATCH 3/3] Add unit tests for first empty SET --- pkg/kubelet/config/config_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 00f9007db99..362a488aca7 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -267,6 +267,31 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } +func TestInitialEmptySet(t *testing.T) { + for _, test := range []struct { + mode PodConfigNotificationMode + op kubetypes.PodOperation + }{ + {PodConfigNotificationIncremental, kubetypes.ADD}, + {PodConfigNotificationSnapshot, kubetypes.SET}, + {PodConfigNotificationSnapshotAndUpdates, kubetypes.SET}, + } { + channel, ch, _ := createPodConfigTester(test.mode) + + // should register an empty PodUpdate operation + podUpdate := CreatePodUpdate(kubetypes.SET, TestSource) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource)) + + // should ignore following empty sets + podUpdate = CreatePodUpdate(kubetypes.SET, TestSource) + channel <- podUpdate + podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new"))) + } +} + func TestPodUpdateAnnotations(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)