Split up kubelet "source seen" logic

This commit is contained in:
saadali 2015-01-12 21:47:49 -08:00
parent 3ca6c231b2
commit 110ab6f1bd
6 changed files with 141 additions and 26 deletions

View File

@ -55,10 +55,6 @@ type PodConfig struct {
// the channel of denormalized changes passed to listeners // the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate updates chan kubelet.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources util.StringSet
} }
// NewPodConfig creates an object that can merge many configuration sources into a stream // NewPodConfig creates an object that can merge many configuration sources into a stream
@ -70,7 +66,6 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
pods: storage, pods: storage,
mux: config.NewMux(storage), mux: config.NewMux(storage),
updates: updates, updates: updates,
sources: util.StringSet{},
} }
return podConfig return podConfig
} }
@ -78,20 +73,15 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
// Channel creates or returns a config source channel. The channel // Channel creates or returns a config source channel. The channel
// only accepts PodUpdates // only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} { func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source) return c.mux.Channel(source)
} }
// SeenAllSources returns true if this config has received a SET func (c *PodConfig) SourceSeen(source string) bool {
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
if c.pods == nil { if c.pods == nil {
return false return false
} }
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen) glog.V(6).Infof("Looking for %v, have seen %v", source, c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...) return c.pods.seenSources(source)
} }
// Updates returns a channel of updates to the configuration, properly denormalized. // Updates returns a channel of updates to the configuration, properly denormalized.

View File

