diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 261eed87b33..e4460558220 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -55,10 +55,6 @@ type PodConfig struct { // the channel of denormalized changes passed to listeners updates chan kubelet.PodUpdate - - // contains the list of all configured sources - sourcesLock sync.Mutex - sources util.StringSet } // NewPodConfig creates an object that can merge many configuration sources into a stream @@ -70,7 +66,6 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { pods: storage, mux: config.NewMux(storage), updates: updates, - sources: util.StringSet{}, } return podConfig } @@ -78,20 +73,15 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { // Channel creates or returns a config source channel. The channel // only accepts PodUpdates func (c *PodConfig) Channel(source string) chan<- interface{} { - c.sourcesLock.Lock() - defer c.sourcesLock.Unlock() - c.sources.Insert(source) return c.mux.Channel(source) } -// SeenAllSources returns true if this config has received a SET -// message from all configured sources, false otherwise. -func (c *PodConfig) SeenAllSources() bool { +func (c *PodConfig) SourceSeen(source string) bool { if c.pods == nil { return false } - glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen) - return c.pods.seenSources(c.sources.List()...) + glog.V(6).Infof("Looking for %v, have seen %v", source, c.pods.sourcesSeen) + return c.pods.seenSources(source) } // Updates returns a channel of updates to the configuration, properly denormalized. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 229bd230037..9929ba7c2e0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -56,7 +56,7 @@ type SyncHandler interface { SyncPods([]api.BoundPod) error } -type SourcesReadyFn func() bool +type SourceReadyFn func(source string) bool type volumeMap map[string]volume.Interface @@ -73,7 +73,7 @@ func NewMainKubelet( pullBurst int, minimumGCAge time.Duration, maxContainerCount int, - sourcesReady SourcesReadyFn, + sourceReady SourceReadyFn, clusterDomain string, clusterDNS net.IP) (*Kubelet, error) { if resyncInterval <= 0 { @@ -97,7 +97,7 @@ func NewMainKubelet( pullBurst: pullBurst, minimumGCAge: minimumGCAge, maxContainerCount: maxContainerCount, - sourcesReady: sourcesReady, + sourceReady: sourceReady, clusterDomain: clusterDomain, clusterDNS: clusterDNS, }, nil @@ -116,7 +116,7 @@ type Kubelet struct { podWorkers *podWorkers resyncInterval time.Duration pods []api.BoundPod - sourcesReady SourcesReadyFn + sourceReady SourceReadyFn // Needed to report events for containers belonging to deleted/modified pods. // Tracks references for reporting events @@ -1062,12 +1062,6 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { } }) } - if !kl.sourcesReady() { - // If the sources aren't ready, skip deletion, as we may accidentally delete pods - // for sources that haven't reported yet. - glog.V(4).Infof("Skipping deletes, sources aren't ready yet.") - return nil - } // Kill any containers we don't need. for _, container := range dockerContainers { // Don't kill containers that are in the desired pods. @@ -1076,6 +1070,13 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { // syncPod() will handle this one. continue } + _, _, podAnnotations := ParsePodFullName(podFullName) + if source := podAnnotations[ConfigSourceAnnotationKey]; !kl.sourceReady(source) { + // If the source for this container is not ready, skip deletion, so that we don't accidentally + // delete containers for sources that haven't reported yet. + glog.V(4).Infof("Skipping delete of container (%q), source (%s) aren't ready yet.", podFullName, source) + continue + } pc := podContainer{podFullName, uuid, containerName} if _, ok := desiredContainers[pc]; !ok { glog.V(1).Infof("Killing unwanted container %+v", pc) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5e28360af61..e7babf7fd3a 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -57,7 +57,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools kubelet.etcdClient = fakeEtcdClient kubelet.rootDirectory = "/tmp/kubelet" kubelet.podWorkers = newPodWorkers() - kubelet.sourcesReady = func() bool { return true } + kubelet.sourceReady = func(source string) bool { return true } return kubelet, fakeEtcdClient, fakeDocker } @@ -617,7 +617,7 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ready := false kubelet, _, fakeDocker := newTestKubelet(t) - kubelet.sourcesReady = func() bool { return ready } + kubelet.sourceReady = func(source string) bool { return ready } fakeDocker.ContainerList = []docker.APIContainers{ { @@ -657,6 +657,67 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { } } +func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { + ready := false + kubelet, _, fakeDocker := newTestKubelet(t) + kubelet.sourceReady = func(source string) bool { + if source == "testSource" { + return ready + } + return false + } + + fakeDocker.ContainerList = []docker.APIContainers{ + { + // the k8s prefix is required for the kubelet to manage the container + Names: []string{"/k8s_boo_bar.default.testSource_12345678_42"}, + ID: "7492", + }, + { + // network container + Names: []string{"/k8s_net_boo.default.testSource_12345678_42"}, + ID: "3542", + }, + + { + // the k8s prefix is required for the kubelet to manage the container + Names: []string{"/k8s_foo_bar.new.otherSource_12345678_42"}, + ID: "1234", + }, + { + // network container + Names: []string{"/k8s_net_foo.new.otherSource_12345678_42"}, + ID: "9876", + }, + } + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { + t.Errorf("unexpected error: %v", err) + } + // Validate nothing happened. + verifyCalls(t, fakeDocker, []string{"list"}) + fakeDocker.ClearCalls() + + ready = true + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { + t.Errorf("unexpected error: %v", err) + } + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"}) + + // Validate container for testSource are killed because testSource is reported as seen, but + // containers for otherSource are not killed because otherSource has not. + expectedToStop := map[string]bool{ + "7492": true, + "3542": true, + "1234": false, + "9876": false, + } + if len(fakeDocker.Stopped) != 2 || + !expectedToStop[fakeDocker.Stopped[0]] || + !expectedToStop[fakeDocker.Stopped[1]] { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + } +} + func TestSyncPodsDeletes(t *testing.T) { kubelet, _, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index 211e32a9227..884380ce801 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -18,8 +18,10 @@ package kubelet import ( "fmt" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/golang/glog" ) const ConfigSourceAnnotationKey = "kubernetes.io/config.source" @@ -71,3 +73,20 @@ type PodUpdate struct { func GetPodFullName(pod *api.BoundPod) string { return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey]) } + +// ParsePodFullName unpacks a pod full name and returns the pod name, namespace, and annotations. +// If the pod full name is invalid, empty strings are returend. +func ParsePodFullName(podFullName string) (podName, podNamespace string, podAnnotations map[string]string) { + parts := strings.Split(podFullName, ".") + expectedNumFields := 3 + actualNumFields := len(parts) + if actualNumFields != expectedNumFields { + glog.Warningf("found a podFullName (%q) with too few fields: expected %d, actual %d.", podFullName, expectedNumFields, actualNumFields) + return + } + podName = parts[0] + podNamespace = parts[1] + podAnnotations = make(map[string]string) + podAnnotations[ConfigSourceAnnotationKey] = parts[2] + return +} diff --git a/pkg/kubelet/types_test.go b/pkg/kubelet/types_test.go new file mode 100644 index 00000000000..b655aaf411e --- /dev/null +++ b/pkg/kubelet/types_test.go @@ -0,0 +1,44 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "testing" +) + +func TestParsePodFullName(t *testing.T) { + // Arrange + podFullName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1.default.etcd" + + // Act + podName, podNamespace, podAnnotations := ParsePodFullName(podFullName) + + // Assert + expectedPodName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1" + expectedPodNamespace := "default" + expectedSource := "etcd" + if podName != expectedPodName { + t.Errorf("Unexpected PodName. Expected: %q Actual: %q", expectedPodName, podName) + } + if podNamespace != expectedPodNamespace { + t.Errorf("Unexpected PodNamespace. Expected: %q Actual: %q", expectedPodNamespace, podNamespace) + } + if podAnnotations[ConfigSourceAnnotationKey] != expectedSource { + t.Errorf("Unexpected PodSource. Expected: %q Actual: %q", expectedPodNamespace, podNamespace) + } + +} diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 1df2948316a..012de76669e 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -277,7 +277,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.RegistryBurst, kc.MinimumGCAge, kc.MaxContainerCount, - pc.SeenAllSources, + pc.SourceSeen, kc.ClusterDomain, net.IP(kc.ClusterDNS))