diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 328c85be6ba..151a629731c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -95,7 +95,7 @@ type SyncHandler interface { // Syncs current state to match the specified pods. SyncPodType specified what // type of sync is occuring per pod. StartTime specifies the time at which // syncing began (for use in monitoring). - SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, + SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod, startTime time.Time) error } @@ -1480,7 +1480,8 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error { +func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, + mirrorPods map[string]*api.Pod, start time.Time) error { defer func() { metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) }() @@ -1528,7 +1529,8 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Run the sync in an async manifest worker. - kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() { + _, hasMirrorPod := mirrorPods[podFullName] + kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() { metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) }) @@ -1597,7 +1599,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Remove any orphaned mirror pods. - kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods) + kl.podManager.DeleteOrphanedMirrorPods() return err } @@ -1723,7 +1725,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } - pods, mirrorPods := kl.GetPods() + pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap() if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil { glog.Errorf("Couldn't sync containers: %v", err) } @@ -1788,8 +1790,8 @@ func (kl *Kubelet) GetHostname() string { } // GetPods returns all pods bound to the kubelet and their spec, and the mirror -// pod map. -func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) { +// pods. +func (kl *Kubelet) GetPods() []api.Pod { return kl.podManager.GetPods() } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f72140953cb..9ef2e0085f3 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -55,12 +55,12 @@ func init() { } type TestKubelet struct { - kubelet *Kubelet - fakeDocker *dockertools.FakeDockerClient - fakeCadvisor *cadvisor.Mock - fakeKubeClient *client.Fake - waitGroup *sync.WaitGroup - fakeMirrorManager *fakeMirrorManager + kubelet *Kubelet + fakeDocker *dockertools.FakeDockerClient + fakeCadvisor *cadvisor.Mock + fakeKubeClient *client.Fake + waitGroup *sync.WaitGroup + fakeMirrorClient *fakeMirrorClient } func newTestKubelet(t *testing.T) *TestKubelet { @@ -105,9 +105,9 @@ func newTestKubelet(t *testing.T) *TestKubelet { } mockCadvisor := &cadvisor.Mock{} kubelet.cadvisor = mockCadvisor - podManager, fakeMirrorManager := newFakePodManager() + podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager - return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorManager} + return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -450,7 +450,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { } kubelet.podManager.SetPods(pods) waitGroup.Add(1) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -485,7 +485,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { } kubelet.podManager.SetPods(pods) waitGroup.Add(1) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -536,7 +536,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { } kubelet.podManager.SetPods(pods) waitGroup.Add(1) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -591,7 +591,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { } waitGroup.Add(1) kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -643,7 +643,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { } waitGroup.Add(1) kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -702,7 +702,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { } waitGroup.Add(1) kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -773,7 +773,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { } waitGroup.Add(2) kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -814,7 +814,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ID: "9876", }, } - if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil { + if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } // Validate nothing happened. @@ -822,7 +822,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { fakeDocker.ClearCalls() ready = true - if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil { + if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) @@ -861,7 +861,7 @@ func TestSyncPodsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1744,7 +1744,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { }, }, }, - }, emptyPodUIDs, *newMirrorPods(), time.Now()) + }, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -3008,7 +3008,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { t.Fatalf("expected to have status cached for %q: %v", "pod2", err) } // Sync with empty pods so that the entry in status map will be removed. - kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()) + kl.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err == nil { t.Fatalf("expected to not have status cached for %q: %v", "pod2", err) } @@ -3224,7 +3224,7 @@ func TestUpdateNodeStatusError(t *testing.T) { func TestCreateMirrorPod(t *testing.T) { testKubelet := newTestKubelet(t) kl := testKubelet.kubelet - manager := testKubelet.fakeMirrorManager + manager := testKubelet.fakeMirrorClient pod := api.Pod{ ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -3255,7 +3255,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kl := testKubelet.kubelet - manager := testKubelet.fakeMirrorManager + manager := testKubelet.fakeMirrorClient orphanPods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -3281,12 +3281,10 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) { }, } - mirrorPods := newMirrorPods() - for _, pod := range orphanPods { - mirrorPods.Insert(&pod) - } + kl.podManager.SetPods(orphanPods) + pods, mirrorMap := kl.podManager.GetPodsAndMirrorMap() // Sync with an empty pod list to delete all mirror pods. - err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now()) + err := kl.SyncPods(pods, emptyPodUIDs, mirrorMap, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/kubelet/mirror_client.go b/pkg/kubelet/mirror_client.go new file mode 100644 index 00000000000..c25f7d83601 --- /dev/null +++ b/pkg/kubelet/mirror_client.go @@ -0,0 +1,95 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/golang/glog" +) + +// Mirror client is used to create/delete a mirror pod. + +type mirrorClient interface { + CreateMirrorPod(api.Pod, string) error + DeleteMirrorPod(string) error +} + +type basicMirrorClient struct { + // mirror pods are stored in the kubelet directly because they need to be + // in sync with the internal pods. + apiserverClient client.Interface +} + +func newBasicMirrorClient(apiserverClient client.Interface) *basicMirrorClient { + return &basicMirrorClient{apiserverClient: apiserverClient} +} + +// Creates a mirror pod. +func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod, hostname string) error { + if self.apiserverClient == nil { + return nil + } + // Indicate that the pod should be scheduled to the current node. + pod.Spec.Host = hostname + pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType + + _, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod) + return err +} + +// Deletes a mirror pod. +func (self *basicMirrorClient) DeleteMirrorPod(podFullName string) error { + if self.apiserverClient == nil { + return nil + } + name, namespace, err := ParsePodFullName(podFullName) + if err != nil { + glog.Errorf("Failed to parse a pod full name %q", podFullName) + return err + } + glog.V(4).Infof("Deleting a mirror pod %q", podFullName) + if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil { + glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err) + } + return nil +} + +// Helper functions. +func getPodSource(pod *api.Pod) (string, error) { + if pod.Annotations != nil { + if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok { + return source, nil + } + } + return "", fmt.Errorf("cannot get source of pod %q", pod.UID) +} + +func isStaticPod(pod *api.Pod) bool { + source, err := getPodSource(pod) + return err == nil && source != ApiserverSource +} + +func isMirrorPod(pod *api.Pod) bool { + if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok { + return false + } else { + return value == MirrorType + } +} diff --git a/pkg/kubelet/mirror_manager_test.go b/pkg/kubelet/mirror_client_test.go similarity index 62% rename from pkg/kubelet/mirror_manager_test.go rename to pkg/kubelet/mirror_client_test.go index da4222db60b..85ff6108d2e 100644 --- a/pkg/kubelet/mirror_manager_test.go +++ b/pkg/kubelet/mirror_client_test.go @@ -17,7 +17,6 @@ limitations under the License. package kubelet import ( - "reflect" "sync" "testing" @@ -25,7 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -type fakeMirrorManager struct { +type fakeMirrorClient struct { mirrorPodLock sync.RWMutex // Note that a real mirror manager does not store the mirror pods in // itself. This fake manager does this to track calls. @@ -34,7 +33,7 @@ type fakeMirrorManager struct { deleteCounts map[string]int } -func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error { +func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod, _ string) error { self.mirrorPodLock.Lock() defer self.mirrorPodLock.Unlock() podFullName := GetPodFullName(&pod) @@ -43,7 +42,7 @@ func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error { return nil } -func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error { +func (self *fakeMirrorClient) DeleteMirrorPod(podFullName string) error { self.mirrorPodLock.Lock() defer self.mirrorPodLock.Unlock() self.mirrorPods.Delete(podFullName) @@ -51,81 +50,38 @@ func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error { return nil } -func newFakeMirrorMananger() *fakeMirrorManager { - m := fakeMirrorManager{} +func newFakeMirrorClient() *fakeMirrorClient { + m := fakeMirrorClient{} m.mirrorPods = util.NewStringSet() m.createCounts = make(map[string]int) m.deleteCounts = make(map[string]int) return &m } -func (self *fakeMirrorManager) HasPod(podFullName string) bool { +func (self *fakeMirrorClient) HasPod(podFullName string) bool { self.mirrorPodLock.RLock() defer self.mirrorPodLock.RUnlock() return self.mirrorPods.Has(podFullName) } -func (self *fakeMirrorManager) NumOfPods() int { +func (self *fakeMirrorClient) NumOfPods() int { self.mirrorPodLock.RLock() defer self.mirrorPodLock.RUnlock() return self.mirrorPods.Len() } -func (self *fakeMirrorManager) GetPods() []string { +func (self *fakeMirrorClient) GetPods() []string { self.mirrorPodLock.RLock() defer self.mirrorPodLock.RUnlock() return self.mirrorPods.List() } -func (self *fakeMirrorManager) GetCounts(podFullName string) (int, int) { +func (self *fakeMirrorClient) GetCounts(podFullName string) (int, int) { self.mirrorPodLock.RLock() defer self.mirrorPodLock.RUnlock() return self.createCounts[podFullName], self.deleteCounts[podFullName] } -// Tests that mirror pods are filtered out properly from the pod update. -func TestFilterOutMirrorPods(t *testing.T) { - mirrorPod := api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "987654321", - Name: "bar", - Namespace: "default", - Annotations: map[string]string{ - ConfigSourceAnnotationKey: "api", - ConfigMirrorAnnotationKey: "mirror", - }, - }, - } - staticPod := api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "123456789", - Name: "bar", - Namespace: "default", - Annotations: map[string]string{ConfigSourceAnnotationKey: "file"}, - }, - } - - expectedPods := []api.Pod{ - { - ObjectMeta: api.ObjectMeta{ - UID: "999999999", - Name: "taco", - Namespace: "default", - Annotations: map[string]string{ConfigSourceAnnotationKey: "api"}, - }, - }, - staticPod, - } - updates := append(expectedPods, mirrorPod) - actualPods, actualMirrorPods := filterAndCategorizePods(updates) - if !reflect.DeepEqual(expectedPods, actualPods) { - t.Errorf("expected %#v, got %#v", expectedPods, actualPods) - } - if _, ok := actualMirrorPods.mirror[GetPodFullName(&mirrorPod)]; !ok { - t.Errorf("mirror pod is not recorded") - } -} - func TestParsePodFullName(t *testing.T) { type nameTuple struct { Name string diff --git a/pkg/kubelet/mirror_manager.go b/pkg/kubelet/mirror_manager.go deleted file mode 100644 index 9ab68b03920..00000000000 --- a/pkg/kubelet/mirror_manager.go +++ /dev/null @@ -1,199 +0,0 @@ -/* -Copyright 2015 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "fmt" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/types" - "github.com/golang/glog" -) - -// Kubelet discover pod updates from 3 sources: file, http, and apiserver. -// Pods from non-apiserver sources are called static pods, and API server is -// not aware of the existence of static pods. In order to monitor the status of -// such pods, kubelet create a mirror pod for each static pod via the API -// server. -// -// A mirror pod has the same pod full name (name and namespace) as its static -// counterpart (albeit different metadata such as UID, etc). By leveraging the -// fact that kubelet reports the pod status using the pod full name, the status -// of the mirror pod always reflects the acutal status of the static pod. -// When a static pod gets deleted, the associated orphaned mirror pods will -// also be removed. -// -// This file includes functions to manage the mirror pods. - -type mirrorManager interface { - CreateMirrorPod(api.Pod, string) error - DeleteMirrorPod(string) error -} - -type basicMirrorManager struct { - // mirror pods are stored in the kubelet directly because they need to be - // in sync with the internal pods. - apiserverClient client.Interface -} - -func newBasicMirrorManager(apiserverClient client.Interface) *basicMirrorManager { - return &basicMirrorManager{apiserverClient: apiserverClient} -} - -// Creates a mirror pod. -func (self *basicMirrorManager) CreateMirrorPod(pod api.Pod, hostname string) error { - if self.apiserverClient == nil { - return nil - } - // Indicate that the pod should be scheduled to the current node. - pod.Spec.Host = hostname - pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType - - _, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod) - return err -} - -// Deletes a mirror pod. -func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error { - if self.apiserverClient == nil { - return nil - } - name, namespace, err := ParsePodFullName(podFullName) - if err != nil { - glog.Errorf("Failed to parse a pod full name %q", podFullName) - return err - } - glog.V(4).Infof("Deleting a mirror pod %q", podFullName) - if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil { - glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err) - } - return nil -} - -// Helper functions. -func getPodSource(pod *api.Pod) (string, error) { - if pod.Annotations != nil { - if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok { - return source, nil - } - } - return "", fmt.Errorf("cannot get source of pod %q", pod.UID) -} - -func isStaticPod(pod *api.Pod) bool { - source, err := getPodSource(pod) - return err == nil && source != ApiserverSource -} - -func isMirrorPod(pod *api.Pod) bool { - if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok { - return false - } else { - return value == MirrorType - } -} - -// This function separate the mirror pods from regular pods to -// facilitate pods syncing and mirror pod creation/deletion. -func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, mirrorPods) { - filteredPods := []api.Pod{} - mirrorPods := newMirrorPods() - - for _, pod := range pods { - mirrorPods.Insert(&pod) - if !isMirrorPod(&pod) { - filteredPods = append(filteredPods, pod) - } - } - return filteredPods, *mirrorPods -} - -// mirrorPods is thread-compatible. -// TODO (yujuhong): Replace this with a pod manager that manages both regular -// pods and mirror pods. -type mirrorPods struct { - // Static pod UIDs indexed by pod full name. - static map[string]types.UID - // Mirror pod UIDs indexed by pod full name. - mirror map[string]types.UID - - // Bi-directional UID mappings. - staticToMirror map[types.UID]types.UID - mirrorToStatic map[types.UID]types.UID -} - -func newMirrorPods() *mirrorPods { - mirrorPods := mirrorPods{} - mirrorPods.static = make(map[string]types.UID) - mirrorPods.mirror = make(map[string]types.UID) - mirrorPods.staticToMirror = make(map[types.UID]types.UID) - mirrorPods.mirrorToStatic = make(map[types.UID]types.UID) - return &mirrorPods -} - -func (self *mirrorPods) Insert(pod *api.Pod) { - podFullName := GetPodFullName(pod) - if isMirrorPod(pod) { - self.mirror[podFullName] = pod.UID - } else if isStaticPod(pod) { - self.static[podFullName] = pod.UID - } - staticUID, found1 := self.static[podFullName] - mirrorUID, found2 := self.mirror[podFullName] - // Update the UID mappings. - if found1 && found2 { - self.staticToMirror[staticUID] = mirrorUID - self.mirrorToStatic[mirrorUID] = staticUID - } -} - -func (self *mirrorPods) HasStaticPod(key types.UID) bool { - _, ok := self.mirrorToStatic[key] - return ok -} - -func (self *mirrorPods) HasMirrorPod(key types.UID) bool { - _, ok := self.staticToMirror[key] - return ok -} - -func (self *mirrorPods) GetMirrorUID(key types.UID) (types.UID, bool) { - value, ok := self.staticToMirror[key] - if !ok { - return "", false - } - return value, true -} - -func (self *mirrorPods) GetStaticUID(key types.UID) (types.UID, bool) { - value, ok := self.mirrorToStatic[key] - if !ok { - return "", false - } - return value, true -} - -func (self *mirrorPods) GetOrphanedMirrorPodNames() []string { - orphanedPodNames := []string{} - for podFullName := range self.mirror { - if _, ok := self.static[podFullName]; !ok { - orphanedPodNames = append(orphanedPodNames, podFullName) - } - } - return orphanedPodNames -} diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index f1365277ce5..81951c74610 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -26,68 +26,82 @@ import ( "github.com/golang/glog" ) +// Pod manager stores and manages access to the pods. +// +// Kubelet discovers pod updates from 3 sources: file, http, and apiserver. +// Pods from non-apiserver sources are called static pods, and API server is +// not aware of the existence of static pods. In order to monitor the status of +// such pods, kubelet creates a mirror pod for each static pod via the API +// server. +// +// A mirror pod has the same pod full name (name and namespace) as its static +// counterpart (albeit different metadata such as UID, etc). By leveraging the +// fact that kubelet reports the pod status using the pod full name, the status +// of the mirror pod always reflects the actual status of the static pod. +// When a static pod gets deleted, the associated orphaned mirror pod will +// also be removed. + type podManager interface { - UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) - GetPods() ([]api.Pod, mirrorPods) - GetPodByName(namespace, name string) (*api.Pod, bool) + GetPods() []api.Pod GetPodByFullName(podFullName string) (*api.Pod, bool) - TranslatePodUID(uid types.UID) types.UID - DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) + GetPodByName(namespace, name string) (*api.Pod, bool) + GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.Pod) SetPods(pods []api.Pod) - mirrorManager + UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) + DeleteOrphanedMirrorPods() + TranslatePodUID(uid types.UID) types.UID + mirrorClient } +// All maps in basicPodManager should be set by calling UpdatePods(); +// individual arrays/maps are not immutable and no other methods should attempt +// to modify them. type basicPodManager struct { - // Protects all internal pod storage/mappings. + // Protects all internal maps. lock sync.RWMutex - pods []api.Pod - // Record the set of mirror pods (see mirror_manager.go for more details); - // similar to pods, this is not immutable and is protected by the same podLock. - // Note that basicPodManager.pods do not contain mirror pods as they are - // filtered out beforehand. - mirrorPods mirrorPods - // A mirror pod manager which provides helper functions. - mirrorManager mirrorManager + // Regular pods indexed by UID. + podByUID map[types.UID]*api.Pod + // Mirror pods indexed by UID. + mirrorPodByUID map[types.UID]*api.Pod + + // Pods indexed by full name for easy access. + podByFullName map[string]*api.Pod + mirrorPodByFullName map[string]*api.Pod + + // A mirror pod client to create/delete mirror pods. + mirrorClient mirrorClient } func newBasicPodManager(apiserverClient client.Interface) *basicPodManager { - podManager := &basicPodManager{} - podManager.mirrorManager = newBasicMirrorManager(apiserverClient) - podManager.mirrorPods = *newMirrorPods() - podManager.pods = []api.Pod{} - return podManager -} - -// This method is used only for testing to quickly set the internal pods. -func (self *basicPodManager) SetPods(pods []api.Pod) { - self.pods, self.mirrorPods = filterAndCategorizePods(pods) + pm := &basicPodManager{} + pm.mirrorClient = newBasicMirrorClient(apiserverClient) + pm.SetPods([]api.Pod{}) + return pm } // Update the internal pods with those provided by the update. -// Records new and updated pods in newPods and updatedPods. func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { self.lock.Lock() defer self.lock.Unlock() switch u.Op { case SET: glog.V(3).Infof("SET: Containers changed") - newPods, newMirrorPods := filterAndCategorizePods(u.Pods) - // Store the new pods. Don't worry about filtering host ports since those // pods will never be looked up. existingPods := make(map[types.UID]struct{}) - for i := range self.pods { - existingPods[self.pods[i].UID] = struct{}{} + for uid := range self.podByUID { + existingPods[uid] = struct{}{} } - for _, pod := range newPods { - if _, ok := existingPods[pod.UID]; !ok { - podSyncTypes[pod.UID] = metrics.SyncPodCreate + + // Update the internal pods. + self.setPods(u.Pods) + + for uid := range self.podByUID { + if _, ok := existingPods[uid]; !ok { + podSyncTypes[uid] = metrics.SyncPodCreate } } - // Actually update the pods. - self.pods = newPods - self.mirrorPods = newMirrorPods case UPDATE: glog.V(3).Infof("Update: Containers changed") @@ -96,21 +110,52 @@ func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID] for i := range u.Pods { podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate } - allPods := updatePods(u.Pods, self.pods) - self.pods, self.mirrorPods = filterAndCategorizePods(allPods) + allPods := applyUpdates(u.Pods, self.getPods()) + self.setPods(allPods) default: panic("syncLoop does not support incremental changes") } // Mark all remaining pods as sync. - for i := range self.pods { - if _, ok := podSyncTypes[self.pods[i].UID]; !ok { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync + for uid := range self.podByUID { + if _, ok := podSyncTypes[uid]; !ok { + podSyncTypes[uid] = metrics.SyncPodSync } } } -func updatePods(changed []api.Pod, current []api.Pod) []api.Pod { +// Set the internal pods based on the new pods. +func (self *basicPodManager) SetPods(newPods []api.Pod) { + self.lock.Lock() + defer self.lock.Unlock() + self.setPods(newPods) +} + +func (self *basicPodManager) setPods(newPods []api.Pod) { + podByUID := make(map[types.UID]*api.Pod) + mirrorPodByUID := make(map[types.UID]*api.Pod) + podByFullName := make(map[string]*api.Pod) + mirrorPodByFullName := make(map[string]*api.Pod) + + for i := range newPods { + pod := newPods[i] + podFullName := GetPodFullName(&pod) + if isMirrorPod(&pod) { + mirrorPodByUID[pod.UID] = &pod + mirrorPodByFullName[podFullName] = &pod + } else { + podByUID[pod.UID] = &pod + podByFullName[podFullName] = &pod + } + } + + self.podByUID = podByUID + self.podByFullName = podByFullName + self.mirrorPodByUID = mirrorPodByUID + self.mirrorPodByFullName = mirrorPodByFullName +} + +func applyUpdates(changed []api.Pod, current []api.Pod) []api.Pod { updated := []api.Pod{} m := map[types.UID]*api.Pod{} for i := range changed { @@ -132,36 +177,51 @@ func updatePods(changed []api.Pod, current []api.Pod) []api.Pod { return updated } -// GetPods returns all pods bound to the kubelet and their spec, and the mirror -// pod map. -func (self *basicPodManager) GetPods() ([]api.Pod, mirrorPods) { - self.lock.RLock() - defer self.lock.RUnlock() - return append([]api.Pod{}, self.pods...), self.mirrorPods +func (self *basicPodManager) getPods() []api.Pod { + pods := make([]api.Pod, 0, len(self.podByUID)) + for _, pod := range self.podByUID { + pods = append(pods, *pod) + } + return pods } -// GetPodByName provides the first pod that matches namespace and name, as well -// as whether the pod was found. -func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) { +// GetPods returns the regular pods bound to the kubelet and their spec. +func (self *basicPodManager) GetPods() []api.Pod { self.lock.RLock() defer self.lock.RUnlock() - for i := range self.pods { - pod := self.pods[i] - if pod.Namespace == namespace && pod.Name == name { - return &pod, true - } + return self.getPods() +} + +// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror +// pod map indexed by full name for existence check. +func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.Pod) { + self.lock.RLock() + defer self.lock.RUnlock() + mirrorPodByFullName := make(map[string]*api.Pod) + for key, value := range self.mirrorPodByFullName { + mirrorPodByFullName[key] = value + } + return self.getPods(), mirrorPodByFullName +} + +// GetPodByName provides the (non-mirror) pod that matches namespace and name, +// as well as whether the pod was found. +func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) { + podFullName := BuildPodFullName(name, namespace) + return self.GetPodByFullName(podFullName) +} + +// GetPodByName returns the (non-mirror) pod that matches full name, as well as +// whether the pod was found. +func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { + self.lock.RLock() + defer self.lock.RUnlock() + if pod, ok := self.podByFullName[podFullName]; ok { + return pod, true } return nil, false } -func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { - name, namespace, err := ParsePodFullName(podFullName) - if err != nil { - return nil, false - } - return self.GetPodByName(namespace, name) -} - // If the UID belongs to a mirror pod, maps it to the UID of its static pod. // Otherwise, return the original UID. All public-facing functions should // perform this translation for UIDs because user may provide a mirror pod UID, @@ -173,27 +233,40 @@ func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID { self.lock.RLock() defer self.lock.RUnlock() - staticUID, ok := self.mirrorPods.GetStaticUID(uid) - if ok { - return staticUID - } else { - return uid - } -} - -// Delete all orphaned mirror pods. This method doesn't acquire the lock -// because it assumes the a copy of the mirrorPod is passed as an argument. -func (self *basicPodManager) DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) { - podFullNames := mirrorPods.GetOrphanedMirrorPodNames() - for _, podFullName := range podFullNames { - self.mirrorManager.DeleteMirrorPod(podFullName) + if mirrorPod, ok := self.mirrorPodByUID[uid]; ok { + podFullName := GetPodFullName(mirrorPod) + if pod, ok := self.podByFullName[podFullName]; ok { + return pod.UID + } + } + return uid +} + +func (self *basicPodManager) getFullNameMaps() (map[string]*api.Pod, map[string]*api.Pod) { + self.lock.RLock() + defer self.lock.RUnlock() + return self.podByFullName, self.mirrorPodByFullName +} + +// Delete all mirror pods which do not have associated static pods. This method +// sends deletion requets to the API server, but does NOT modify the internal +// pod storage in basicPodManager. +func (self *basicPodManager) DeleteOrphanedMirrorPods() { + podByFullName, mirrorPodByFullName := self.getFullNameMaps() + + for podFullName := range mirrorPodByFullName { + if _, ok := podByFullName[podFullName]; !ok { + self.mirrorClient.DeleteMirrorPod(podFullName) + } } } +// Creates a mirror pod for the given pod. func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error { - return self.mirrorManager.CreateMirrorPod(pod, hostname) + return self.mirrorClient.CreateMirrorPod(pod, hostname) } +// Delete a mirror pod by name. func (self *basicPodManager) DeleteMirrorPod(podFullName string) error { - return self.mirrorManager.DeleteMirrorPod(podFullName) + return self.mirrorClient.DeleteMirrorPod(podFullName) } diff --git a/pkg/kubelet/pod_manager_test.go b/pkg/kubelet/pod_manager_test.go index 2ca65901bdf..9fd15652bd4 100644 --- a/pkg/kubelet/pod_manager_test.go +++ b/pkg/kubelet/pod_manager_test.go @@ -16,10 +16,84 @@ limitations under the License. package kubelet -// Stub out mirror manager for testing purpose. -func newFakePodManager() (*basicPodManager, *fakeMirrorManager) { +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// Stub out mirror client for testing purpose. +func newFakePodManager() (*basicPodManager, *fakeMirrorClient) { podManager := newBasicPodManager(nil) - fakeMirrorManager := newFakeMirrorMananger() - podManager.mirrorManager = fakeMirrorManager - return podManager, fakeMirrorManager + fakeMirrorClient := newFakeMirrorClient() + podManager.mirrorClient = fakeMirrorClient + return podManager, fakeMirrorClient +} + +// Tests that pods/maps are properly set after the pod update, and the basic +// methods work correctly. +func TestGetSetPods(t *testing.T) { + mirrorPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "987654321", + Name: "bar", + Namespace: "default", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "api", + ConfigMirrorAnnotationKey: "mirror", + }, + }, + } + staticPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "123456789", + Name: "bar", + Namespace: "default", + Annotations: map[string]string{ConfigSourceAnnotationKey: "file"}, + }, + } + + expectedPods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "999999999", + Name: "taco", + Namespace: "default", + Annotations: map[string]string{ConfigSourceAnnotationKey: "api"}, + }, + }, + staticPod, + } + updates := append(expectedPods, mirrorPod) + podManager, _ := newFakePodManager() + podManager.SetPods(updates) + actualPods := podManager.GetPods() + if !reflect.DeepEqual(expectedPods, actualPods) { + t.Errorf("pods are not set correctly; expected %#v, got %#v", expectedPods, actualPods) + } + actualPod, ok := podManager.mirrorPodByUID[mirrorPod.UID] + if !ok { + t.Errorf("mirror pod %q is not found in the mirror pod map by UID", mirrorPod.UID) + } else if !reflect.DeepEqual(&mirrorPod, actualPod) { + t.Errorf("mirror pod is recorded incorrectly. expect: %v, got: %v", mirrorPod, actualPod) + } + actualPod, ok = podManager.mirrorPodByFullName[GetPodFullName(&mirrorPod)] + if !ok { + t.Errorf("mirror pod %q is not found in the mirror pod map by full name", GetPodFullName(&mirrorPod)) + } else if !reflect.DeepEqual(&mirrorPod, actualPod) { + t.Errorf("mirror pod is recorded incorrectly. expect: %v, got: %v", mirrorPod, actualPod) + } + if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != staticPod.UID { + t.Errorf("unable to translate UID %q to the static POD's UID %q; %#v", mirrorPod.UID, staticPod.UID, podManager.mirrorPodByUID) + } + actualPod, ok = podManager.GetPodByFullName("bar_default") + if !ok || !reflect.DeepEqual(actualPod, &staticPod) { + t.Errorf("unable to get pod by full name; expected: %#v, got: %#v", staticPod, actualPod) + } + actualPod, ok = podManager.GetPodByName("default", "bar") + if !ok || !reflect.DeepEqual(actualPod, &staticPod) { + t.Errorf("unable to get pod by name; expected: %#v, got: %#v", staticPod, actualPod) + } + } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 272e6ef3793..fef25e02c3d 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -82,7 +82,7 @@ type HostInterface interface { GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) GetDockerVersion() ([]uint, error) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) - GetPods() ([]api.Pod, mirrorPods) + GetPods() []api.Pod GetPodByName(namespace, name string) (*api.Pod, bool) GetPodStatus(name string) (api.PodStatus, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) @@ -260,7 +260,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { // handlePods returns a list of pod bound to the Kubelet and their spec func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) { - pods, _ := s.host.GetPods() + pods := s.host.GetPods() podList := &api.PodList{ Items: pods, } diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index a3409997c97..cc54bb7ddb7 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -44,7 +44,7 @@ type fakeKubelet struct { containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) machineInfoFunc func() (*cadvisorApi.MachineInfo, error) - podsFunc func() ([]api.Pod, mirrorPods) + podsFunc func() []api.Pod logFunc func(w http.ResponseWriter, req *http.Request) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) dockerVersionFunc func() ([]uint, error) @@ -79,7 +79,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) return fk.machineInfoFunc() } -func (fk *fakeKubelet) GetPods() ([]api.Pod, mirrorPods) { +func (fk *fakeKubelet) GetPods() []api.Pod { return fk.podsFunc() }