@ -56,7 +56,7 @@ type SyncHandler interface {
SyncPods([]api.BoundPod) error SyncPods([]api.BoundPod) error
} }
type SourcesReadyFn func() bool type SourceReadyFn func(source string) bool
type volumeMap map[string]volume.Interface type volumeMap map[string]volume.Interface
@ -73,7 +73,7 @@ func NewMainKubelet(
pullBurst int, pullBurst int,
minimumGCAge time.Duration, minimumGCAge time.Duration,
maxContainerCount int, maxContainerCount int,
sourcesReady SourcesReadyFn, sourceReady SourceReadyFn,
clusterDomain string, clusterDomain string,
clusterDNS net.IP) (*Kubelet, error) { clusterDNS net.IP) (*Kubelet, error) {
if resyncInterval <= 0 { if resyncInterval <= 0 {
@ -97,7 +97,7 @@ func NewMainKubelet(
pullBurst: pullBurst, pullBurst: pullBurst,
minimumGCAge: minimumGCAge, minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount, maxContainerCount: maxContainerCount,
sourcesReady: sourcesReady, sourceReady: sourceReady,
clusterDomain: clusterDomain, clusterDomain: clusterDomain,
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
}, nil }, nil
@ -116,7 +116,7 @@ type Kubelet struct {
podWorkers *podWorkers podWorkers *podWorkers
resyncInterval time.Duration resyncInterval time.Duration
pods []api.BoundPod pods []api.BoundPod
sourcesReady SourcesReadyFn sourceReady SourceReadyFn
// Needed to report events for containers belonging to deleted/modified pods. // Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events // Tracks references for reporting events
@ -1062,12 +1062,6 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
} }
}) })
} }
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
// Kill any containers we don't need. // Kill any containers we don't need.
for _, container := range dockerContainers { for _, container := range dockerContainers {
// Don't kill containers that are in the desired pods. // Don't kill containers that are in the desired pods.
@ -1076,6 +1070,13 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
// syncPod() will handle this one. // syncPod() will handle this one.
continue continue
} }
_, _, podAnnotations := ParsePodFullName(podFullName)
if source := podAnnotations[ConfigSourceAnnotationKey]; !kl.sourceReady(source) {
// If the source for this container is not ready, skip deletion, so that we don't accidentally
// delete containers for sources that haven't reported yet.
glog.V(4).Infof("Skipping delete of container (%q), source (%s) aren't ready yet.", podFullName, source)
continue
}
pc := podContainer{podFullName, uuid, containerName} pc := podContainer{podFullName, uuid, containerName}
if _, ok := desiredContainers[pc]; !ok { if _, ok := desiredContainers[pc]; !ok {
glog.V(1).Infof("Killing unwanted container %+v", pc) glog.V(1).Infof("Killing unwanted container %+v", pc)

View File

@ -57,7 +57,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
kubelet.etcdClient = fakeEtcdClient kubelet.etcdClient = fakeEtcdClient
kubelet.rootDirectory = "/tmp/kubelet" kubelet.rootDirectory = "/tmp/kubelet"
kubelet.podWorkers = newPodWorkers() kubelet.podWorkers = newPodWorkers()
kubelet.sourcesReady = func() bool { return true } kubelet.sourceReady = func(source string) bool { return true }
return kubelet, fakeEtcdClient, fakeDocker return kubelet, fakeEtcdClient, fakeDocker
} }
@ -617,7 +617,7 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false ready := false
kubelet, _, fakeDocker := newTestKubelet(t) kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.sourcesReady = func() bool { return ready } kubelet.sourceReady = func(source string) bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
@ -657,6 +657,67 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
} }
} }
func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ready := false
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool {
if source == "testSource" {
return ready
}
return false
}
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_boo_bar.default.testSource_12345678_42"},
ID: "7492",
},
{
// network container
Names: []string{"/k8s_net_boo.default.testSource_12345678_42"},
ID: "3542",
},
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.new.otherSource_12345678_42"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s_net_foo.new.otherSource_12345678_42"},
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"})
// Validate container for testSource are killed because testSource is reported as seen, but
// containers for otherSource are not killed because otherSource has not.
expectedToStop := map[string]bool{
"7492": true,
"3542": true,
"1234": false,
"9876": false,
}
if len(fakeDocker.Stopped) != 2 ||
!expectedToStop[fakeDocker.Stopped[0]] ||
!expectedToStop[fakeDocker.Stopped[1]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
}
}
func TestSyncPodsDeletes(t *testing.T) { func TestSyncPodsDeletes(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t) kubelet, _, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{

View File

@ -18,8 +18,10 @@ package kubelet
import ( import (
"fmt" "fmt"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
) )
const ConfigSourceAnnotationKey = "kubernetes.io/config.source" const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
@ -71,3 +73,20 @@ type PodUpdate struct {
func GetPodFullName(pod *api.BoundPod) string { func GetPodFullName(pod *api.BoundPod) string {
return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey]) return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey])
} }
// ParsePodFullName unpacks a pod full name and returns the pod name, namespace, and annotations.
// If the pod full name is invalid, empty strings are returend.
func ParsePodFullName(podFullName string) (podName, podNamespace string, podAnnotations map[string]string) {
parts := strings.Split(podFullName, ".")
expectedNumFields := 3
actualNumFields := len(parts)
if actualNumFields != expectedNumFields {
glog.Warningf("found a podFullName (%q) with too few fields: expected %d, actual %d.", podFullName, expectedNumFields, actualNumFields)
return
}
podName = parts[0]
podNamespace = parts[1]
podAnnotations = make(map[string]string)
podAnnotations[ConfigSourceAnnotationKey] = parts[2]
return
}

44
pkg/kubelet/types_test.go Normal file
View File

@ -0,0 +1,44 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
)
func TestParsePodFullName(t *testing.T) {
// Arrange
podFullName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1.default.etcd"
// Act
podName, podNamespace, podAnnotations := ParsePodFullName(podFullName)
// Assert
expectedPodName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1"
expectedPodNamespace := "default"
expectedSource := "etcd"
if podName != expectedPodName {
t.Errorf("Unexpected PodName. Expected: %q Actual: %q", expectedPodName, podName)
}
if podNamespace != expectedPodNamespace {
t.Errorf("Unexpected PodNamespace. Expected: %q Actual: %q", expectedPodNamespace, podNamespace)
}
if podAnnotations[ConfigSourceAnnotationKey] != expectedSource {
t.Errorf("Unexpected PodSource. Expected: %q Actual: %q", expectedPodNamespace, podNamespace)
}
}

View File

@ -277,7 +277,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.RegistryBurst, kc.RegistryBurst,
kc.MinimumGCAge, kc.MinimumGCAge,
kc.MaxContainerCount, kc.MaxContainerCount,
pc.SeenAllSources, pc.SourceSeen,
kc.ClusterDomain, kc.ClusterDomain,
net.IP(kc.ClusterDNS)) net.IP(kc.ClusterDNS))