Merge pull request #7980 from yifan-gu/fix_kubelet_tests
kubelet: Fix racy kubelet tests.
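The tests were racy because each pod worker runs syncPod on its own goroutine: the tests coordinated through a shared sync.WaitGroup, and concurrently syncing pods could interleave their Docker calls in nondeterministic order. This commit adds a serial fakePodWorkers that invokes the sync function inline, so the sequence of Docker calls is deterministic and the tests can drop the WaitGroup entirely. A minimal sketch of the resulting test pattern, assuming the helpers defined in the diff below (newTestKubelet, emptyPodUIDs, verifyCalls); the test name is hypothetical:

func TestSyncPodsIsDeterministic(t *testing.T) {
    testKubelet := newTestKubelet(t) // wires kubelet.podWorkers = &fakePodWorkers{...}
    kubelet := testKubelet.kubelet
    pods := []*api.Pod{ /* pod fixtures elided */ }
    kubelet.podManager.SetPods(pods)
    // fakePodWorkers.UpdatePod runs syncPod inline, so every Docker call has
    // completed, in a fixed order, by the time SyncPods returns; no
    // waitGroup.Add(1)/waitGroup.Wait() bracketing is needed.
    if err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    verifyCalls(t, testKubelet.fakeDocker, []string{ /* exact, ordered call list */ })
}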
This commit is contained in: commit 8b3130b112

pkg/kubelet/fake_pod_workers.go (new file, 45 lines)
pkg/kubelet/fake_pod_workers.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+    "testing"
+
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
+)
+
+// fakePodWorkers runs the sync pod function serially, so that tests get
+// deterministic behaviour.
+type fakePodWorkers struct {
+    syncPodFn    syncPodFnType
+    runtimeCache kubecontainer.RuntimeCache
+    t            *testing.T
+}
+
+func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
+    pods, err := f.runtimeCache.GetPods()
+    if err != nil {
+        f.t.Errorf("Unexpected error: %v", err)
+    }
+    if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID)); err != nil {
+        f.t.Errorf("Unexpected error: %v", err)
+    }
+}
+
+func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
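Because UpdatePod above is synchronous, tests that previously had to use verifyUnorderedCalls can now assert one exact call sequence with verifyCalls. Driving the fake directly looks like this (a sketch; kl, cache, pod, and mirrorPod stand in for whatever the test already has):

f := &fakePodWorkers{syncPodFn: kl.syncPod, runtimeCache: cache, t: t}
f.UpdatePod(pod, mirrorPod, func() {}) // syncPod has already run when this returns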
pkg/kubelet/kubelet.go
@@ -332,7 +332,7 @@ type Kubelet struct {
     runtimeCache   kubecontainer.RuntimeCache
     kubeClient     client.Interface
     rootDirectory  string
-    podWorkers     *podWorkers
+    podWorkers     PodWorkers
     resyncInterval time.Duration
     sourcesReady   SourcesReadyFn

pkg/kubelet/kubelet_test.go
@@ -30,7 +30,6 @@ import (
     "sort"
     "strconv"
     "strings"
-    "sync"
     "testing"
     "time"

@@ -64,7 +63,6 @@ type TestKubelet struct {
     fakeDocker       *dockertools.FakeDockerClient
     fakeCadvisor     *cadvisor.Mock
     fakeKubeClient   *testclient.Fake
-    waitGroup        *sync.WaitGroup
     fakeMirrorClient *fakeMirrorClient
 }

@@ -91,7 +89,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil {
         t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
     }
-    waitGroup := new(sync.WaitGroup)
     kubelet.sourcesReady = func() bool { return true }
     kubelet.masterServiceNamespace = api.NamespaceDefault
     kubelet.serviceLister = testServiceLister{}
@@ -111,16 +108,13 @@ func newTestKubelet(t *testing.T) *TestKubelet {

     kubelet.containerRuntime = dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, kubelet, &fakeHTTP{}, runtimeHooks)
     kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime)
-    kubelet.podWorkers = newPodWorkers(
-        kubelet.runtimeCache,
-        func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
-            err := kubelet.syncPod(pod, mirrorPod, runningPod)
-            waitGroup.Done()
-            return err
-        },
-        fakeRecorder)
+    kubelet.podWorkers = &fakePodWorkers{
+        syncPodFn:    kubelet.syncPod,
+        runtimeCache: kubelet.runtimeCache,
+        t:            t,
+    }
     kubelet.volumeManager = newVolumeManager()
-    return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
+    return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }

 func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
@@ -373,7 +367,6 @@ func TestSyncPodsDoesNothing(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     container := api.Container{Name: "bar"}
     pods := []*api.Pod{
@@ -417,20 +410,21 @@ func TestSyncPodsDoesNothing(t *testing.T) {
     }

     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()
     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_container",
         // Check the pod infra container.
         "inspect_container",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })
 }

 func TestSyncPodsWithTerminationLog(t *testing.T) {
@@ -438,7 +432,6 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup
     container := api.Container{
         Name: "bar",
         TerminationMessagePath: "/dev/somepath",
@@ -459,14 +452,12 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()
     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_image",
         // Create pod infra container.
@@ -474,7 +465,10 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
         // Create container.
         "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     fakeDocker.Lock()
     parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
@@ -500,7 +494,6 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup
     // TODO: Move this test to dockertools so that we don't have to do the hacky
     // type assertion here.
     dm := kubelet.containerRuntime.(*dockertools.DockerManager)
@@ -521,15 +514,13 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_image",
         // Create pod infra container.
@@ -537,7 +528,10 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
         // Create container.
         "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     fakeDocker.Lock()

@@ -564,7 +558,6 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup
     // TODO: Move this test to dockertools so that we don't have to do the hacky
     // type assertion here.
     dm := kubelet.containerRuntime.(*dockertools.DockerManager)
@@ -586,16 +579,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
             },
         },
     }
-    waitGroup.Add(1)
     kubelet.podManager.SetPods(pods)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_image",
         // Create pod infra container.
@@ -603,7 +594,10 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
         // Create container.
         "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     fakeDocker.Lock()

@@ -624,7 +618,6 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup
     pods := []*api.Pod{
         {
             ObjectMeta: api.ObjectMeta{
@@ -653,16 +646,14 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
             HostConfig: &docker.HostConfig{},
         },
     }
-    waitGroup.Add(1)
     kubelet.podManager.SetPods(pods)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_image",
         // Check the pod infra container.
@@ -670,7 +661,10 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
         // Create container.
         "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     fakeDocker.Lock()
     if len(fakeDocker.Created) != 1 ||
@@ -695,7 +689,6 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup
     fakeHttp := fakeHTTP{}

     // Simulate HTTP failure. Re-create the containerRuntime to inject the failure.
@@ -742,16 +735,14 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
             HostConfig: &docker.HostConfig{},
         },
     }
-    waitGroup.Add(1)
     kubelet.podManager.SetPods(pods)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_image",
         // Check the pod infra container.
@@ -759,7 +750,10 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
         // Create container.
         "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     fakeDocker.Lock()
     if len(fakeDocker.Created) != 1 ||
@@ -777,7 +771,6 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     pods := []*api.Pod{
         {
@@ -840,19 +833,17 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
         },
     }

-    waitGroup.Add(2)
     kubelet.podManager.SetPods(pods)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

-    verifyUnorderedCalls(t, fakeDocker, []string{
+    verifyCalls(t, fakeDocker, []string{
         "list",

         // foo1
         "list",
         // Get pod status.
         "list", "inspect_container",
         // Kill the container since pod infra container is not running.
@@ -866,12 +857,16 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {

         // foo2
         "list",
         // Get pod status.
         "list", "inspect_container", "inspect_container",
         // Check the pod infra container.
         "inspect_container",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     // A map iteration is used to delete containers, so must not depend on
     // order here.
@@ -977,7 +972,6 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     pods := []*api.Pod{
         {
@@ -1030,15 +1024,13 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) {
     }

     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_container", "inspect_container",
         // Check the pod infra container.
@@ -1046,7 +1038,10 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) {
         // Kill the duplicated container.
         "stop",
         // Get pod status.
-        "list", "inspect_container", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })
     // Expect one of the duplicates to be killed.
     if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
         t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
@@ -1058,7 +1053,6 @@ func TestSyncPodsBadHash(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     pods := []*api.Pod{
         {
@@ -1101,15 +1095,13 @@ func TestSyncPodsBadHash(t *testing.T) {
     }

     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_container",
         // Check the pod infra container.
@@ -1117,7 +1109,10 @@ func TestSyncPodsBadHash(t *testing.T) {
         // Kill and restart the bad hash container.
         "stop", "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
         t.Errorf("%v", err)
@@ -1129,7 +1124,6 @@ func TestSyncPodsUnhealthy(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     pods := []*api.Pod{
         {
@@ -1175,15 +1169,13 @@ func TestSyncPodsUnhealthy(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_container",
         // Check the pod infra container.
@@ -1193,7 +1185,10 @@ func TestSyncPodsUnhealthy(t *testing.T) {
         // Restart the unhealthy container.
         "create", "start",
         // Get pod status.
-        "list", "inspect_container", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
         t.Errorf("%v", err)
@@ -1646,7 +1641,6 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     // Simulate HTTP failure. Re-create the containerRuntime to inject the failure.
     kubelet.httpClient = &fakeHTTP{
@@ -1695,15 +1689,13 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     verifyCalls(t, fakeDocker, []string{
-        "list", "list", "list",
+        "list", "list",
         // Get pod status.
         "list", "inspect_container", "inspect_image",
         // Check the pod infra container.
@@ -1713,7 +1705,10 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
         // Kill the container since event handler fails.
         "stop",
         // Get pod status.
-        "list", "inspect_container", "inspect_container"})
+        "list", "inspect_container", "inspect_container",
+        // Get pods for deleting orphaned volumes.
+        "list",
+    })

     // TODO(yifan): Check the stopped container's name.
     if len(fakeDocker.Stopped) != 1 {
@@ -1733,7 +1728,6 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup
     // TODO: Move this test to dockertools so that we don't have to do the hacky
     // type assertion here.
     dm := kubelet.containerRuntime.(*dockertools.DockerManager)
@@ -1761,12 +1755,10 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     fakeDocker.Lock()

@@ -3662,7 +3654,6 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
     testKubelet := newTestKubelet(t)
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
-    waitGroup := testKubelet.waitGroup

     pods := []*api.Pod{
         {
@@ -3682,12 +3673,10 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()
     podFullName := kubecontainer.GetPodFullName(pods[0])
     status, ok := kubelet.statusManager.GetPodStatus(podFullName)
     if ok {
@@ -3812,7 +3801,6 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     containers := []api.Container{
         {Name: "succeeded"},
@@ -3891,7 +3879,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
     }{
         {
             api.RestartPolicyAlways,
-            []string{"list", "list", "list",
+            []string{"list", "list",
                 // Get pod status.
                 "list", "inspect_container", "inspect_container", "inspect_container",
                 // Check the pod infra container.
@@ -3899,13 +3887,16 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
                 // Restart both containers.
                 "create", "start", "create", "start",
                 // Get pod status.
-                "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container"},
+                "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container",
+                // Get pods for deleting orphaned volumes.
+                "list",
+            },
             []string{"succeeded", "failed"},
             []string{},
         },
         {
             api.RestartPolicyOnFailure,
-            []string{"list", "list", "list",
+            []string{"list", "list",
                 // Get pod status.
                 "list", "inspect_container", "inspect_container", "inspect_container",
                 // Check the pod infra container.
@@ -3913,13 +3904,16 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
                 // Restart the failed container.
                 "create", "start",
                 // Get pod status.
-                "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container"},
+                "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container",
+                // Get pods for deleting orphaned volumes.
+                "list",
+            },
             []string{"failed"},
             []string{},
         },
         {
             api.RestartPolicyNever,
-            []string{"list", "list", "list",
+            []string{"list", "list",
                 // Get pod status.
                 "list", "inspect_container", "inspect_container", "inspect_container",
                 // Check the pod infra container.
@@ -3927,7 +3921,10 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
                 // Stop the last pod infra container.
                 "stop",
                 // Get pod status.
-                "list", "inspect_container", "inspect_container", "inspect_container"},
+                "list", "inspect_container", "inspect_container", "inspect_container",
+                // Get pods for deleting orphaned volumes.
+                "list",
+            },
             []string{},
             []string{"9876"},
         },
@@ -3941,12 +3938,10 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
         pods[0].Spec.RestartPolicy = tt.policy

         kubelet.podManager.SetPods(pods)
-        waitGroup.Add(1)
         err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
         if err != nil {
             t.Errorf("%d: unexpected error: %v", i, err)
         }
-        waitGroup.Wait()

         // 'stop' is because the pod infra container is killed when no container is running.
         verifyCalls(t, fakeDocker, tt.calls)
@@ -3965,7 +3960,6 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     containers := []api.Container{
         {Name: "succeeded"},
@@ -4072,12 +4066,10 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
         },
     }
     kubelet.podManager.SetPods(pods)
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("%d: unexpected error: %v", i, err)
     }
-    waitGroup.Wait()

     // Check if we can retrieve the pod status from GetPodStatus().
     podName := kubecontainer.GetPodFullName(pods[0])
@@ -4112,7 +4104,6 @@ func TestGetPodCreationFailureReason(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     // Inject the creation failure error to docker.
     failureReason := "creation failure"
@@ -4150,12 +4141,10 @@ func TestGetPodCreationFailureReason(t *testing.T) {
     pods := []*api.Pod{pod}
     kubelet.podManager.SetPods(pods)
     kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{})
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     status, err := kubelet.GetPodStatus(kubecontainer.GetPodFullName(pod))
     if err != nil {
@@ -4178,7 +4167,6 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
     kubelet := testKubelet.kubelet
     fakeDocker := testKubelet.fakeDocker
-    waitGroup := testKubelet.waitGroup

     // Initialize the FakeDockerPuller so that it'd try to pull non-existent
     // images.
@@ -4219,12 +4207,10 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
     pods := []*api.Pod{pod}
     kubelet.podManager.SetPods(pods)
     kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{})
-    waitGroup.Add(1)
     err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
    if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
-    waitGroup.Wait()

     status, err := kubelet.GetPodStatus(kubecontainer.GetPodFullName(pod))
     if err != nil {
pkg/kubelet/pod_workers.go
@@ -28,6 +28,12 @@ import (
     "github.com/golang/glog"
 )

+// PodWorkers is an abstract interface for testability.
+type PodWorkers interface {
+    UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
+    ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
+}
+
 type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error

 type podWorkers struct {
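Extracting PodWorkers as an interface is the seam that lets newTestKubelet assign the serial fake to the Kubelet.podWorkers field without touching its callers. Any type with these two methods satisfies it; for illustration, a hypothetical no-op implementation (not part of this commit):

type noopPodWorkers struct{}

func (noopPodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
    updateComplete() // report completion immediately without syncing anything
}

func (noopPodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}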
pkg/kubelet/pod_workers_test.go
@@ -17,6 +17,8 @@ limitations under the License.
 package kubelet

 import (
+    "reflect"
+    "sort"
     "sync"
     "testing"
     "time"
@@ -27,6 +29,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
+    "github.com/fsouza/go-dockerclient"
 )

 func newPod(uid, name string) *api.Pod {
@@ -147,3 +150,227 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
         t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
     }
 }
+
+type simpleFakeKubelet struct {
+    pod        *api.Pod
+    mirrorPod  *api.Pod
+    runningPod kubecontainer.Pod
+
+    wg sync.WaitGroup
+}
+
+func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
+    kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
+    return nil
+}
+
+func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
+    kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
+    kl.wg.Done()
+    return nil
+}
+
+// byContainerName sorts the containers in a running pod by their names.
+type byContainerName kubecontainer.Pod
+
+func (b byContainerName) Len() int { return len(b.Containers) }
+
+func (b byContainerName) Swap(i, j int) {
+    b.Containers[i], b.Containers[j] = b.Containers[j], b.Containers[i]
+}
+
+func (b byContainerName) Less(i, j int) bool {
+    return b.Containers[i].Name < b.Containers[j].Name
+}
+
+// TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers
+// for their invocation of the syncPodFn.
+func TestFakePodWorkers(t *testing.T) {
+    // Create components for pod workers.
+    fakeDocker := &dockertools.FakeDockerClient{}
+    fakeRecorder := &record.FakeRecorder{}
+    np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
+    dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder))
+    fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
+
+    kubeletForRealWorkers := &simpleFakeKubelet{}
+    kubeletForFakeWorkers := &simpleFakeKubelet{}
+
+    realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder)
+    fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
+
+    tests := []struct {
+        pod                    *api.Pod
+        mirrorPod              *api.Pod
+        containerList          []docker.APIContainers
+        containersInRunningPod int
+    }{
+        {
+            &api.Pod{},
+            &api.Pod{},
+            []docker.APIContainers{},
+            0,
+        },
+
+        {
+            &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    UID:       "12345678",
+                    Name:      "foo",
+                    Namespace: "new",
+                },
+                Spec: api.PodSpec{
+                    Containers: []api.Container{
+                        {
+                            Name: "fooContainer",
+                        },
+                    },
+                },
+            },
+            &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    UID:       "12345678",
+                    Name:      "fooMirror",
+                    Namespace: "new",
+                },
+                Spec: api.PodSpec{
+                    Containers: []api.Container{
+                        {
+                            Name: "fooContainerMirror",
+                        },
+                    },
+                },
+            },
+            []docker.APIContainers{
+                {
+                    // format is // k8s_<container-id>_<pod-fullname>_<pod-uid>_<random>
+                    Names: []string{"/k8s_bar.hash123_foo_new_12345678_0"},
+                    ID:    "1234",
+                },
+                {
+                    // pod infra container
+                    Names: []string{"/k8s_POD.hash123_foo_new_12345678_0"},
+                    ID:    "9876",
+                },
+            },
+            2,
+        },
+
+        {
+            &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    UID:       "98765",
+                    Name:      "bar",
+                    Namespace: "new",
+                },
+                Spec: api.PodSpec{
+                    Containers: []api.Container{
+                        {
+                            Name: "fooContainer",
+                        },
+                    },
+                },
+            },
+            &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    UID:       "98765",
+                    Name:      "fooMirror",
+                    Namespace: "new",
+                },
+                Spec: api.PodSpec{
+                    Containers: []api.Container{
+                        {
+                            Name: "fooContainerMirror",
+                        },
+                    },
+                },
+            },
+            []docker.APIContainers{
+                {
+                    // format is // k8s_<container-id>_<pod-fullname>_<pod-uid>_<random>
+                    Names: []string{"/k8s_bar.hash123_bar_new_98765_0"},
+                    ID:    "1234",
+                },
+                {
+                    // pod infra container
+                    Names: []string{"/k8s_POD.hash123_foo_new_12345678_0"},
+                    ID:    "9876",
+                },
+            },
+            1,
+        },
+
+        // Empty running pod.
+        {
+            &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    UID:       "98765",
+                    Name:      "baz",
+                    Namespace: "new",
+                },
+                Spec: api.PodSpec{
+                    Containers: []api.Container{
+                        {
+                            Name: "bazContainer",
+                        },
+                    },
+                },
+            },
+            &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    UID:       "98765",
+                    Name:      "bazMirror",
+                    Namespace: "new",
+                },
+                Spec: api.PodSpec{
+                    Containers: []api.Container{
+                        {
+                            Name: "bazContainerMirror",
+                        },
+                    },
+                },
+            },
+            []docker.APIContainers{
+                {
+                    // format is // k8s_<container-id>_<pod-fullname>_<pod-uid>_<random>
+                    Names: []string{"/k8s_bar.hash123_bar_new_12345678_0"},
+                    ID:    "1234",
+                },
+                {
+                    // pod infra container
+                    Names: []string{"/k8s_POD.hash123_foo_new_12345678_0"},
+                    ID:    "9876",
+                },
+            },
+            0,
+        },
+    }
+
+    for i, tt := range tests {
+        kubeletForRealWorkers.wg.Add(1)
+
+        fakeDocker.ContainerList = tt.containerList
+        realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {})
+        fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {})
+
+        kubeletForRealWorkers.wg.Wait()
+
+        if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
+            t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
+        }
+
+        if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
+            t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
+        }
+
+        if tt.containersInRunningPod != len(kubeletForFakeWorkers.runningPod.Containers) {
+            t.Errorf("%d: Expected: %#v, Actual: %#v", i, tt.containersInRunningPod, len(kubeletForFakeWorkers.runningPod.Containers))
+        }
+
+        sort.Sort(byContainerName(kubeletForRealWorkers.runningPod))
+        sort.Sort(byContainerName(kubeletForFakeWorkers.runningPod))
+        if !reflect.DeepEqual(kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) {
+            t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod)
+        }
+    }
+}