diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c7a5e321b54..345f49c2e9b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,10 +61,7 @@ const podOomScoreAdj = -100 // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - // Syncs current state to match the specified pods. SyncPodType specified what - // type of sync is occuring per pod. StartTime specifies the time at which - // syncing began (for use in monitoring). - SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error + SyncPods([]api.BoundPod) error } type SourceReadyFn func(source string) bool @@ -114,6 +111,7 @@ func NewMainKubelet( rootDirectory: rootDirectory, resyncInterval: resyncInterval, podInfraContainerImage: podInfraContainerImage, + podWorkers: newPodWorkers(), dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, runner: dockertools.NewDockerContainerCommandRunner(dockerClient), httpClient: &http.Client{}, @@ -136,7 +134,6 @@ func NewMainKubelet( return nil, err } klet.dockerCache = dockerCache - klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod) metrics.Register(dockerCache) @@ -456,6 +453,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.syncLoop(updates, kl) } +// Per-pod workers. +type podWorkers struct { + lock sync.Mutex + + // Set of pods with existing workers. + workers util.StringSet +} + +func newPodWorkers() *podWorkers { + return &podWorkers{ + workers: util.NewStringSet(), + } +} + +// Runs a worker for "podFullName" asynchronously with the specified "action". +// If the worker for the "podFullName" is already running, functions as a no-op. +func (self *podWorkers) Run(podFullName string, action func()) { + self.lock.Lock() + defer self.lock.Unlock() + + // This worker is already running, let it finish. + if self.workers.Has(podFullName) { + return + } + self.workers.Insert(podFullName) + + // Run worker async. + go func() { + defer util.HandleCrash() + action() + + self.lock.Lock() + defer self.lock.Unlock() + self.workers.Delete(podFullName) + }() +} + func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { binds := []string{} for _, mount := range container.VolumeMounts { @@ -945,7 +979,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error { start := time.Now() defer func() { - metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start)) + metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())) }() if err := kl.dockerPuller.Pull(img); err != nil { @@ -1273,7 +1307,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error { +func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { glog.V(4).Infof("Desired: %#v", pods) var err error desiredContainers := make(map[podContainer]empty) @@ -1299,14 +1333,13 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metr } // Run the sync in an async manifest worker. 
- kl.podWorkers.UpdatePod(pod, func() { - metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) + kl.podWorkers.Run(podFullName, func() { + if err := kl.syncPod(pod, dockerContainers); err != nil { + glog.Errorf("Error syncing pod, skipping: %v", err) + record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err) + } }) } - - // Stop the workers for no-longer existing pods. - kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) - // Kill any containers we don't need. killed := []string{} for ix := range dockerContainers { @@ -1421,21 +1454,19 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) { func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { for { unsyncedPod := false - podSyncTypes := make(map[types.UID]metrics.SyncPodType) select { case u := <-updates: - kl.updatePods(u, podSyncTypes) + kl.updatePods(u) unsyncedPod = true case <-time.After(kl.resyncInterval): glog.V(4).Infof("Periodic sync") } - start := time.Now() // If we already caught some update, try to wait for some short time // to possibly batch it with other incoming updates. for unsyncedPod { select { case u := <-updates: - kl.updatePods(u, podSyncTypes) + kl.updatePods(u) case <-time.After(5 * time.Millisecond): // Break the for loop. unsyncedPod = false @@ -1447,54 +1478,25 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { glog.Errorf("Failed to get bound pods.") return } - if err := handler.SyncPods(pods, podSyncTypes, start); err != nil { + if err := handler.SyncPods(pods); err != nil { glog.Errorf("Couldn't sync containers: %v", err) } } } -// Updated the Kubelet's internal pods with those provided by the update. -// Records new and updated pods in newPods and updatedPods. -func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { +func (kl *Kubelet) updatePods(u PodUpdate) { switch u.Op { case SET: glog.V(3).Infof("SET: Containers changed") - - // Store the new pods. Don't worry about filtering host ports since those - // pods will never be looked up. - existingPods := make(map[types.UID]struct{}) - for i := range kl.pods { - existingPods[kl.pods[i].UID] = struct{}{} - } - for i := range u.Pods { - if _, ok := existingPods[u.Pods[i].UID]; !ok { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate - } - } - kl.pods = u.Pods kl.pods = filterHostPortConflicts(kl.pods) case UPDATE: glog.V(3).Infof("Update: Containers changed") - - // Store the updated pods. Don't worry about filtering host ports since those - // pods will never be looked up. - for i := range u.Pods { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate - } - kl.pods = updateBoundPods(u.Pods, kl.pods) kl.pods = filterHostPortConflicts(kl.pods) default: panic("syncLoop does not support incremental changes") } - - // Mark all remaining pods as sync. - for i := range kl.pods { - if _, ok := podSyncTypes[kl.pods[i].UID]; !ok { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync - } - } } // Returns Docker version for this Kubelet. 
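Note on the kubelet.go changes above: the new podWorkers.Run starts a goroutine for a pod only if no worker for that pod full name is currently registered, and otherwise silently drops the request, apparently relying on the periodic resync in syncLoop to pick up the latest desired state on a later SyncPods pass; the diff also inlines the microsecond conversion for ImagePullLatency now that metrics.SinceInMicroseconds is removed. A minimal standalone sketch of the dedup pattern follows; it uses plain stdlib types in place of util.StringSet and util.HandleCrash, and the names (workers, Run, key) are illustrative rather than the kubelet's.

package main

import (
	"fmt"
	"sync"
	"time"
)

// workers tracks which keys currently have a running worker, mirroring the
// string set kept by the new podWorkers (plain map + mutex used here).
type workers struct {
	mu   sync.Mutex
	busy map[string]struct{}
}

func newWorkers() *workers {
	return &workers{busy: map[string]struct{}{}}
}

// Run starts action for key in its own goroutine; if a worker for key is
// already running, the call is a no-op and the request is dropped.
func (w *workers) Run(key string, action func()) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if _, running := w.busy[key]; running {
		return // already running, let it finish
	}
	w.busy[key] = struct{}{}

	go func() {
		defer func() {
			w.mu.Lock()
			delete(w.busy, key)
			w.mu.Unlock()
		}()
		action()
	}()
}

func main() {
	w := newWorkers()
	w.Run("default/pod-a", func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("synced default/pod-a")
	})
	// Dropped: a worker for the same key is still running.
	w.Run("default/pod-a", func() { fmt.Println("never printed") })
	time.Sleep(100 * time.Millisecond) // crude drain, like the test-only drainWorkers helper below
}

Run returns immediately in both cases; callers that need to observe completion (as the tests do via drainWorkers) have to poll or otherwise synchronize on their own.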
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 805e45196ab..84c7eb2e6c5 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -34,7 +34,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -49,15 +48,14 @@ func init() { util.ReallyCrash = true } -func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup) { +func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { fakeDocker := &dockertools.FakeDockerClient{ RemovedImages: util.StringSet{}, } - fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker - kubelet.dockerCache = fakeDockerCache + kubelet.dockerCache = dockertools.NewFakeDockerCache(fakeDocker) kubelet.dockerPuller = &dockertools.FakeDockerPuller{} if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { t.Fatalf("can't make a temp rootdir: %v", err) @@ -67,14 +65,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } - waitGroup := new(sync.WaitGroup) - kubelet.podWorkers = newPodWorkers( - fakeDockerCache, - func(pod *api.BoundPod, containers dockertools.DockerContainers) error { - err := kubelet.syncPod(pod, containers) - waitGroup.Done() - return err - }) + kubelet.podWorkers = newPodWorkers() kubelet.sourceReady = func(source string) bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} @@ -83,7 +74,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn t.Fatalf("can't initialize kubelet data dirs: %v", err) } - return kubelet, fakeDocker, waitGroup + return kubelet, fakeDocker } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -135,7 +126,7 @@ func verifyBoolean(t *testing.T, expected, value bool) { } func TestKubeletDirs(t *testing.T) { - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) root := kubelet.rootDirectory var exp, got string @@ -196,7 +187,7 @@ func TestKubeletDirs(t *testing.T) { } func TestKubeletDirsCompat(t *testing.T) { - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) root := kubelet.rootDirectory if err := os.MkdirAll(root, 0750); err != nil { t.Fatalf("can't mkdir(%q): %s", root, err) @@ -302,7 +293,7 @@ func TestKillContainerWithError(t *testing.T) { Err: fmt.Errorf("sample error"), ContainerList: append([]docker.APIContainers{}, containers...), } - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) for _, c := range fakeDocker.ContainerList { kubelet.readiness.set(c.ID, true) } @@ -333,7 +324,7 @@ func TestKillContainer(t *testing.T) { Names: []string{"/k8s_bar_qux_5678_42"}, }, } - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...) 
fakeDocker.Container = &docker.Container{ Name: "foobar", @@ -383,10 +374,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod { return cr.list } -var emptyPodUIDs map[types.UID]metrics.SyncPodType - func TestSyncPodsDoesNothing(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) container := api.Container{Name: "bar"} fakeDocker.ContainerList = []docker.APIContainers{ { @@ -415,17 +404,16 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"}) + kubelet.drainWorkers() + verifyCalls(t, fakeDocker, []string{"list", "list", "inspect_container", "inspect_container"}) } func TestSyncPodsWithTerminationLog(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) container := api.Container{ Name: "bar", TerminationMessagePath: "/dev/somepath", @@ -446,14 +434,13 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") @@ -466,6 +453,19 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { fakeDocker.Unlock() } +// drainWorkers waits until all workers are done. Should only be used for testing.
+func (kl *Kubelet) drainWorkers() { + for { + kl.podWorkers.lock.Lock() + length := len(kl.podWorkers.workers) + kl.podWorkers.lock.Unlock() + if length == 0 { + return + } + time.Sleep(time.Millisecond * 100) + } +} + func matchString(t *testing.T, pattern, str string) bool { match, err := regexp.MatchString(pattern, str) if err != nil { @@ -475,7 +475,7 @@ func matchString(t *testing.T, pattern, str string) bool { } func TestSyncPodsCreatesNetAndContainer(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} kubelet.pods = []api.BoundPod{ @@ -493,15 +493,14 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() @@ -524,7 +523,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { } func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{} kubelet.podInfraContainerImage = "custom_image_name" @@ -544,15 +543,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() @@ -569,7 +567,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { } func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { // pod infra container @@ -592,15 +590,14 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -611,7 +608,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { } func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeHttp := fakeHTTP{} 
kubelet.httpClient = &fakeHttp fakeDocker.ContainerList = []docker.APIContainers{ @@ -647,15 +644,14 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -669,7 +665,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { } func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { // format is // k8s___ @@ -692,15 +688,14 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, }, } - waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() verifyCalls(t, fakeDocker, []string{ - "list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -716,7 +711,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ready := false - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.sourceReady = func(source string) bool { return ready } fakeDocker.ContainerList = []docker.APIContainers{ @@ -731,7 +726,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ID: "9876", }, } - if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { t.Errorf("unexpected error: %v", err) } // Validate nothing happened. @@ -739,7 +734,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { fakeDocker.ClearCalls() ready = true - if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { t.Errorf("unexpected error: %v", err) } verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) @@ -759,7 +754,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { ready := false - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.sourceReady = func(source string) bool { if source == "testSource" { return ready @@ -790,7 +785,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { ID: "9876", }, } - if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { t.Errorf("unexpected error: %v", err) } // Validate nothing happened. 
@@ -798,7 +793,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { fakeDocker.ClearCalls() ready = true - if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { + if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { t.Errorf("unexpected error: %v", err) } verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) @@ -819,7 +814,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { } func TestSyncPodsDeletes(t *testing.T) { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container @@ -836,7 +831,7 @@ func TestSyncPodsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()) + err := kubelet.SyncPods([]api.BoundPod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -857,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) { } func TestSyncPodDeletesDuplicate(t *testing.T) { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -907,7 +902,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { } func TestSyncPodBadHash(t *testing.T) { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -956,7 +951,7 @@ func TestSyncPodBadHash(t *testing.T) { } func TestSyncPodUnhealthy(t *testing.T) { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -1006,7 +1001,7 @@ func TestSyncPodUnhealthy(t *testing.T) { } func TestMountExternalVolumes(t *testing.T) { - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet}) pod := api.BoundPod{ @@ -1040,7 +1035,7 @@ func TestMountExternalVolumes(t *testing.T) { } func TestGetPodVolumesFromDisk(t *testing.T) { - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) plug := &volume.FakePlugin{"fake", nil} kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet}) @@ -1315,7 +1310,7 @@ func TestGetContainerInfo(t *testing.T) { cadvisorReq := &info.ContainerInfoRequest{} mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor fakeDocker.ContainerList = []docker.APIContainers{ { @@ -1353,6 +1348,7 @@ func TestGetRootInfo(t *testing.T) { dockerClient: &fakeDocker, dockerPuller: &dockertools.FakeDockerPuller{}, cadvisorClient: mockCadvisor, + podWorkers: newPodWorkers(), } // If the container name is an empty string, then it means the root container. 
@@ -1364,7 +1360,7 @@ func TestGetRootInfo(t *testing.T) { } func TestGetContainerInfoWithoutCadvisor(t *testing.T) { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { ID: "foobar", @@ -1389,7 +1385,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { cadvisorReq := &info.ContainerInfoRequest{} mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure) - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor fakeDocker.ContainerList = []docker.APIContainers{ { @@ -1417,7 +1413,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { func TestGetContainerInfoOnNonExistContainer(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor fakeDocker.ContainerList = []docker.APIContainers{} @@ -1431,7 +1427,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) { func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor expectedErr := fmt.Errorf("List containers error") kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr} @@ -1451,7 +1447,7 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { func TestGetContainerInfoWithNoContainers(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil} @@ -1470,7 +1466,7 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) { func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor containerList := []docker.APIContainers{ @@ -1534,7 +1530,7 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por func TestRunInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -1556,7 +1552,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { func TestRunInContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.runner = &fakeCommandRunner containerID := "abc1234" @@ -1597,7 +1593,7 @@ func TestRunInContainer(t *testing.T) { func TestRunHandlerExec(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.runner = &fakeCommandRunner containerID := "abc1234" @@ -1645,7 +1641,7 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) { func TestRunHandlerHttp(t *testing.T) { fakeHttp := fakeHTTP{} - kubelet, _, _ := newTestKubelet(t) + kubelet, _ := newTestKubelet(t) kubelet.httpClient = &fakeHttp podName := "podFoo" @@ -1674,7 +1670,7 @@ func TestRunHandlerHttp(t *testing.T) { } func TestNewHandler(t *testing.T) { - kubelet, _, _ := 
newTestKubelet(t) + kubelet, _ := newTestKubelet(t) handler := &api.Handler{ HTTPGet: &api.HTTPGetAction{ Host: "foo", @@ -1705,7 +1701,7 @@ func TestNewHandler(t *testing.T) { } func TestSyncPodEventHandlerFails(t *testing.T) { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.httpClient = &fakeHTTP{ err: fmt.Errorf("test error"), } @@ -1894,7 +1890,7 @@ func TestKubeletGarbageCollection(t *testing.T) { }, } for _, test := range tests { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.maxContainerCount = 2 fakeDocker.ContainerList = test.containers fakeDocker.ContainerMap = test.containerDetails @@ -2059,7 +2055,7 @@ func TestPurgeOldest(t *testing.T) { }, } for _, test := range tests { - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.maxContainerCount = 5 fakeDocker.ContainerMap = test.containerDetails kubelet.purgeOldest(test.ids) @@ -2070,12 +2066,11 @@ func TestPurgeOldest(t *testing.T) { } func TestSyncPodsWithPullPolicy(t *testing.T) { - kubelet, fakeDocker, waitGroup := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{"existing_one", "want:latest"} kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - waitGroup.Add(1) err := kubelet.SyncPods([]api.BoundPod{ { ObjectMeta: api.ObjectMeta{ @@ -2094,11 +2089,11 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { }, }, }, - }, emptyPodUIDs, time.Now()) + }) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() + kubelet.drainWorkers() fakeDocker.Lock() @@ -2404,7 +2399,7 @@ func TestMakeEnvironmentVariables(t *testing.T) { } for _, tc := range testCases { - kl, _, _ := newTestKubelet(t) + kl, _ := newTestKubelet(t) kl.masterServiceNamespace = tc.masterServiceNamespace if tc.nilLister { kl.serviceLister = nil @@ -2841,7 +2836,7 @@ func TestGetPodReadyCondition(t *testing.T) { func TestExecInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -2868,7 +2863,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { func TestExecInContainerNoSuchContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2921,7 +2916,7 @@ func (f *fakeReadWriteCloser) Close() error { func TestExecInContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2980,7 +2975,7 @@ func TestExecInContainer(t *testing.T) { func TestPortForwardNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -3004,7 +2999,7 @@ func TestPortForwardNoSuchPod(t *testing.T) { func TestPortForwardNoSuchContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker 
:= newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -3039,7 +3034,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) { func TestPortForward(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _ := newTestKubelet(t) + kubelet, fakeDocker := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index bb6a5267822..1ce8d29e2ab 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -18,7 +18,6 @@ package metrics import ( "sync" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -36,16 +35,8 @@ var ( Help: "Image pull latency in microseconds.", }, ) - // TODO(vmarmol): Break down by number of containers in pod? - SyncPodLatency = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Subsystem: kubeletSubsystem, - Name: "sync_pod_latency_microseconds", - Help: "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync", - }, - []string{"operation_type"}, - ) // TODO(vmarmol): Containers per pod + // TODO(vmarmol): Latency of pod startup // TODO(vmarmol): Latency of SyncPods ) @@ -56,37 +47,10 @@ func Register(containerCache dockertools.DockerCache) { // Register the metrics. registerMetrics.Do(func() { prometheus.MustRegister(ImagePullLatency) - prometheus.MustRegister(SyncPodLatency) prometheus.MustRegister(newPodAndContainerCollector(containerCache)) }) } -type SyncPodType int - -const ( - SyncPodCreate SyncPodType = iota - SyncPodUpdate - SyncPodSync -) - -func (self SyncPodType) String() string { - switch self { - case SyncPodCreate: - return "create" - case SyncPodUpdate: - return "update" - case SyncPodSync: - return "sync" - default: - return "unknown" - } -} - -// Gets the time since the specified start in microseconds. -func SinceInMicroseconds(start time.Time) float64 { - return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) -} - func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector { return &podAndContainerCollector{ containerCache: containerCache, diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go deleted file mode 100644 index a4eab949799..00000000000 --- a/pkg/kubelet/pod_workers.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/types" - "github.com/golang/glog" -) - -type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error - -// TODO(wojtek-t) Add unit tests for this type. 
-type podWorkers struct { - // Protects podUpdates field. - podLock sync.Mutex - - // Tracks all running per-pod goroutines - per-pod goroutine will be - // processing updates received through its corresponding channel. - podUpdates map[types.UID]chan workUpdate - // DockerCache is used for listing running containers. - dockerCache dockertools.DockerCache - - // This function is run to sync the desired stated of pod. - // NOTE: This function has to be thread-safe - it can be called for - // different pods at the same time. - syncPodFun syncPodFunType -} - -type workUpdate struct { - // The pod state to reflect. - pod *api.BoundPod - - // Function to call when the update is complete. - updateCompleteFun func() -} - -func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers { - return &podWorkers{ - podUpdates: map[types.UID]chan workUpdate{}, - dockerCache: dockerCache, - syncPodFun: syncPodFun, - } -} - -func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { - for newWork := range podUpdates { - // Since we use docker cache, getting current state shouldn't cause - // performance overhead on Docker. Moreover, as long as we run syncPod - // no matter if it changes anything, having an old version of "containers" - // can cause starting eunended containers. - containers, err := p.dockerCache.RunningContainers() - if err != nil { - glog.Errorf("Error listing containers while syncing pod: %v", err) - continue - } - err = p.syncPodFun(newWork.pod, containers) - if err != nil { - glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) - record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) - continue - } - } -} - -// Apply the new setting to the specified pod. updateComplete is called when the update is completed. -func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) { - uid := pod.UID - var podUpdates chan workUpdate - var exists bool - - p.podLock.Lock() - defer p.podLock.Unlock() - if podUpdates, exists = p.podUpdates[uid]; !exists { - // TODO(wojtek-t): Adjust the size of the buffer in this channel - podUpdates = make(chan workUpdate, 5) - p.podUpdates[uid] = podUpdates - go p.managePodLoop(podUpdates) - } - podUpdates <- workUpdate{ - pod: pod, - updateCompleteFun: updateComplete, - } -} - -func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) { - p.podLock.Lock() - defer p.podLock.Unlock() - for key, channel := range p.podUpdates { - if _, exists := desiredPods[key]; !exists { - close(channel) - delete(p.podUpdates, key) - } - } -}
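For contrast with the simplified worker above, the deleted pkg/kubelet/pod_workers.go kept one long-lived goroutine and a small buffered channel per pod UID, so successive updates for the same pod were queued and processed in order rather than dropped, and ForgetNonExistingPodWorkers closed the channels of pods that were no longer desired. Below is a standalone sketch of that channel-per-key pattern; the names (queuedWorkers, UpdateKey, Forget) are illustrative, and the real code additionally listed running containers from the Docker cache and recorded a failedSync event on error.

package main

import (
	"fmt"
	"sync"
	"time"
)

// update is a queued piece of work for one key (the removed code carried a
// *api.BoundPod and a completion callback).
type update struct {
	payload string
}

// queuedWorkers owns one buffered channel and one goroutine per key, so
// updates for the same key are serialized instead of dropped.
type queuedWorkers struct {
	mu      sync.Mutex
	updates map[string]chan update
}

func newQueuedWorkers() *queuedWorkers {
	return &queuedWorkers{updates: map[string]chan update{}}
}

// UpdateKey queues an update, starting a per-key worker goroutine on first use.
func (q *queuedWorkers) UpdateKey(key string, u update) {
	q.mu.Lock()
	defer q.mu.Unlock()
	ch, ok := q.updates[key]
	if !ok {
		ch = make(chan update, 5) // small buffer, as in the removed code
		q.updates[key] = ch
		go func() {
			for w := range ch { // process updates for this key in order
				fmt.Println("syncing", key, "with", w.payload)
			}
		}()
	}
	ch <- u
}

// Forget closes and drops the channels of keys that are no longer desired,
// mirroring ForgetNonExistingPodWorkers in the removed file.
func (q *queuedWorkers) Forget(desired map[string]bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for key, ch := range q.updates {
		if !desired[key] {
			close(ch)
			delete(q.updates, key)
		}
	}
}

func main() {
	q := newQueuedWorkers()
	q.UpdateKey("default/pod-a", update{payload: "v1"})
	q.UpdateKey("default/pod-a", update{payload: "v2"}) // queued, not dropped
	time.Sleep(50 * time.Millisecond)
	q.Forget(map[string]bool{}) // nothing desired: stop the worker
}

Which behavior is preferable depends on whether dropped syncs are acceptable; this change trades the per-pod ordering and queuing sketched here for the simpler at-most-one-worker-per-pod scheme.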