Merge pull request #2994 from brendandburns/exec

Track the sources that the kubelet has seen
This commit is contained in:
Dawn Chen 2014-12-17 16:17:48 -08:00
commit edfae8660e
14 changed files with 191 additions and 74 deletions

View File

@ -53,6 +53,10 @@ type PodConfig struct {
// the channel of denormalized changes passed to listeners // the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate updates chan kubelet.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources util.StringSet
} }
// NewPodConfig creates an object that can merge many configuration sources into a stream // NewPodConfig creates an object that can merge many configuration sources into a stream
@ -64,6 +68,7 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
pods: storage, pods: storage,
mux: config.NewMux(storage), mux: config.NewMux(storage),
updates: updates, updates: updates,
sources: util.StringSet{},
} }
return podConfig return podConfig
} }
@ -71,9 +76,22 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
// Channel creates or returns a config source channel. The channel // Channel creates or returns a config source channel. The channel
// only accepts PodUpdates // only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} { func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source) return c.mux.Channel(source)
} }
// SeenAllSources returns true if this config has received a SET
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...)
}
// Updates returns a channel of updates to the configuration, properly denormalized. // Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubelet.PodUpdate { func (c *PodConfig) Updates() <-chan kubelet.PodUpdate {
return c.updates return c.updates
@ -98,6 +116,10 @@ type podStorage struct {
// on the updates channel // on the updates channel
updateLock sync.Mutex updateLock sync.Mutex
updates chan<- kubelet.PodUpdate updates chan<- kubelet.PodUpdate
// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.Mutex
sourcesSeen util.StringSet
} }
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
@ -105,9 +127,10 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version. // TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
return &podStorage{ return &podStorage{
pods: make(map[string]map[string]*api.BoundPod), pods: make(map[string]map[string]*api.BoundPod),
mode: mode, mode: mode,
updates: updates, updates: updates,
sourcesSeen: util.StringSet{},
} }
} }
@ -138,12 +161,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updates <- *updates s.updates <- *updates
} }
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
} }
case PodConfigNotificationSnapshot: case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
} }
default: default:
@ -212,6 +235,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
case kubelet.SET: case kubelet.SET:
glog.V(4).Infof("Setting pods for source %s : %v", source, update) glog.V(4).Infof("Setting pods for source %s : %v", source, update)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
oldPods := pods oldPods := pods
pods = make(map[string]*api.BoundPod) pods = make(map[string]*api.BoundPod)
@ -254,6 +278,18 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
return adds, updates, deletes return adds, updates, deletes
} }
func (s *podStorage) markSourceSet(source string) {
s.sourcesSeenLock.Lock()
defer s.sourcesSeenLock.Unlock()
s.sourcesSeen.Insert(source)
}
func (s *podStorage) seenSources(sources ...string) bool {
s.sourcesSeenLock.Lock()
defer s.sourcesSeenLock.Unlock()
return s.sourcesSeen.HasAll(sources...)
}
func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) { func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) {
names := util.StringSet{} names := util.StringSet{}
for i := range pods { for i := range pods {
@ -280,7 +316,7 @@ func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.Boun
func (s *podStorage) Sync() { func (s *podStorage) Sync() {
s.updateLock.Lock() s.updateLock.Lock()
defer s.updateLock.Unlock() defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, kubelet.AllSource}
} }
// Object implements config.Accessor // Object implements config.Accessor

View File

@ -25,6 +25,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
) )
const (
NoneSource = ""
TestSource = "test"
)
func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
select { select {
case update := <-ch: case update := <-ch:
@ -58,23 +63,23 @@ func CreateValidPod(name, namespace, source string) api.BoundPod {
} }
} }
func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate { func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPod) kubelet.PodUpdate {
// We deliberately return an empty slice instead of a nil pointer here // We deliberately return an empty slice instead of a nil pointer here
// because reflect.DeepEqual differentiates between the two and we need to // because reflect.DeepEqual differentiates between the two and we need to
// pick one for consistency. // pick one for consistency.
newPods := make([]api.BoundPod, len(pods)) newPods := make([]api.BoundPod, len(pods))
if len(pods) == 0 { if len(pods) == 0 {
return kubelet.PodUpdate{newPods, op} return kubelet.PodUpdate{newPods, op, source}
} }
for i := range pods { for i := range pods {
newPods[i] = pods[i] newPods[i] = pods[i]
} }
return kubelet.PodUpdate{newPods, op} return kubelet.PodUpdate{newPods, op, source}
} }
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) { func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) {
config := NewPodConfig(mode) config := NewPodConfig(mode)
channel := config.Channel("test") channel := config.Channel(TestSource)
ch := config.Updates() ch := config.Updates()
return channel, ch, config return channel, ch, config
} }
@ -102,63 +107,63 @@ func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update // see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
config.Sync() config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
} }
func TestNewPodAddedInvalidNamespace(t *testing.T) { func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update // see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "", ""))
channel <- podUpdate channel <- podUpdate
config.Sync() config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET)) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource))
} }
func TestNewPodAddedDefaultNamespace(t *testing.T) { func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update // see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test")))
config.Sync() config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test")))
} }
func TestNewPodAddedDifferentNamespaces(t *testing.T) { func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update // see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test")))
// see an update in another namespace // see an update in another namespace
podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
config.Sync() config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
} }
func TestInvalidPodFiltered(t *testing.T) { func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// see an update // see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
// add an invalid update // add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
channel <- podUpdate channel <- podUpdate
expectNoPodUpdate(t, ch) expectNoPodUpdate(t, ch)
} }
@ -167,45 +172,45 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
// see an set // see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test")))
config.Sync() config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE // container updates are separated as UPDATE
pod := podUpdate.Pods[0] pod := podUpdate.Pods[0]
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod) channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
} }
func TestNewPodAddedSnapshot(t *testing.T) { func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
// see an set // see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test")))
config.Sync() config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
// container updates are separated as UPDATE // container updates are separated as UPDATE
pod := podUpdate.Pods[0] pod := podUpdate.Pods[0]
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod) channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod)) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, pod))
} }
func TestNewPodAddedUpdatedRemoved(t *testing.T) { func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add // should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))
// should ignore ADDs that are identical // should ignore ADDs that are identical
expectNoPodUpdate(t, ch) expectNoPodUpdate(t, ch)
@ -213,22 +218,22 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
// an kubelet.ADD should be converted to kubelet.UPDATE // an kubelet.ADD should be converted to kubelet.UPDATE
pod := CreateValidPod("foo", "new", "test") pod := CreateValidPod("foo", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.ADD, pod) podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, pod)
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}}) podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod)) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, NoneSource, pod))
} }
func TestNewPodAddedUpdatedSet(t *testing.T) { func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add // should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", "")) podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test"))) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))
// should ignore ADDs that are identical // should ignore ADDs that are identical
expectNoPodUpdate(t, ch) expectNoPodUpdate(t, ch)
@ -236,10 +241,10 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
// should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE
pod := CreateValidPod("foo2", "new", "test") pod := CreateValidPod("foo2", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test")) podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
channel <- podUpdate channel <- podUpdate
expectPodUpdate(t, ch, expectPodUpdate(t, ch,
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")), CreatePodUpdate(kubelet.REMOVE, NoneSource, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")), CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, pod)) CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
} }

