Revert "Split up kubelet "source seen" logic"

We want to sync pods from file/http/etcd sources to the apiserver, hence
differentiating sources is no longer desired.

This reverts commit 110ab6f1bd.
This commit is contained in:
Yu-Ju Hong 2015-03-05 10:49:36 -08:00
parent 2d0743b143
commit 32fd331e73
6 changed files with 26 additions and 144 deletions

View File

@ -402,7 +402,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount,
pc.IsSourceSeen,
pc.SeenAllSources,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,

View File

@ -55,6 +55,10 @@ type PodConfig struct {
// the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources util.StringSet
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
@ -66,6 +70,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder)
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: util.StringSet{},
}
return podConfig
}
@ -73,17 +78,20 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder)
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}
// IsSourceSeen returns true if the specified source string has previously
// been marked as seen.
func (c *PodConfig) IsSourceSeen(source string) bool {
// SeenAllSources returns true if this config has received a SET
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", source, c.pods.sourcesSeen)
return c.pods.seenSources(source)
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...)
}
// Updates returns a channel of updates to the configuration, properly denormalized.

View File

@ -67,7 +67,7 @@ type SyncHandler interface {
SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
}
type SourceReadyFn func(source string) bool
type SourcesReadyFn func() bool
type volumeMap map[string]volume.Interface
@ -84,7 +84,7 @@ func NewMainKubelet(
pullBurst int,
minimumGCAge time.Duration,
maxContainerCount int,
sourceReady SourceReadyFn,
sourcesReady SourcesReadyFn,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string,
@ -128,7 +128,7 @@ func NewMainKubelet(
pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourceReady: sourceReady,
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
@ -178,7 +178,7 @@ type Kubelet struct {
podInfraContainerImage string
podWorkers *podWorkers
resyncInterval time.Duration
sourceReady SourceReadyFn
sourcesReady SourcesReadyFn
// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
@ -1395,10 +1395,15 @@ func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]m
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
// Kill any containers we don't need.
killed := []string{}
for ix := range dockerContainers {
@ -1408,13 +1413,6 @@ func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]m
// syncPod() will handle this one.
continue
}
_, _, podAnnotations := ParsePodFullName(podFullName)
if source := podAnnotations[ConfigSourceAnnotationKey]; !kl.sourceReady(source) {
// If the source for this container is not ready, skip deletion, so that we don't accidentally
// delete containers for sources that haven't reported yet.
glog.V(4).Infof("Skipping delete of container (%q), source (%s) aren't ready yet.", podFullName, source)
continue
}
pc := podContainer{podFullName, uid, containerName}
if _, ok := desiredContainers[pc]; !ok {
glog.V(1).Infof("Killing unwanted container %+v", pc)

View File

@ -78,7 +78,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
return err
},
recorder)
kubelet.sourceReady = func(source string) bool { return true }
kubelet.sourcesReady = func() bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.readiness = newReadinessStates()
@ -722,7 +722,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool { return ready }
kubelet.sourcesReady = func() bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{
{
@ -762,67 +762,6 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
}
}
func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool {
if source == "testSource" {
return ready
}
return false
}
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_boo_bar.default.testSource_12345678_42"},
ID: "7492",
},
{
// pod infra container
Names: []string{"/k8s_POD_boo.default.testSource_12345678_42"},
ID: "3542",
},
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.new.otherSource_12345678_42"},
ID: "1234",
},
{
// pod infra container
Names: []string{"/k8s_POD_foo.new.otherSource_12345678_42"},
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
// Validate container for testSource are killed because testSource is reported as seen, but
// containers for otherSource are not killed because otherSource has not.
expectedToStop := map[string]bool{
"7492": true,
"3542": true,
"1234": false,
"9876": false,
}
if len(fakeDocker.Stopped) != 2 ||
!expectedToStop[fakeDocker.Stopped[0]] ||
!expectedToStop[fakeDocker.Stopped[1]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
}
}
func TestSyncPodsDeletes(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{

View File

@ -18,10 +18,8 @@ package kubelet
import (
"fmt"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
@ -73,20 +71,3 @@ type PodUpdate struct {
func GetPodFullName(pod *api.BoundPod) string {
return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey])
}
// ParsePodFullName unpacks a pod full name and returns the pod name, namespace, and annotations.
// If the pod full name is invalid, empty strings are returend.
func ParsePodFullName(podFullName string) (podName, podNamespace string, podAnnotations map[string]string) {
parts := strings.Split(podFullName, ".")
expectedNumFields := 3
actualNumFields := len(parts)
if actualNumFields != expectedNumFields {
glog.Errorf("found a podFullName (%q) with too few fields: expected %d, actual %d.", podFullName, expectedNumFields, actualNumFields)
return
}
podName = parts[0]
podNamespace = parts[1]
podAnnotations = make(map[string]string)
podAnnotations[ConfigSourceAnnotationKey] = parts[2]
return
}

View File

@ -1,44 +0,0 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
)
func TestParsePodFullName(t *testing.T) {
// Arrange
podFullName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1.default.etcd"
// Act
podName, podNamespace, podAnnotations := ParsePodFullName(podFullName)
// Assert
expectedPodName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1"
expectedPodNamespace := "default"
expectedSource := "etcd"
if podName != expectedPodName {
t.Errorf("Unexpected PodName. Expected: %q Actual: %q", expectedPodName, podName)
}
if podNamespace != expectedPodNamespace {
t.Errorf("Unexpected PodNamespace. Expected: %q Actual: %q", expectedPodNamespace, podNamespace)
}
if podAnnotations[ConfigSourceAnnotationKey] != expectedSource {
t.Errorf("Unexpected PodSource. Expected: %q Actual: %q", expectedPodNamespace, podNamespace)
}
}