Merge pull request #4804 from wojtek-t/separate_pod_workers

Thread-per-pod model in Kubelet.
This commit is contained in:
Victor Marmol 2015-02-26 09:22:20 -08:00
commit e41d0bd544
3 changed files with 184 additions and 119 deletions

View File

@ -111,7 +111,6 @@ func NewMainKubelet(
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
resyncInterval: resyncInterval, resyncInterval: resyncInterval,
podInfraContainerImage: podInfraContainerImage, podInfraContainerImage: podInfraContainerImage,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient), runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{}, httpClient: &http.Client{},
@ -134,6 +133,7 @@ func NewMainKubelet(
return nil, err return nil, err
} }
klet.dockerCache = dockerCache klet.dockerCache = dockerCache
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod)
metrics.Register(dockerCache) metrics.Register(dockerCache)
@ -453,43 +453,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl) kl.syncLoop(updates, kl)
} }
// Per-pod workers.
type podWorkers struct {
lock sync.Mutex
// Set of pods with existing workers.
workers util.StringSet
}
func newPodWorkers() *podWorkers {
return &podWorkers{
workers: util.NewStringSet(),
}
}
// Runs a worker for "podFullName" asynchronously with the specified "action".
// If the worker for the "podFullName" is already running, functions as a no-op.
func (self *podWorkers) Run(podFullName string, action func()) {
self.lock.Lock()
defer self.lock.Unlock()
// This worker is already running, let it finish.
if self.workers.Has(podFullName) {
return
}
self.workers.Insert(podFullName)
// Run worker async.
go func() {
defer util.HandleCrash()
action()
self.lock.Lock()
defer self.lock.Unlock()
self.workers.Delete(podFullName)
}()
}
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{} binds := []string{}
for _, mount := range container.VolumeMounts { for _, mount := range container.VolumeMounts {
@ -1333,13 +1296,12 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
} }
// Run the sync in an async manifest worker. // Run the sync in an async manifest worker.
kl.podWorkers.Run(podFullName, func() { kl.podWorkers.UpdatePod(*pod)
if err := kl.syncPod(pod, dockerContainers); err != nil {
glog.Errorf("Error syncing pod, skipping: %v", err)
record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err)
}
})
} }
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
// Kill any containers we don't need. // Kill any containers we don't need.
killed := []string{} killed := []string{}
for ix := range dockerContainers { for ix := range dockerContainers {

View File

@ -48,14 +48,15 @@ func init() {
util.ReallyCrash = true util.ReallyCrash = true
} }
func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup) {
fakeDocker := &dockertools.FakeDockerClient{ fakeDocker := &dockertools.FakeDockerClient{
RemovedImages: util.StringSet{}, RemovedImages: util.StringSet{},
} }
fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker)
kubelet := &Kubelet{} kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker kubelet.dockerClient = fakeDocker
kubelet.dockerCache = dockertools.NewFakeDockerCache(fakeDocker) kubelet.dockerCache = fakeDockerCache
kubelet.dockerPuller = &dockertools.FakeDockerPuller{} kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err) t.Fatalf("can't make a temp rootdir: %v", err)
@ -65,7 +66,14 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) {
if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
} }
kubelet.podWorkers = newPodWorkers() waitGroup := new(sync.WaitGroup)
kubelet.podWorkers = newPodWorkers(
fakeDockerCache,
func(pod *api.BoundPod, containers dockertools.DockerContainers) error {
err := kubelet.syncPod(pod, containers)
waitGroup.Done()
return err
})
kubelet.sourceReady = func(source string) bool { return true } kubelet.sourceReady = func(source string) bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{} kubelet.serviceLister = testServiceLister{}
@ -74,7 +82,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) {
t.Fatalf("can't initialize kubelet data dirs: %v", err) t.Fatalf("can't initialize kubelet data dirs: %v", err)
} }
return kubelet, fakeDocker return kubelet, fakeDocker, waitGroup
} }
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
@ -126,7 +134,7 @@ func verifyBoolean(t *testing.T, expected, value bool) {
} }
func TestKubeletDirs(t *testing.T) { func TestKubeletDirs(t *testing.T) {
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
root := kubelet.rootDirectory root := kubelet.rootDirectory
var exp, got string var exp, got string
@ -187,7 +195,7 @@ func TestKubeletDirs(t *testing.T) {
} }
func TestKubeletDirsCompat(t *testing.T) { func TestKubeletDirsCompat(t *testing.T) {
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
root := kubelet.rootDirectory root := kubelet.rootDirectory
if err := os.MkdirAll(root, 0750); err != nil { if err := os.MkdirAll(root, 0750); err != nil {
t.Fatalf("can't mkdir(%q): %s", root, err) t.Fatalf("can't mkdir(%q): %s", root, err)
@ -293,7 +301,7 @@ func TestKillContainerWithError(t *testing.T) {
Err: fmt.Errorf("sample error"), Err: fmt.Errorf("sample error"),
ContainerList: append([]docker.APIContainers{}, containers...), ContainerList: append([]docker.APIContainers{}, containers...),
} }
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
for _, c := range fakeDocker.ContainerList { for _, c := range fakeDocker.ContainerList {
kubelet.readiness.set(c.ID, true) kubelet.readiness.set(c.ID, true)
} }
@ -324,7 +332,7 @@ func TestKillContainer(t *testing.T) {
Names: []string{"/k8s_bar_qux_5678_42"}, Names: []string{"/k8s_bar_qux_5678_42"},
}, },
} }
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...) fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...)
fakeDocker.Container = &docker.Container{ fakeDocker.Container = &docker.Container{
Name: "foobar", Name: "foobar",
@ -375,7 +383,7 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
} }
func TestSyncPodsDoesNothing(t *testing.T) { func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
container := api.Container{Name: "bar"} container := api.Container{Name: "bar"}
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
@ -404,16 +412,17 @@ func TestSyncPodsDoesNothing(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{"list", "list", "inspect_container", "inspect_container"}) verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"})
} }
func TestSyncPodsWithTerminationLog(t *testing.T) { func TestSyncPodsWithTerminationLog(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
container := api.Container{ container := api.Container{
Name: "bar", Name: "bar",
TerminationMessagePath: "/dev/somepath", TerminationMessagePath: "/dev/somepath",
@ -434,13 +443,14 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{ verifyCalls(t, fakeDocker, []string{
"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock() fakeDocker.Lock()
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
@ -453,19 +463,6 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
fakeDocker.Unlock() fakeDocker.Unlock()
} }
// drainWorkers waits until all workers are done. Should only used for testing.
func (kl *Kubelet) drainWorkers() {
for {
kl.podWorkers.lock.Lock()
length := len(kl.podWorkers.workers)
kl.podWorkers.lock.Unlock()
if length == 0 {
return
}
time.Sleep(time.Millisecond * 100)
}
}
func matchString(t *testing.T, pattern, str string) bool { func matchString(t *testing.T, pattern, str string) bool {
match, err := regexp.MatchString(pattern, str) match, err := regexp.MatchString(pattern, str)
if err != nil { if err != nil {
@ -475,7 +472,7 @@ func matchString(t *testing.T, pattern, str string) bool {
} }
func TestSyncPodsCreatesNetAndContainer(t *testing.T) { func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
kubelet.podInfraContainerImage = "custom_image_name" kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.pods = []api.BoundPod{ kubelet.pods = []api.BoundPod{
@ -493,14 +490,15 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{ verifyCalls(t, fakeDocker, []string{
"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock() fakeDocker.Lock()
@ -523,7 +521,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
} }
func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{} puller.HasImages = []string{}
kubelet.podInfraContainerImage = "custom_image_name" kubelet.podInfraContainerImage = "custom_image_name"
@ -543,14 +541,15 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{ verifyCalls(t, fakeDocker, []string{
"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock() fakeDocker.Lock()
@ -567,7 +566,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
} }
func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
// pod infra container // pod infra container
@ -590,14 +589,15 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{ verifyCalls(t, fakeDocker, []string{
"list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock() fakeDocker.Lock()
if len(fakeDocker.Created) != 1 || if len(fakeDocker.Created) != 1 ||
@ -608,7 +608,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
} }
func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
fakeHttp := fakeHTTP{} fakeHttp := fakeHTTP{}
kubelet.httpClient = &fakeHttp kubelet.httpClient = &fakeHttp
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
@ -644,14 +644,15 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{ verifyCalls(t, fakeDocker, []string{
"list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
fakeDocker.Lock() fakeDocker.Lock()
if len(fakeDocker.Created) != 1 || if len(fakeDocker.Created) != 1 ||
@ -665,7 +666,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
} }
func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
// format is // k8s_<container-id>_<pod-fullname>_<pod-uid> // format is // k8s_<container-id>_<pod-fullname>_<pod-uid>
@ -688,14 +689,15 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
}, },
}, },
} }
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods) err := kubelet.SyncPods(kubelet.pods)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{ verifyCalls(t, fakeDocker, []string{
"list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) "list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
// A map iteration is used to delete containers, so must not depend on // A map iteration is used to delete containers, so must not depend on
// order here. // order here.
@ -711,7 +713,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false ready := false
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool { return ready } kubelet.sourceReady = func(source string) bool { return ready }
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
@ -754,7 +756,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ready := false ready := false
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool { kubelet.sourceReady = func(source string) bool {
if source == "testSource" { if source == "testSource" {
return ready return ready
@ -814,7 +816,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
} }
func TestSyncPodsDeletes(t *testing.T) { func TestSyncPodsDeletes(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container
@ -852,7 +854,7 @@ func TestSyncPodsDeletes(t *testing.T) {
} }
func TestSyncPodDeletesDuplicate(t *testing.T) { func TestSyncPodDeletesDuplicate(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{ dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{ "1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container
@ -902,7 +904,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
} }
func TestSyncPodBadHash(t *testing.T) { func TestSyncPodBadHash(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{ dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{ "1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container
@ -951,7 +953,7 @@ func TestSyncPodBadHash(t *testing.T) {
} }
func TestSyncPodUnhealthy(t *testing.T) { func TestSyncPodUnhealthy(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
dockerContainers := dockertools.DockerContainers{ dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{ "1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container
@ -1001,7 +1003,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
} }
func TestMountExternalVolumes(t *testing.T) { func TestMountExternalVolumes(t *testing.T) {
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet}) kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet})
pod := api.BoundPod{ pod := api.BoundPod{
@ -1035,7 +1037,7 @@ func TestMountExternalVolumes(t *testing.T) {
} }
func TestGetPodVolumesFromDisk(t *testing.T) { func TestGetPodVolumesFromDisk(t *testing.T) {
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
plug := &volume.FakePlugin{"fake", nil} plug := &volume.FakePlugin{"fake", nil}
kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet}) kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet})
@ -1310,7 +1312,7 @@ func TestGetContainerInfo(t *testing.T) {
cadvisorReq := &info.ContainerInfoRequest{} cadvisorReq := &info.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
@ -1348,7 +1350,6 @@ func TestGetRootInfo(t *testing.T) {
dockerClient: &fakeDocker, dockerClient: &fakeDocker,
dockerPuller: &dockertools.FakeDockerPuller{}, dockerPuller: &dockertools.FakeDockerPuller{},
cadvisorClient: mockCadvisor, cadvisorClient: mockCadvisor,
podWorkers: newPodWorkers(),
} }
// If the container name is an empty string, then it means the root container. // If the container name is an empty string, then it means the root container.
@ -1360,7 +1361,7 @@ func TestGetRootInfo(t *testing.T) {
} }
func TestGetContainerInfoWithoutCadvisor(t *testing.T) { func TestGetContainerInfoWithoutCadvisor(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
ID: "foobar", ID: "foobar",
@ -1385,7 +1386,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
cadvisorReq := &info.ContainerInfoRequest{} cadvisorReq := &info.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure) mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure)
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
{ {
@ -1413,7 +1414,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
func TestGetContainerInfoOnNonExistContainer(t *testing.T) { func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
mockCadvisor := &mockCadvisorClient{} mockCadvisor := &mockCadvisorClient{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
@ -1427,7 +1428,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) {
mockCadvisor := &mockCadvisorClient{} mockCadvisor := &mockCadvisorClient{}
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
expectedErr := fmt.Errorf("List containers error") expectedErr := fmt.Errorf("List containers error")
kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr} kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr}
@ -1447,7 +1448,7 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) {
func TestGetContainerInfoWithNoContainers(t *testing.T) { func TestGetContainerInfoWithNoContainers(t *testing.T) {
mockCadvisor := &mockCadvisorClient{} mockCadvisor := &mockCadvisorClient{}
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil} kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil}
@ -1466,7 +1467,7 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) {
func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
mockCadvisor := &mockCadvisorClient{} mockCadvisor := &mockCadvisorClient{}
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
kubelet.cadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
containerList := []docker.APIContainers{ containerList := []docker.APIContainers{
@ -1530,7 +1531,7 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por
func TestRunInContainerNoSuchPod(t *testing.T) { func TestRunInContainerNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
@ -1552,7 +1553,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
func TestRunInContainer(t *testing.T) { func TestRunInContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
containerID := "abc1234" containerID := "abc1234"
@ -1593,7 +1594,7 @@ func TestRunInContainer(t *testing.T) {
func TestRunHandlerExec(t *testing.T) { func TestRunHandlerExec(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
containerID := "abc1234" containerID := "abc1234"
@ -1641,7 +1642,7 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
func TestRunHandlerHttp(t *testing.T) { func TestRunHandlerHttp(t *testing.T) {
fakeHttp := fakeHTTP{} fakeHttp := fakeHTTP{}
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
kubelet.httpClient = &fakeHttp kubelet.httpClient = &fakeHttp
podName := "podFoo" podName := "podFoo"
@ -1670,7 +1671,7 @@ func TestRunHandlerHttp(t *testing.T) {
} }
func TestNewHandler(t *testing.T) { func TestNewHandler(t *testing.T) {
kubelet, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
handler := &api.Handler{ handler := &api.Handler{
HTTPGet: &api.HTTPGetAction{ HTTPGet: &api.HTTPGetAction{
Host: "foo", Host: "foo",
@ -1701,7 +1702,7 @@ func TestNewHandler(t *testing.T) {
} }
func TestSyncPodEventHandlerFails(t *testing.T) { func TestSyncPodEventHandlerFails(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.httpClient = &fakeHTTP{ kubelet.httpClient = &fakeHTTP{
err: fmt.Errorf("test error"), err: fmt.Errorf("test error"),
} }
@ -1890,7 +1891,7 @@ func TestKubeletGarbageCollection(t *testing.T) {
}, },
} }
for _, test := range tests { for _, test := range tests {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.maxContainerCount = 2 kubelet.maxContainerCount = 2
fakeDocker.ContainerList = test.containers fakeDocker.ContainerList = test.containers
fakeDocker.ContainerMap = test.containerDetails fakeDocker.ContainerMap = test.containerDetails
@ -2055,7 +2056,7 @@ func TestPurgeOldest(t *testing.T) {
}, },
} }
for _, test := range tests { for _, test := range tests {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.maxContainerCount = 5 kubelet.maxContainerCount = 5
fakeDocker.ContainerMap = test.containerDetails fakeDocker.ContainerMap = test.containerDetails
kubelet.purgeOldest(test.ids) kubelet.purgeOldest(test.ids)
@ -2066,11 +2067,12 @@ func TestPurgeOldest(t *testing.T) {
} }
func TestSyncPodsWithPullPolicy(t *testing.T) { func TestSyncPodsWithPullPolicy(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, waitGroup := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{"existing_one", "want:latest"} puller.HasImages = []string{"existing_one", "want:latest"}
kubelet.podInfraContainerImage = "custom_image_name" kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
waitGroup.Add(1)
err := kubelet.SyncPods([]api.BoundPod{ err := kubelet.SyncPods([]api.BoundPod{
{ {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
@ -2093,7 +2095,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
kubelet.drainWorkers() waitGroup.Wait()
fakeDocker.Lock() fakeDocker.Lock()
@ -2399,7 +2401,7 @@ func TestMakeEnvironmentVariables(t *testing.T) {
} }
for _, tc := range testCases { for _, tc := range testCases {
kl, _ := newTestKubelet(t) kl, _, _ := newTestKubelet(t)
kl.masterServiceNamespace = tc.masterServiceNamespace kl.masterServiceNamespace = tc.masterServiceNamespace
if tc.nilLister { if tc.nilLister {
kl.serviceLister = nil kl.serviceLister = nil
@ -2836,7 +2838,7 @@ func TestGetPodReadyCondition(t *testing.T) {
func TestExecInContainerNoSuchPod(t *testing.T) { func TestExecInContainerNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
@ -2863,7 +2865,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
func TestExecInContainerNoSuchContainer(t *testing.T) { func TestExecInContainerNoSuchContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
@ -2916,7 +2918,7 @@ func (f *fakeReadWriteCloser) Close() error {
func TestExecInContainer(t *testing.T) { func TestExecInContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
@ -2975,7 +2977,7 @@ func TestExecInContainer(t *testing.T) {
func TestPortForwardNoSuchPod(t *testing.T) { func TestPortForwardNoSuchPod(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
@ -2999,7 +3001,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
func TestPortForwardNoSuchContainer(t *testing.T) { func TestPortForwardNoSuchContainer(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
@ -3034,7 +3036,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) {
func TestPortForward(t *testing.T) { func TestPortForward(t *testing.T) {
fakeCommandRunner := fakeContainerCommandRunner{} fakeCommandRunner := fakeContainerCommandRunner{}
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"

101
pkg/kubelet/pod_workers.go Normal file
View File

@ -0,0 +1,101 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error
// TODO(wojtek-t) Add unit tests for this type.
type podWorkers struct {
// Protects podUpdates field.
podLock sync.Mutex
// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan api.BoundPod
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache
// This function is run to sync the desired stated of pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFun syncPodFunType
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan api.BoundPod{},
dockerCache: dockerCache,
syncPodFun: syncPodFun,
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) {
for newPod := range podUpdates {
// Since we use docker cache, getting current state shouldn't cause
// performance overhead on Docker. Moreover, as long as we run syncPod
// no matter if it changes anything, having an old version of "containers"
// can cause starting eunended containers.
containers, err := p.dockerCache.RunningContainers()
if err != nil {
glog.Errorf("Error listing containers while syncing pod: %v", err)
continue
}
err = p.syncPodFun(&newPod, containers)
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newPod.UID, err)
record.Eventf(&newPod, "failedSync", "Error syncing pod, skipping: %v", err)
continue
}
}
}
func (p *podWorkers) UpdatePod(pod api.BoundPod) {
uid := pod.UID
var podUpdates chan api.BoundPod
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// TODO(wojtek-t): Adjust the size of the buffer in this channel
podUpdates = make(chan api.BoundPod, 5)
p.podUpdates[uid] = podUpdates
go p.managePodLoop(podUpdates)
}
podUpdates <- pod
}
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
p.podLock.Lock()
defer p.podLock.Unlock()
for key, channel := range p.podUpdates {
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
}
}
}