View File

@ -79,7 +79,7 @@ func (s *sourceEtcd) run() {
} }
glog.V(4).Infof("Received state from etcd watch: %+v", pods) glog.V(4).Infof("Received state from etcd watch: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET} s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.EtcdSource}
} }
} }
} }

View File

@ -73,14 +73,14 @@ func (s *sourceFile) extractFromPath() error {
if err != nil { if err != nil {
return err return err
} }
s.updates <- kubelet.PodUpdate{pods, kubelet.SET} s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.FileSource}
case statInfo.Mode().IsRegular(): case statInfo.Mode().IsRegular():
pod, err := extractFromFile(path) pod, err := extractFromFile(path)
if err != nil { if err != nil {
return err return err
} }
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.FileSource}
default: default:
return fmt.Errorf("path is not a directory or file") return fmt.Errorf("path is not a directory or file")

View File

@ -119,7 +119,7 @@ func TestReadFromFile(t *testing.T) {
select { select {
case got := <-ch: case got := <-ch:
update := got.(kubelet.PodUpdate) update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, api.BoundPod{ expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: simpleSubdomainSafeHash(file.Name()), Name: simpleSubdomainSafeHash(file.Name()),
UID: simpleSubdomainSafeHash(file.Name()), UID: simpleSubdomainSafeHash(file.Name()),
@ -170,7 +170,7 @@ func TestExtractFromValidDataFile(t *testing.T) {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
update := (<-ch).(kubelet.PodUpdate) update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, expectedPod) expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, expectedPod)
if !reflect.DeepEqual(expected, update) { if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update) t.Errorf("Expected %#v, Got %#v", expected, update)
} }
@ -191,7 +191,7 @@ func TestExtractFromEmptyDir(t *testing.T) {
} }
update := (<-ch).(kubelet.PodUpdate) update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET) expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource)
if !reflect.DeepEqual(expected, update) { if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update) t.Errorf("Expected %#v, Got %#v", expected, update)
} }
@ -239,7 +239,7 @@ func TestExtractFromDir(t *testing.T) {
} }
update := (<-ch).(kubelet.PodUpdate) update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, pods...) expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, pods...)
sort.Sort(sortedPods(update.Pods)) sort.Sort(sortedPods(update.Pods))
sort.Sort(sortedPods(expected.Pods)) sort.Sort(sortedPods(expected.Pods))
if !reflect.DeepEqual(expected, update) { if !reflect.DeepEqual(expected, update) {

View File

@ -97,7 +97,7 @@ func (s *sourceURL) extractFromURL() error {
if len(pod.Namespace) == 0 { if len(pod.Namespace) == 0 {
pod.Namespace = api.NamespaceDefault pod.Namespace = api.NamespaceDefault
} }
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource}
return nil return nil
} }
@ -138,7 +138,7 @@ func (s *sourceURL) extractFromURL() error {
pod.Namespace = api.NamespaceDefault pod.Namespace = api.NamespaceDefault
} }
} }
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET} s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.HTTPSource}
return nil return nil
} }

