From 7da0378f3c6526d998cd205539382bb89d500e0e Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Tue, 16 Dec 2014 21:11:27 -0800 Subject: [PATCH] Track the sources that the kubelet has seen, and only delete pods when every source has been seen at least once. --- pkg/kubelet/config/config.go | 48 ++++++++-- pkg/kubelet/config/config_test.go | 89 ++++++++++--------- pkg/kubelet/config/etcd.go | 2 +- pkg/kubelet/config/file.go | 4 +- pkg/kubelet/config/file_test.go | 8 +- pkg/kubelet/config/http.go | 4 +- pkg/kubelet/config/http_test.go | 4 +- pkg/kubelet/dockertools/docker_test.go | 2 +- pkg/kubelet/dockertools/fake_docker_client.go | 6 +- pkg/kubelet/kubelet.go | 14 ++- pkg/kubelet/kubelet_test.go | 44 +++++++++ pkg/kubelet/server.go | 6 +- pkg/kubelet/types.go | 17 +++- pkg/standalone/standalone.go | 17 ++-- 14 files changed, 191 insertions(+), 74 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 01e29c7afda..2b1b4cefc21 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -53,6 +53,10 @@ type PodConfig struct { // the channel of denormalized changes passed to listeners updates chan kubelet.PodUpdate + + // contains the list of all configured sources + sourcesLock sync.Mutex + sources util.StringSet } // NewPodConfig creates an object that can merge many configuration sources into a stream @@ -64,6 +68,7 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { pods: storage, mux: config.NewMux(storage), updates: updates, + sources: util.StringSet{}, } return podConfig } @@ -71,9 +76,22 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { // Channel creates or returns a config source channel. The channel // only accepts PodUpdates func (c *PodConfig) Channel(source string) chan<- interface{} { + c.sourcesLock.Lock() + defer c.sourcesLock.Unlock() + c.sources.Insert(source) return c.mux.Channel(source) } +// SeenAllSources returns true if this config has received a SET +// message from all configured sources, false otherwise. +func (c *PodConfig) SeenAllSources() bool { + if c.pods == nil { + return false + } + glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen) + return c.pods.seenSources(c.sources.List()...) +} + // Updates returns a channel of updates to the configuration, properly denormalized. func (c *PodConfig) Updates() <-chan kubelet.PodUpdate { return c.updates @@ -98,6 +116,10 @@ type podStorage struct { // on the updates channel updateLock sync.Mutex updates chan<- kubelet.PodUpdate + + // contains the set of all sources that have sent at least one SET + sourcesSeenLock sync.Mutex + sourcesSeen util.StringSet } // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel @@ -105,9 +127,10 @@ type podStorage struct { // TODO: allow initialization of the current state of the store with snapshotted version. func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { return &podStorage{ - pods: make(map[string]map[string]*api.BoundPod), - mode: mode, - updates: updates, + pods: make(map[string]map[string]*api.BoundPod), + mode: mode, + updates: updates, + sourcesSeen: util.StringSet{}, } } @@ -138,12 +161,12 @@ func (s *podStorage) Merge(source string, change interface{}) error { s.updates <- *updates } if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source} } case PodConfigNotificationSnapshot: if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source} } default: @@ -212,6 +235,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de case kubelet.SET: glog.V(4).Infof("Setting pods for source %s : %v", source, update) + s.markSourceSet(source) // Clear the old map entries by just creating a new map oldPods := pods pods = make(map[string]*api.BoundPod) @@ -254,6 +278,18 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de return adds, updates, deletes } +func (s *podStorage) markSourceSet(source string) { + s.sourcesSeenLock.Lock() + defer s.sourcesSeenLock.Unlock() + s.sourcesSeen.Insert(source) +} + +func (s *podStorage) seenSources(sources ...string) bool { + s.sourcesSeenLock.Lock() + defer s.sourcesSeenLock.Unlock() + return s.sourcesSeen.HasAll(sources...) +} + func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) { names := util.StringSet{} for i := range pods { @@ -280,7 +316,7 @@ func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.Boun func (s *podStorage) Sync() { s.updateLock.Lock() defer s.updateLock.Unlock() - s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, kubelet.AllSource} } // Object implements config.Accessor diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 30921d814fe..c06f9e9e8ec 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -25,6 +25,11 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" ) +const ( + NoneSource = "" + TestSource = "test" +) + func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { select { case update := <-ch: @@ -58,23 +63,23 @@ func CreateValidPod(name, namespace, source string) api.BoundPod { } } -func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate { +func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPod) kubelet.PodUpdate { // We deliberately return an empty slice instead of a nil pointer here // because reflect.DeepEqual differentiates between the two and we need to // pick one for consistency. newPods := make([]api.BoundPod, len(pods)) if len(pods) == 0 { - return kubelet.PodUpdate{newPods, op} + return kubelet.PodUpdate{newPods, op, source} } for i := range pods { newPods[i] = pods[i] } - return kubelet.PodUpdate{newPods, op} + return kubelet.PodUpdate{newPods, op, source} } func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) { config := NewPodConfig(mode) - channel := config.Channel("test") + channel := config.Channel(TestSource) ch := config.Updates() return channel, ch, config } @@ -102,63 +107,63 @@ func TestNewPodAdded(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test"))) } func TestNewPodAddedInvalidNamespace(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "", "")) channel <- podUpdate config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource)) } func TestNewPodAddedDefaultNamespace(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"))) } func TestNewPodAddedDifferentNamespaces(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test"))) // see an update in another namespace - podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test"))) } func TestInvalidPodFiltered(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) // add an invalid update - podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) channel <- podUpdate expectNoPodUpdate(t, ch) } @@ -167,45 +172,45 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test"))) // container updates are separated as UPDATE pod := podUpdate.Pods[0] pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} - channel <- CreatePodUpdate(kubelet.ADD, pod) - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) + channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) } func TestNewPodAddedSnapshot(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test"))) // container updates are separated as UPDATE pod := podUpdate.Pods[0] pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} - channel <- CreatePodUpdate(kubelet.ADD, pod) - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod)) + channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, pod)) } func TestNewPodAddedUpdatedRemoved(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) @@ -213,22 +218,22 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { // an kubelet.ADD should be converted to kubelet.UPDATE pod := CreateValidPod("foo", "new", "test") pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} - podUpdate = CreatePodUpdate(kubelet.ADD, pod) + podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, pod) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) - podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}}) + podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}}) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, NoneSource, pod)) } func TestNewPodAddedUpdatedSet(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) @@ -236,10 +241,10 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE pod := CreateValidPod("foo2", "new", "test") pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} - podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test")) + podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test")) channel <- podUpdate expectPodUpdate(t, ch, - CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")), - CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")), - CreatePodUpdate(kubelet.UPDATE, pod)) + CreatePodUpdate(kubelet.REMOVE, NoneSource, CreateValidPod("foo", "new", "test")), + CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")), + CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) } diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index f9bc23cf668..9bf8ecac5a5 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -79,7 +79,7 @@ func (s *sourceEtcd) run() { } glog.V(4).Infof("Received state from etcd watch: %+v", pods) - s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.EtcdSource} } } } diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 6f45ea5d945..8fcbfc3dceb 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -73,14 +73,14 @@ func (s *sourceFile) extractFromPath() error { if err != nil { return err } - s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.FileSource} case statInfo.Mode().IsRegular(): pod, err := extractFromFile(path) if err != nil { return err } - s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} + s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.FileSource} default: return fmt.Errorf("path is not a directory or file") diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index 7e47e7d350e..c0642a936d6 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -119,7 +119,7 @@ func TestReadFromFile(t *testing.T) { select { case got := <-ch: update := got.(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, api.BoundPod{ + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{ ObjectMeta: api.ObjectMeta{ Name: simpleSubdomainSafeHash(file.Name()), UID: simpleSubdomainSafeHash(file.Name()), @@ -170,7 +170,7 @@ func TestExtractFromValidDataFile(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, expectedPod) + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, expectedPod) if !reflect.DeepEqual(expected, update) { t.Errorf("Expected %#v, Got %#v", expected, update) } @@ -191,7 +191,7 @@ func TestExtractFromEmptyDir(t *testing.T) { } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET) + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource) if !reflect.DeepEqual(expected, update) { t.Errorf("Expected %#v, Got %#v", expected, update) } @@ -239,7 +239,7 @@ func TestExtractFromDir(t *testing.T) { } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, pods...) + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, pods...) sort.Sort(sortedPods(update.Pods)) sort.Sort(sortedPods(expected.Pods)) if !reflect.DeepEqual(expected, update) { diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index b9694f3cfbc..79610caa25b 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -97,7 +97,7 @@ func (s *sourceURL) extractFromURL() error { if len(pod.Namespace) == 0 { pod.Namespace = api.NamespaceDefault } - s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} + s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource} return nil } @@ -138,7 +138,7 @@ func (s *sourceURL) extractFromURL() error { pod.Namespace = api.NamespaceDefault } } - s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET} + s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.HTTPSource} return nil } diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index c03e950e512..0592cd71ae6 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -124,6 +124,7 @@ func TestExtractFromHTTP(t *testing.T) { desc: "Single manifest", manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"}, expected: CreatePodUpdate(kubelet.SET, + kubelet.HTTPSource, api.BoundPod{ ObjectMeta: api.ObjectMeta{ Name: "foo", @@ -141,6 +142,7 @@ func TestExtractFromHTTP(t *testing.T) { {Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}}, }, expected: CreatePodUpdate(kubelet.SET, + kubelet.HTTPSource, api.BoundPod{ ObjectMeta: api.ObjectMeta{ Name: "1", @@ -169,7 +171,7 @@ func TestExtractFromHTTP(t *testing.T) { { desc: "Empty Array", manifests: []api.ContainerManifest{}, - expected: CreatePodUpdate(kubelet.SET), + expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource), }, } for _, testCase := range testCases { diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index b93279c1250..f51b811629a 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -76,7 +76,7 @@ func TestGetContainerID(t *testing.T) { t.Errorf("Failed to find container %#v", dockerContainer) } - fakeDocker.clearCalls() + fakeDocker.ClearCalls() dockerContainer, found, _ = dockerContainers.FindPodContainer("foobar", "", "foo") verifyCalls(t, fakeDocker, []string{}) if dockerContainer != nil || found { diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 22fb88fa1d3..d41dcba754b 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -40,10 +40,14 @@ type FakeDockerClient struct { VersionInfo docker.Env } -func (f *FakeDockerClient) clearCalls() { +func (f *FakeDockerClient) ClearCalls() { f.Lock() defer f.Unlock() f.called = []string{} + f.Stopped = []string{} + f.pulled = []string{} + f.Created = []string{} + f.Removed = []string{} } func (f *FakeDockerClient) AssertCalls(calls []string) (err error) { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0b1123a7556..f075647de2d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -53,6 +53,8 @@ type SyncHandler interface { SyncPods([]api.BoundPod) error } +type SourcesReadyFn func() bool + type volumeMap map[string]volume.Interface // New creates a new Kubelet for use in main @@ -66,7 +68,8 @@ func NewMainKubelet( pullQPS float32, pullBurst int, minimumGCAge time.Duration, - maxContainerCount int) *Kubelet { + maxContainerCount int, + sourcesReady SourcesReadyFn) *Kubelet { return &Kubelet{ hostname: hn, dockerClient: dc, @@ -82,6 +85,7 @@ func NewMainKubelet( pullBurst: pullBurst, minimumGCAge: minimumGCAge, maxContainerCount: maxContainerCount, + sourcesReady: sourcesReady, } } @@ -112,6 +116,7 @@ type Kubelet struct { podWorkers *podWorkers resyncInterval time.Duration pods []api.BoundPod + sourcesReady SourcesReadyFn // Needed to report events for containers belonging to deleted/modified pods. // Tracks references for reporting events @@ -907,7 +912,12 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { } }) } - + if !kl.sourcesReady() { + // If the sources aren't ready, skip deletion, as we may accidentally delete pods + // for sources that haven't reported yet. + glog.V(4).Infof("Skipping deletes, sources aren't ready yet.") + return nil + } // Kill any containers we don't need. for _, container := range dockerContainers { // Don't kill containers that are in the desired pods. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5dfa69df7b6..105e256be3f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -55,6 +55,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools kubelet.etcdClient = fakeEtcdClient kubelet.rootDirectory = "/tmp/kubelet" kubelet.podWorkers = newPodWorkers() + kubelet.sourcesReady = func() bool { return true } return kubelet, fakeEtcdClient, fakeDocker } @@ -513,6 +514,49 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { fakeDocker.Unlock() } +func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { + ready := false + kubelet, _, fakeDocker := newTestKubelet(t) + kubelet.sourcesReady = func() bool { return ready } + + fakeDocker.ContainerList = []docker.APIContainers{ + { + // the k8s prefix is required for the kubelet to manage the container + Names: []string{"/k8s_foo_bar.new.test"}, + ID: "1234", + }, + { + // network container + Names: []string{"/k8s_net_foo.new.test_"}, + ID: "9876", + }, + } + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { + t.Errorf("unexpected error: %v", err) + } + // Validate nothing happened. + verifyCalls(t, fakeDocker, []string{"list"}) + fakeDocker.ClearCalls() + + ready = true + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { + t.Errorf("unexpected error: %v", err) + } + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"}) + + // A map iteration is used to delete containers, so must not depend on + // order here. + expectedToStop := map[string]bool{ + "1234": true, + "9876": true, + } + if len(fakeDocker.Stopped) != 2 || + !expectedToStop[fakeDocker.Stopped[0]] || + !expectedToStop[fakeDocker.Stopped[1]] { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + } +} + func TestSyncPodsDeletes(t *testing.T) { kubelet, _, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 36122bbf284..7de667fd140 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -59,6 +59,7 @@ func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, WriteTimeout: 5 * time.Minute, MaxHeaderBytes: 1 << 20, } + updates <- PodUpdate{[]api.BoundPod{}, SET, ServerSource} glog.Fatal(s.ListenAndServe()) } @@ -143,7 +144,7 @@ func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) { if pod.UID == "" { pod.UID = "1" } - s.updates <- PodUpdate{[]api.BoundPod{pod}, SET} + s.updates <- PodUpdate{[]api.BoundPod{pod}, SET, ServerSource} } @@ -166,8 +167,7 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) { pods[i].Name = fmt.Sprintf("%d", i+1) pods[i].Spec = specs[i] } - s.updates <- PodUpdate{pods, SET} - + s.updates <- PodUpdate{pods, SET, ServerSource} } // handleContainerLogs handles containerLogs request against the Kubelet diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index bb2d87f7d19..28787fb0ac1 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -36,6 +36,18 @@ const ( REMOVE // Pods with the given ids have been updated in this source UPDATE + + // These constants identify the sources of pods + // Updates from a file + FileSource = "file" + // Updates from etcd + EtcdSource = "etcd" + // Updates from querying a web page + HTTPSource = "http" + // Updates received to the kubelet server + ServerSource = "server" + // Updates from all sources + AllSource = "*" ) // PodUpdate defines an operation sent on the channel. You can add or remove single services by @@ -48,8 +60,9 @@ const ( // functionally similar, this helps our unit tests properly check that the correct PodUpdates // are generated. type PodUpdate struct { - Pods []api.BoundPod - Op PodOperation + Pods []api.BoundPod + Op PodOperation + Source string } // GetPodFullName returns a name that uniquely identifies a pod across all config sources. diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 7d2fcbd7bd5..19ca1708813 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -176,7 +176,7 @@ func RunKubelet(kcfg *KubeletConfig) { } cfg := makePodSourceConfig(kcfg) - k := createAndInitKubelet(kcfg) + k := createAndInitKubelet(kcfg, cfg) // process pods and exit. if kcfg.Runonce { if _, err := k.RunOnce(cfg.Updates()); err != nil { @@ -194,7 +194,7 @@ func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig) // start the kubelet server if kc.EnableServer { go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers) + kubelet.ListenAndServeKubeletServer(k, cfg.Channel(kubelet.ServerSource), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers) }, 0) } } @@ -205,17 +205,19 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { // define file config source if kc.ConfigFile != "" { - config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel("file")) + glog.Infof("Adding manifest file: %v", kc.ConfigFile) + config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource)) } // define url config source if kc.ManifestURL != "" { - config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel("http")) + glog.Infof("Adding manifest url: %v", kc.ManifestURL) + config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel(kubelet.HTTPSource)) } if kc.EtcdClient != nil { glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster()) - config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel("etcd")) + config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource)) } return cfg } @@ -247,7 +249,7 @@ type KubeletConfig struct { Runonce bool } -func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet { +func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) *kubelet.Kubelet { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations @@ -261,7 +263,8 @@ func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet { float32(kc.RegistryPullQPS), kc.RegistryBurst, kc.MinimumGCAge, - kc.MaxContainerCount) + kc.MaxContainerCount, + pc.SeenAllSources) k.BirthCry()