View File

@ -124,6 +124,7 @@ func TestExtractFromHTTP(t *testing.T) {
desc: "Single manifest", desc: "Single manifest",
manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"}, manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"},
expected: CreatePodUpdate(kubelet.SET, expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.BoundPod{ api.BoundPod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
@ -141,6 +142,7 @@ func TestExtractFromHTTP(t *testing.T) {
{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}}, {Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}},
}, },
expected: CreatePodUpdate(kubelet.SET, expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.BoundPod{ api.BoundPod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "1", Name: "1",
@ -169,7 +171,7 @@ func TestExtractFromHTTP(t *testing.T) {
{ {
desc: "Empty Array", desc: "Empty Array",
manifests: []api.ContainerManifest{}, manifests: []api.ContainerManifest{},
expected: CreatePodUpdate(kubelet.SET), expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource),
}, },
} }
for _, testCase := range testCases { for _, testCase := range testCases {

View File

@ -76,7 +76,7 @@ func TestGetContainerID(t *testing.T) {
t.Errorf("Failed to find container %#v", dockerContainer) t.Errorf("Failed to find container %#v", dockerContainer)
} }
fakeDocker.clearCalls() fakeDocker.ClearCalls()
dockerContainer, found, _ = dockerContainers.FindPodContainer("foobar", "", "foo") dockerContainer, found, _ = dockerContainers.FindPodContainer("foobar", "", "foo")
verifyCalls(t, fakeDocker, []string{}) verifyCalls(t, fakeDocker, []string{})
if dockerContainer != nil || found { if dockerContainer != nil || found {

View File

@ -40,10 +40,14 @@ type FakeDockerClient struct {
VersionInfo docker.Env VersionInfo docker.Env
} }
func (f *FakeDockerClient) clearCalls() { func (f *FakeDockerClient) ClearCalls() {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
f.called = []string{} f.called = []string{}
f.Stopped = []string{}
f.pulled = []string{}
f.Created = []string{}
f.Removed = []string{}
} }
func (f *FakeDockerClient) AssertCalls(calls []string) (err error) { func (f *FakeDockerClient) AssertCalls(calls []string) (err error) {

View File

@ -53,6 +53,8 @@ type SyncHandler interface {
SyncPods([]api.BoundPod) error SyncPods([]api.BoundPod) error
} }
type SourcesReadyFn func() bool
type volumeMap map[string]volume.Interface type volumeMap map[string]volume.Interface
// New creates a new Kubelet for use in main // New creates a new Kubelet for use in main
@ -66,7 +68,8 @@ func NewMainKubelet(
pullQPS float32, pullQPS float32,
pullBurst int, pullBurst int,
minimumGCAge time.Duration, minimumGCAge time.Duration,
maxContainerCount int) *Kubelet { maxContainerCount int,
sourcesReady SourcesReadyFn) *Kubelet {
return &Kubelet{ return &Kubelet{
hostname: hn, hostname: hn,
dockerClient: dc, dockerClient: dc,
@ -82,6 +85,7 @@ func NewMainKubelet(
pullBurst: pullBurst, pullBurst: pullBurst,
minimumGCAge: minimumGCAge, minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount, maxContainerCount: maxContainerCount,
sourcesReady: sourcesReady,
} }
} }
@ -112,6 +116,7 @@ type Kubelet struct {
podWorkers *podWorkers podWorkers *podWorkers
resyncInterval time.Duration resyncInterval time.Duration
pods []api.BoundPod pods []api.BoundPod
sourcesReady SourcesReadyFn
// Needed to report events for containers belonging to deleted/modified pods. // Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events // Tracks references for reporting events
@ -907,7 +912,12 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
} }
}) })
} }
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
// Kill any containers we don't need. // Kill any containers we don't need.
for _, container := range dockerContainers { for _, container := range dockerContainers {
// Don't kill containers that are in the desired pods. // Don't kill containers that are in the desired pods.

View File

@ -55,6 +55,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
kubelet.etcdClient = fakeEtcdClient kubelet.etcdClient = fakeEtcdClient
kubelet.rootDirectory = "/tmp/kubelet" kubelet.rootDirectory = "/tmp/kubelet"
kubelet.podWorkers = newPodWorkers() kubelet.podWorkers = newPodWorkers()
kubelet.sourcesReady = func() bool { return true }
return kubelet, fakeEtcdClient, fakeDocker return kubelet, fakeEtcdClient, fakeDocker
} }
@ -513,6 +514,49 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
fakeDocker.Unlock() fakeDocker.Unlock()
} }
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.sourcesReady = func() bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.new.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s_net_foo.new.test_"},
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"})
// A map iteration is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
"9876": true,
}
if len(fakeDocker.Stopped) != 2 ||
!expectedToStop[fakeDocker.Stopped[0]] ||
!expectedToStop[fakeDocker.Stopped[1]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
}
}
func TestSyncPodsDeletes(t *testing.T) { func TestSyncPodsDeletes(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t) kubelet, _, fakeDocker := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{

View File

@ -59,6 +59,7 @@ func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{},
WriteTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20, MaxHeaderBytes: 1 << 20,
} }
updates <- PodUpdate{[]api.BoundPod{}, SET, ServerSource}
glog.Fatal(s.ListenAndServe()) glog.Fatal(s.ListenAndServe())
} }
@ -143,7 +144,7 @@ func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
if pod.UID == "" { if pod.UID == "" {
pod.UID = "1" pod.UID = "1"
} }
s.updates <- PodUpdate{[]api.BoundPod{pod}, SET} s.updates <- PodUpdate{[]api.BoundPod{pod}, SET, ServerSource}
} }
@ -166,8 +167,7 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
pods[i].Name = fmt.Sprintf("%d", i+1) pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Spec = specs[i] pods[i].Spec = specs[i]
} }
s.updates <- PodUpdate{pods, SET} s.updates <- PodUpdate{pods, SET, ServerSource}
} }
// handleContainerLogs handles containerLogs request against the Kubelet // handleContainerLogs handles containerLogs request against the Kubelet

View File

@ -36,6 +36,18 @@ const (
REMOVE REMOVE
// Pods with the given ids have been updated in this source // Pods with the given ids have been updated in this source
UPDATE UPDATE
// These constants identify the sources of pods
// Updates from a file
FileSource = "file"
// Updates from etcd
EtcdSource = "etcd"
// Updates from querying a web page
HTTPSource = "http"
// Updates received to the kubelet server
ServerSource = "server"
// Updates from all sources
AllSource = "*"
) )
// PodUpdate defines an operation sent on the channel. You can add or remove single services by // PodUpdate defines an operation sent on the channel. You can add or remove single services by
@ -48,8 +60,9 @@ const (
// functionally similar, this helps our unit tests properly check that the correct PodUpdates // functionally similar, this helps our unit tests properly check that the correct PodUpdates
// are generated. // are generated.
type PodUpdate struct { type PodUpdate struct {
Pods []api.BoundPod Pods []api.BoundPod
Op PodOperation Op PodOperation
Source string
} }
// GetPodFullName returns a name that uniquely identifies a pod across all config sources. // GetPodFullName returns a name that uniquely identifies a pod across all config sources.

View File

@ -176,7 +176,7 @@ func RunKubelet(kcfg *KubeletConfig) {
} }
cfg := makePodSourceConfig(kcfg) cfg := makePodSourceConfig(kcfg)
k := createAndInitKubelet(kcfg) k := createAndInitKubelet(kcfg, cfg)
// process pods and exit. // process pods and exit.
if kcfg.Runonce { if kcfg.Runonce {
if _, err := k.RunOnce(cfg.Updates()); err != nil { if _, err := k.RunOnce(cfg.Updates()); err != nil {
@ -194,7 +194,7 @@ func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig)
// start the kubelet server // start the kubelet server
if kc.EnableServer { if kc.EnableServer {
go util.Forever(func() { go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers) kubelet.ListenAndServeKubeletServer(k, cfg.Channel(kubelet.ServerSource), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
}, 0) }, 0)
} }
} }
@ -205,17 +205,19 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// define file config source // define file config source
if kc.ConfigFile != "" { if kc.ConfigFile != "" {
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel("file")) glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
} }
// define url config source // define url config source
if kc.ManifestURL != "" { if kc.ManifestURL != "" {
config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel("http")) glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel(kubelet.HTTPSource))
} }
if kc.EtcdClient != nil { if kc.EtcdClient != nil {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster()) glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel("etcd")) config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource))
} }
return cfg return cfg
} }
@ -247,7 +249,7 @@ type KubeletConfig struct {
Runonce bool Runonce bool
} }
func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet { func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) *kubelet.Kubelet {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations // up into "per source" synchronizations
@ -261,7 +263,8 @@ func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet {
float32(kc.RegistryPullQPS), float32(kc.RegistryPullQPS),
kc.RegistryBurst, kc.RegistryBurst,
kc.MinimumGCAge, kc.MinimumGCAge,
kc.MaxContainerCount) kc.MaxContainerCount,
pc.SeenAllSources)
k.BirthCry() k.BirthCry()