mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-24 11:01:26 +00:00
Merge pull request #5826 from yujuhong/pod_manager
Kubelet: refactor pod manager
This commit is contained in:
commit
270c850b3b
@ -107,7 +107,7 @@ type SyncHandler interface {
|
|||||||
// Syncs current state to match the specified pods. SyncPodType specified what
|
// Syncs current state to match the specified pods. SyncPodType specified what
|
||||||
// type of sync is occuring per pod. StartTime specifies the time at which
|
// type of sync is occuring per pod. StartTime specifies the time at which
|
||||||
// syncing began (for use in monitoring).
|
// syncing began (for use in monitoring).
|
||||||
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods,
|
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod,
|
||||||
startTime time.Time) error
|
startTime time.Time) error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1490,7 +1490,8 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
|
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
|
||||||
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error {
|
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
|
||||||
|
mirrorPods map[string]*api.Pod, start time.Time) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
|
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||||
}()
|
}()
|
||||||
@ -1538,7 +1539,8 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run the sync in an async manifest worker.
|
// Run the sync in an async manifest worker.
|
||||||
kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() {
|
_, hasMirrorPod := mirrorPods[podFullName]
|
||||||
|
kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() {
|
||||||
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
|
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -1607,7 +1609,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove any orphaned mirror pods.
|
// Remove any orphaned mirror pods.
|
||||||
kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods)
|
kl.podManager.DeleteOrphanedMirrorPods()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1733,7 +1735,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pods, mirrorPods := kl.GetPods()
|
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
|
||||||
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
|
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
|
||||||
glog.Errorf("Couldn't sync containers: %v", err)
|
glog.Errorf("Couldn't sync containers: %v", err)
|
||||||
}
|
}
|
||||||
@ -1798,8 +1800,8 @@ func (kl *Kubelet) GetHostname() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||||
// pod map.
|
// pods.
|
||||||
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
|
func (kl *Kubelet) GetPods() []api.Pod {
|
||||||
return kl.podManager.GetPods()
|
return kl.podManager.GetPods()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ type TestKubelet struct {
|
|||||||
fakeCadvisor *cadvisor.Mock
|
fakeCadvisor *cadvisor.Mock
|
||||||
fakeKubeClient *client.Fake
|
fakeKubeClient *client.Fake
|
||||||
waitGroup *sync.WaitGroup
|
waitGroup *sync.WaitGroup
|
||||||
fakeMirrorManager *fakeMirrorManager
|
fakeMirrorClient *fakeMirrorClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestKubelet(t *testing.T) *TestKubelet {
|
func newTestKubelet(t *testing.T) *TestKubelet {
|
||||||
@ -105,9 +105,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
}
|
}
|
||||||
mockCadvisor := &cadvisor.Mock{}
|
mockCadvisor := &cadvisor.Mock{}
|
||||||
kubelet.cadvisor = mockCadvisor
|
kubelet.cadvisor = mockCadvisor
|
||||||
podManager, fakeMirrorManager := newFakePodManager()
|
podManager, fakeMirrorClient := newFakePodManager()
|
||||||
kubelet.podManager = podManager
|
kubelet.podManager = podManager
|
||||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorManager}
|
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
|
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
|
||||||
@ -450,7 +450,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
waitGroup.Add(1)
|
waitGroup.Add(1)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -485,7 +485,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
waitGroup.Add(1)
|
waitGroup.Add(1)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -536,7 +536,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
waitGroup.Add(1)
|
waitGroup.Add(1)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -591,7 +591,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitGroup.Add(1)
|
waitGroup.Add(1)
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -643,7 +643,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitGroup.Add(1)
|
waitGroup.Add(1)
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -702,7 +702,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitGroup.Add(1)
|
waitGroup.Add(1)
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -773,7 +773,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitGroup.Add(2)
|
waitGroup.Add(2)
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -814,7 +814,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
|||||||
ID: "9876",
|
ID: "9876",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
|
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
// Validate nothing happened.
|
// Validate nothing happened.
|
||||||
@ -822,7 +822,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
|||||||
fakeDocker.ClearCalls()
|
fakeDocker.ClearCalls()
|
||||||
|
|
||||||
ready = true
|
ready = true
|
||||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
|
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
|
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
|
||||||
@ -861,7 +861,7 @@ func TestSyncPodsDeletes(t *testing.T) {
|
|||||||
ID: "4567",
|
ID: "4567",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
|
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -1744,7 +1744,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, emptyPodUIDs, *newMirrorPods(), time.Now())
|
}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -3008,7 +3008,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
|
|||||||
t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
|
t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
|
||||||
}
|
}
|
||||||
// Sync with empty pods so that the entry in status map will be removed.
|
// Sync with empty pods so that the entry in status map will be removed.
|
||||||
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
|
kl.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||||
if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err == nil {
|
if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err == nil {
|
||||||
t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
|
t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
|
||||||
}
|
}
|
||||||
@ -3236,7 +3236,7 @@ func TestUpdateNodeStatusError(t *testing.T) {
|
|||||||
func TestCreateMirrorPod(t *testing.T) {
|
func TestCreateMirrorPod(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
kl := testKubelet.kubelet
|
kl := testKubelet.kubelet
|
||||||
manager := testKubelet.fakeMirrorManager
|
manager := testKubelet.fakeMirrorClient
|
||||||
pod := api.Pod{
|
pod := api.Pod{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
UID: "12345678",
|
UID: "12345678",
|
||||||
@ -3267,7 +3267,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
|
|||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||||
kl := testKubelet.kubelet
|
kl := testKubelet.kubelet
|
||||||
manager := testKubelet.fakeMirrorManager
|
manager := testKubelet.fakeMirrorClient
|
||||||
orphanPods := []api.Pod{
|
orphanPods := []api.Pod{
|
||||||
{
|
{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
@ -3293,12 +3293,10 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
mirrorPods := newMirrorPods()
|
kl.podManager.SetPods(orphanPods)
|
||||||
for _, pod := range orphanPods {
|
pods, mirrorMap := kl.podManager.GetPodsAndMirrorMap()
|
||||||
mirrorPods.Insert(&pod)
|
|
||||||
}
|
|
||||||
// Sync with an empty pod list to delete all mirror pods.
|
// Sync with an empty pod list to delete all mirror pods.
|
||||||
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now())
|
err := kl.SyncPods(pods, emptyPodUIDs, mirrorMap, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
95
pkg/kubelet/mirror_client.go
Normal file
95
pkg/kubelet/mirror_client.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kubelet
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Mirror client is used to create/delete a mirror pod.
|
||||||
|
|
||||||
|
type mirrorClient interface {
|
||||||
|
CreateMirrorPod(api.Pod, string) error
|
||||||
|
DeleteMirrorPod(string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type basicMirrorClient struct {
|
||||||
|
// mirror pods are stored in the kubelet directly because they need to be
|
||||||
|
// in sync with the internal pods.
|
||||||
|
apiserverClient client.Interface
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBasicMirrorClient(apiserverClient client.Interface) *basicMirrorClient {
|
||||||
|
return &basicMirrorClient{apiserverClient: apiserverClient}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a mirror pod.
|
||||||
|
func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod, hostname string) error {
|
||||||
|
if self.apiserverClient == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Indicate that the pod should be scheduled to the current node.
|
||||||
|
pod.Spec.Host = hostname
|
||||||
|
pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType
|
||||||
|
|
||||||
|
_, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deletes a mirror pod.
|
||||||
|
func (self *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
|
||||||
|
if self.apiserverClient == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
name, namespace, err := ParsePodFullName(podFullName)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to parse a pod full name %q", podFullName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
|
||||||
|
if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil {
|
||||||
|
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper functions.
|
||||||
|
func getPodSource(pod *api.Pod) (string, error) {
|
||||||
|
if pod.Annotations != nil {
|
||||||
|
if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
|
||||||
|
return source, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isStaticPod(pod *api.Pod) bool {
|
||||||
|
source, err := getPodSource(pod)
|
||||||
|
return err == nil && source != ApiserverSource
|
||||||
|
}
|
||||||
|
|
||||||
|
func isMirrorPod(pod *api.Pod) bool {
|
||||||
|
if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok {
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
|
return value == MirrorType
|
||||||
|
}
|
||||||
|
}
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package kubelet
|
package kubelet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -25,7 +24,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakeMirrorManager struct {
|
type fakeMirrorClient struct {
|
||||||
mirrorPodLock sync.RWMutex
|
mirrorPodLock sync.RWMutex
|
||||||
// Note that a real mirror manager does not store the mirror pods in
|
// Note that a real mirror manager does not store the mirror pods in
|
||||||
// itself. This fake manager does this to track calls.
|
// itself. This fake manager does this to track calls.
|
||||||
@ -34,7 +33,7 @@ type fakeMirrorManager struct {
|
|||||||
deleteCounts map[string]int
|
deleteCounts map[string]int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error {
|
func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod, _ string) error {
|
||||||
self.mirrorPodLock.Lock()
|
self.mirrorPodLock.Lock()
|
||||||
defer self.mirrorPodLock.Unlock()
|
defer self.mirrorPodLock.Unlock()
|
||||||
podFullName := GetPodFullName(&pod)
|
podFullName := GetPodFullName(&pod)
|
||||||
@ -43,7 +42,7 @@ func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error {
|
func (self *fakeMirrorClient) DeleteMirrorPod(podFullName string) error {
|
||||||
self.mirrorPodLock.Lock()
|
self.mirrorPodLock.Lock()
|
||||||
defer self.mirrorPodLock.Unlock()
|
defer self.mirrorPodLock.Unlock()
|
||||||
self.mirrorPods.Delete(podFullName)
|
self.mirrorPods.Delete(podFullName)
|
||||||
@ -51,81 +50,38 @@ func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFakeMirrorMananger() *fakeMirrorManager {
|
func newFakeMirrorClient() *fakeMirrorClient {
|
||||||
m := fakeMirrorManager{}
|
m := fakeMirrorClient{}
|
||||||
m.mirrorPods = util.NewStringSet()
|
m.mirrorPods = util.NewStringSet()
|
||||||
m.createCounts = make(map[string]int)
|
m.createCounts = make(map[string]int)
|
||||||
m.deleteCounts = make(map[string]int)
|
m.deleteCounts = make(map[string]int)
|
||||||
return &m
|
return &m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *fakeMirrorManager) HasPod(podFullName string) bool {
|
func (self *fakeMirrorClient) HasPod(podFullName string) bool {
|
||||||
self.mirrorPodLock.RLock()
|
self.mirrorPodLock.RLock()
|
||||||
defer self.mirrorPodLock.RUnlock()
|
defer self.mirrorPodLock.RUnlock()
|
||||||
return self.mirrorPods.Has(podFullName)
|
return self.mirrorPods.Has(podFullName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *fakeMirrorManager) NumOfPods() int {
|
func (self *fakeMirrorClient) NumOfPods() int {
|
||||||
self.mirrorPodLock.RLock()
|
self.mirrorPodLock.RLock()
|
||||||
defer self.mirrorPodLock.RUnlock()
|
defer self.mirrorPodLock.RUnlock()
|
||||||
return self.mirrorPods.Len()
|
return self.mirrorPods.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *fakeMirrorManager) GetPods() []string {
|
func (self *fakeMirrorClient) GetPods() []string {
|
||||||
self.mirrorPodLock.RLock()
|
self.mirrorPodLock.RLock()
|
||||||
defer self.mirrorPodLock.RUnlock()
|
defer self.mirrorPodLock.RUnlock()
|
||||||
return self.mirrorPods.List()
|
return self.mirrorPods.List()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *fakeMirrorManager) GetCounts(podFullName string) (int, int) {
|
func (self *fakeMirrorClient) GetCounts(podFullName string) (int, int) {
|
||||||
self.mirrorPodLock.RLock()
|
self.mirrorPodLock.RLock()
|
||||||
defer self.mirrorPodLock.RUnlock()
|
defer self.mirrorPodLock.RUnlock()
|
||||||
return self.createCounts[podFullName], self.deleteCounts[podFullName]
|
return self.createCounts[podFullName], self.deleteCounts[podFullName]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that mirror pods are filtered out properly from the pod update.
|
|
||||||
func TestFilterOutMirrorPods(t *testing.T) {
|
|
||||||
mirrorPod := api.Pod{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
UID: "987654321",
|
|
||||||
Name: "bar",
|
|
||||||
Namespace: "default",
|
|
||||||
Annotations: map[string]string{
|
|
||||||
ConfigSourceAnnotationKey: "api",
|
|
||||||
ConfigMirrorAnnotationKey: "mirror",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
staticPod := api.Pod{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
UID: "123456789",
|
|
||||||
Name: "bar",
|
|
||||||
Namespace: "default",
|
|
||||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "file"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedPods := []api.Pod{
|
|
||||||
{
|
|
||||||
ObjectMeta: api.ObjectMeta{
|
|
||||||
UID: "999999999",
|
|
||||||
Name: "taco",
|
|
||||||
Namespace: "default",
|
|
||||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "api"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
staticPod,
|
|
||||||
}
|
|
||||||
updates := append(expectedPods, mirrorPod)
|
|
||||||
actualPods, actualMirrorPods := filterAndCategorizePods(updates)
|
|
||||||
if !reflect.DeepEqual(expectedPods, actualPods) {
|
|
||||||
t.Errorf("expected %#v, got %#v", expectedPods, actualPods)
|
|
||||||
}
|
|
||||||
if _, ok := actualMirrorPods.mirror[GetPodFullName(&mirrorPod)]; !ok {
|
|
||||||
t.Errorf("mirror pod is not recorded")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestParsePodFullName(t *testing.T) {
|
func TestParsePodFullName(t *testing.T) {
|
||||||
type nameTuple struct {
|
type nameTuple struct {
|
||||||
Name string
|
Name string
|
@ -1,199 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2015 Google Inc. All rights reserved.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package kubelet
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
|
||||||
"github.com/golang/glog"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Kubelet discover pod updates from 3 sources: file, http, and apiserver.
|
|
||||||
// Pods from non-apiserver sources are called static pods, and API server is
|
|
||||||
// not aware of the existence of static pods. In order to monitor the status of
|
|
||||||
// such pods, kubelet create a mirror pod for each static pod via the API
|
|
||||||
// server.
|
|
||||||
//
|
|
||||||
// A mirror pod has the same pod full name (name and namespace) as its static
|
|
||||||
// counterpart (albeit different metadata such as UID, etc). By leveraging the
|
|
||||||
// fact that kubelet reports the pod status using the pod full name, the status
|
|
||||||
// of the mirror pod always reflects the acutal status of the static pod.
|
|
||||||
// When a static pod gets deleted, the associated orphaned mirror pods will
|
|
||||||
// also be removed.
|
|
||||||
//
|
|
||||||
// This file includes functions to manage the mirror pods.
|
|
||||||
|
|
||||||
type mirrorManager interface {
|
|
||||||
CreateMirrorPod(api.Pod, string) error
|
|
||||||
DeleteMirrorPod(string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type basicMirrorManager struct {
|
|
||||||
// mirror pods are stored in the kubelet directly because they need to be
|
|
||||||
// in sync with the internal pods.
|
|
||||||
apiserverClient client.Interface
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBasicMirrorManager(apiserverClient client.Interface) *basicMirrorManager {
|
|
||||||
return &basicMirrorManager{apiserverClient: apiserverClient}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Creates a mirror pod.
|
|
||||||
func (self *basicMirrorManager) CreateMirrorPod(pod api.Pod, hostname string) error {
|
|
||||||
if self.apiserverClient == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// Indicate that the pod should be scheduled to the current node.
|
|
||||||
pod.Spec.Host = hostname
|
|
||||||
pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType
|
|
||||||
|
|
||||||
_, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deletes a mirror pod.
|
|
||||||
func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error {
|
|
||||||
if self.apiserverClient == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
name, namespace, err := ParsePodFullName(podFullName)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to parse a pod full name %q", podFullName)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
|
|
||||||
if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil {
|
|
||||||
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper functions.
|
|
||||||
func getPodSource(pod *api.Pod) (string, error) {
|
|
||||||
if pod.Annotations != nil {
|
|
||||||
if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
|
|
||||||
return source, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func isStaticPod(pod *api.Pod) bool {
|
|
||||||
source, err := getPodSource(pod)
|
|
||||||
return err == nil && source != ApiserverSource
|
|
||||||
}
|
|
||||||
|
|
||||||
func isMirrorPod(pod *api.Pod) bool {
|
|
||||||
if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok {
|
|
||||||
return false
|
|
||||||
} else {
|
|
||||||
return value == MirrorType
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function separate the mirror pods from regular pods to
|
|
||||||
// facilitate pods syncing and mirror pod creation/deletion.
|
|
||||||
func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, mirrorPods) {
|
|
||||||
filteredPods := []api.Pod{}
|
|
||||||
mirrorPods := newMirrorPods()
|
|
||||||
|
|
||||||
for _, pod := range pods {
|
|
||||||
mirrorPods.Insert(&pod)
|
|
||||||
if !isMirrorPod(&pod) {
|
|
||||||
filteredPods = append(filteredPods, pod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return filteredPods, *mirrorPods
|
|
||||||
}
|
|
||||||
|
|
||||||
// mirrorPods is thread-compatible.
|
|
||||||
// TODO (yujuhong): Replace this with a pod manager that manages both regular
|
|
||||||
// pods and mirror pods.
|
|
||||||
type mirrorPods struct {
|
|
||||||
// Static pod UIDs indexed by pod full name.
|
|
||||||
static map[string]types.UID
|
|
||||||
// Mirror pod UIDs indexed by pod full name.
|
|
||||||
mirror map[string]types.UID
|
|
||||||
|
|
||||||
// Bi-directional UID mappings.
|
|
||||||
staticToMirror map[types.UID]types.UID
|
|
||||||
mirrorToStatic map[types.UID]types.UID
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMirrorPods() *mirrorPods {
|
|
||||||
mirrorPods := mirrorPods{}
|
|
||||||
mirrorPods.static = make(map[string]types.UID)
|
|
||||||
mirrorPods.mirror = make(map[string]types.UID)
|
|
||||||
mirrorPods.staticToMirror = make(map[types.UID]types.UID)
|
|
||||||
mirrorPods.mirrorToStatic = make(map[types.UID]types.UID)
|
|
||||||
return &mirrorPods
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *mirrorPods) Insert(pod *api.Pod) {
|
|
||||||
podFullName := GetPodFullName(pod)
|
|
||||||
if isMirrorPod(pod) {
|
|
||||||
self.mirror[podFullName] = pod.UID
|
|
||||||
} else if isStaticPod(pod) {
|
|
||||||
self.static[podFullName] = pod.UID
|
|
||||||
}
|
|
||||||
staticUID, found1 := self.static[podFullName]
|
|
||||||
mirrorUID, found2 := self.mirror[podFullName]
|
|
||||||
// Update the UID mappings.
|
|
||||||
if found1 && found2 {
|
|
||||||
self.staticToMirror[staticUID] = mirrorUID
|
|
||||||
self.mirrorToStatic[mirrorUID] = staticUID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *mirrorPods) HasStaticPod(key types.UID) bool {
|
|
||||||
_, ok := self.mirrorToStatic[key]
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *mirrorPods) HasMirrorPod(key types.UID) bool {
|
|
||||||
_, ok := self.staticToMirror[key]
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *mirrorPods) GetMirrorUID(key types.UID) (types.UID, bool) {
|
|
||||||
value, ok := self.staticToMirror[key]
|
|
||||||
if !ok {
|
|
||||||
return "", false
|
|
||||||
}
|
|
||||||
return value, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *mirrorPods) GetStaticUID(key types.UID) (types.UID, bool) {
|
|
||||||
value, ok := self.mirrorToStatic[key]
|
|
||||||
if !ok {
|
|
||||||
return "", false
|
|
||||||
}
|
|
||||||
return value, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *mirrorPods) GetOrphanedMirrorPodNames() []string {
|
|
||||||
orphanedPodNames := []string{}
|
|
||||||
for podFullName := range self.mirror {
|
|
||||||
if _, ok := self.static[podFullName]; !ok {
|
|
||||||
orphanedPodNames = append(orphanedPodNames, podFullName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return orphanedPodNames
|
|
||||||
}
|
|
@ -26,68 +26,82 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Pod manager stores and manages access to the pods.
|
||||||
|
//
|
||||||
|
// Kubelet discovers pod updates from 3 sources: file, http, and apiserver.
|
||||||
|
// Pods from non-apiserver sources are called static pods, and API server is
|
||||||
|
// not aware of the existence of static pods. In order to monitor the status of
|
||||||
|
// such pods, kubelet creates a mirror pod for each static pod via the API
|
||||||
|
// server.
|
||||||
|
//
|
||||||
|
// A mirror pod has the same pod full name (name and namespace) as its static
|
||||||
|
// counterpart (albeit different metadata such as UID, etc). By leveraging the
|
||||||
|
// fact that kubelet reports the pod status using the pod full name, the status
|
||||||
|
// of the mirror pod always reflects the actual status of the static pod.
|
||||||
|
// When a static pod gets deleted, the associated orphaned mirror pod will
|
||||||
|
// also be removed.
|
||||||
|
|
||||||
type podManager interface {
|
type podManager interface {
|
||||||
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType)
|
GetPods() []api.Pod
|
||||||
GetPods() ([]api.Pod, mirrorPods)
|
|
||||||
GetPodByName(namespace, name string) (*api.Pod, bool)
|
|
||||||
GetPodByFullName(podFullName string) (*api.Pod, bool)
|
GetPodByFullName(podFullName string) (*api.Pod, bool)
|
||||||
TranslatePodUID(uid types.UID) types.UID
|
GetPodByName(namespace, name string) (*api.Pod, bool)
|
||||||
DeleteOrphanedMirrorPods(mirrorPods *mirrorPods)
|
GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.Pod)
|
||||||
SetPods(pods []api.Pod)
|
SetPods(pods []api.Pod)
|
||||||
mirrorManager
|
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType)
|
||||||
|
DeleteOrphanedMirrorPods()
|
||||||
|
TranslatePodUID(uid types.UID) types.UID
|
||||||
|
mirrorClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// All maps in basicPodManager should be set by calling UpdatePods();
|
||||||
|
// individual arrays/maps are not immutable and no other methods should attempt
|
||||||
|
// to modify them.
|
||||||
type basicPodManager struct {
|
type basicPodManager struct {
|
||||||
// Protects all internal pod storage/mappings.
|
// Protects all internal maps.
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
pods []api.Pod
|
|
||||||
// Record the set of mirror pods (see mirror_manager.go for more details);
|
|
||||||
// similar to pods, this is not immutable and is protected by the same podLock.
|
|
||||||
// Note that basicPodManager.pods do not contain mirror pods as they are
|
|
||||||
// filtered out beforehand.
|
|
||||||
mirrorPods mirrorPods
|
|
||||||
|
|
||||||
// A mirror pod manager which provides helper functions.
|
// Regular pods indexed by UID.
|
||||||
mirrorManager mirrorManager
|
podByUID map[types.UID]*api.Pod
|
||||||
|
// Mirror pods indexed by UID.
|
||||||
|
mirrorPodByUID map[types.UID]*api.Pod
|
||||||
|
|
||||||
|
// Pods indexed by full name for easy access.
|
||||||
|
podByFullName map[string]*api.Pod
|
||||||
|
mirrorPodByFullName map[string]*api.Pod
|
||||||
|
|
||||||
|
// A mirror pod client to create/delete mirror pods.
|
||||||
|
mirrorClient mirrorClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
|
func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
|
||||||
podManager := &basicPodManager{}
|
pm := &basicPodManager{}
|
||||||
podManager.mirrorManager = newBasicMirrorManager(apiserverClient)
|
pm.mirrorClient = newBasicMirrorClient(apiserverClient)
|
||||||
podManager.mirrorPods = *newMirrorPods()
|
pm.SetPods([]api.Pod{})
|
||||||
podManager.pods = []api.Pod{}
|
return pm
|
||||||
return podManager
|
|
||||||
}
|
|
||||||
|
|
||||||
// This method is used only for testing to quickly set the internal pods.
|
|
||||||
func (self *basicPodManager) SetPods(pods []api.Pod) {
|
|
||||||
self.pods, self.mirrorPods = filterAndCategorizePods(pods)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the internal pods with those provided by the update.
|
// Update the internal pods with those provided by the update.
|
||||||
// Records new and updated pods in newPods and updatedPods.
|
|
||||||
func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
defer self.lock.Unlock()
|
defer self.lock.Unlock()
|
||||||
switch u.Op {
|
switch u.Op {
|
||||||
case SET:
|
case SET:
|
||||||
glog.V(3).Infof("SET: Containers changed")
|
glog.V(3).Infof("SET: Containers changed")
|
||||||
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
|
|
||||||
|
|
||||||
// Store the new pods. Don't worry about filtering host ports since those
|
// Store the new pods. Don't worry about filtering host ports since those
|
||||||
// pods will never be looked up.
|
// pods will never be looked up.
|
||||||
existingPods := make(map[types.UID]struct{})
|
existingPods := make(map[types.UID]struct{})
|
||||||
for i := range self.pods {
|
for uid := range self.podByUID {
|
||||||
existingPods[self.pods[i].UID] = struct{}{}
|
existingPods[uid] = struct{}{}
|
||||||
}
|
}
|
||||||
for _, pod := range newPods {
|
|
||||||
if _, ok := existingPods[pod.UID]; !ok {
|
// Update the internal pods.
|
||||||
podSyncTypes[pod.UID] = metrics.SyncPodCreate
|
self.setPods(u.Pods)
|
||||||
|
|
||||||
|
for uid := range self.podByUID {
|
||||||
|
if _, ok := existingPods[uid]; !ok {
|
||||||
|
podSyncTypes[uid] = metrics.SyncPodCreate
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Actually update the pods.
|
|
||||||
self.pods = newPods
|
|
||||||
self.mirrorPods = newMirrorPods
|
|
||||||
case UPDATE:
|
case UPDATE:
|
||||||
glog.V(3).Infof("Update: Containers changed")
|
glog.V(3).Infof("Update: Containers changed")
|
||||||
|
|
||||||
@ -96,21 +110,52 @@ func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]
|
|||||||
for i := range u.Pods {
|
for i := range u.Pods {
|
||||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
|
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
|
||||||
}
|
}
|
||||||
allPods := updatePods(u.Pods, self.pods)
|
allPods := applyUpdates(u.Pods, self.getPods())
|
||||||
self.pods, self.mirrorPods = filterAndCategorizePods(allPods)
|
self.setPods(allPods)
|
||||||
default:
|
default:
|
||||||
panic("syncLoop does not support incremental changes")
|
panic("syncLoop does not support incremental changes")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark all remaining pods as sync.
|
// Mark all remaining pods as sync.
|
||||||
for i := range self.pods {
|
for uid := range self.podByUID {
|
||||||
if _, ok := podSyncTypes[self.pods[i].UID]; !ok {
|
if _, ok := podSyncTypes[uid]; !ok {
|
||||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
|
podSyncTypes[uid] = metrics.SyncPodSync
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
|
// Set the internal pods based on the new pods.
|
||||||
|
func (self *basicPodManager) SetPods(newPods []api.Pod) {
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
|
self.setPods(newPods)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *basicPodManager) setPods(newPods []api.Pod) {
|
||||||
|
podByUID := make(map[types.UID]*api.Pod)
|
||||||
|
mirrorPodByUID := make(map[types.UID]*api.Pod)
|
||||||
|
podByFullName := make(map[string]*api.Pod)
|
||||||
|
mirrorPodByFullName := make(map[string]*api.Pod)
|
||||||
|
|
||||||
|
for i := range newPods {
|
||||||
|
pod := newPods[i]
|
||||||
|
podFullName := GetPodFullName(&pod)
|
||||||
|
if isMirrorPod(&pod) {
|
||||||
|
mirrorPodByUID[pod.UID] = &pod
|
||||||
|
mirrorPodByFullName[podFullName] = &pod
|
||||||
|
} else {
|
||||||
|
podByUID[pod.UID] = &pod
|
||||||
|
podByFullName[podFullName] = &pod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.podByUID = podByUID
|
||||||
|
self.podByFullName = podByFullName
|
||||||
|
self.mirrorPodByUID = mirrorPodByUID
|
||||||
|
self.mirrorPodByFullName = mirrorPodByFullName
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyUpdates(changed []api.Pod, current []api.Pod) []api.Pod {
|
||||||
updated := []api.Pod{}
|
updated := []api.Pod{}
|
||||||
m := map[types.UID]*api.Pod{}
|
m := map[types.UID]*api.Pod{}
|
||||||
for i := range changed {
|
for i := range changed {
|
||||||
@ -132,34 +177,49 @@ func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
|
|||||||
return updated
|
return updated
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
func (self *basicPodManager) getPods() []api.Pod {
|
||||||
// pod map.
|
pods := make([]api.Pod, 0, len(self.podByUID))
|
||||||
func (self *basicPodManager) GetPods() ([]api.Pod, mirrorPods) {
|
for _, pod := range self.podByUID {
|
||||||
self.lock.RLock()
|
pods = append(pods, *pod)
|
||||||
defer self.lock.RUnlock()
|
}
|
||||||
return append([]api.Pod{}, self.pods...), self.mirrorPods
|
return pods
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPodByName provides the first pod that matches namespace and name, as well
|
// GetPods returns the regular pods bound to the kubelet and their spec.
|
||||||
// as whether the pod was found.
|
func (self *basicPodManager) GetPods() []api.Pod {
|
||||||
|
self.lock.RLock()
|
||||||
|
defer self.lock.RUnlock()
|
||||||
|
return self.getPods()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror
|
||||||
|
// pod map indexed by full name for existence check.
|
||||||
|
func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.Pod) {
|
||||||
|
self.lock.RLock()
|
||||||
|
defer self.lock.RUnlock()
|
||||||
|
mirrorPodByFullName := make(map[string]*api.Pod)
|
||||||
|
for key, value := range self.mirrorPodByFullName {
|
||||||
|
mirrorPodByFullName[key] = value
|
||||||
|
}
|
||||||
|
return self.getPods(), mirrorPodByFullName
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
|
||||||
|
// as well as whether the pod was found.
|
||||||
func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
|
func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
|
||||||
self.lock.RLock()
|
podFullName := BuildPodFullName(name, namespace)
|
||||||
defer self.lock.RUnlock()
|
return self.GetPodByFullName(podFullName)
|
||||||
for i := range self.pods {
|
|
||||||
pod := self.pods[i]
|
|
||||||
if pod.Namespace == namespace && pod.Name == name {
|
|
||||||
return &pod, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPodByName returns the (non-mirror) pod that matches full name, as well as
|
||||||
|
// whether the pod was found.
|
||||||
func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
|
func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
|
||||||
name, namespace, err := ParsePodFullName(podFullName)
|
self.lock.RLock()
|
||||||
if err != nil {
|
defer self.lock.RUnlock()
|
||||||
return nil, false
|
if pod, ok := self.podByFullName[podFullName]; ok {
|
||||||
|
return pod, true
|
||||||
}
|
}
|
||||||
return self.GetPodByName(namespace, name)
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
|
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
|
||||||
@ -173,27 +233,40 @@ func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID {
|
|||||||
|
|
||||||
self.lock.RLock()
|
self.lock.RLock()
|
||||||
defer self.lock.RUnlock()
|
defer self.lock.RUnlock()
|
||||||
staticUID, ok := self.mirrorPods.GetStaticUID(uid)
|
if mirrorPod, ok := self.mirrorPodByUID[uid]; ok {
|
||||||
if ok {
|
podFullName := GetPodFullName(mirrorPod)
|
||||||
return staticUID
|
if pod, ok := self.podByFullName[podFullName]; ok {
|
||||||
} else {
|
return pod.UID
|
||||||
|
}
|
||||||
|
}
|
||||||
return uid
|
return uid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *basicPodManager) getFullNameMaps() (map[string]*api.Pod, map[string]*api.Pod) {
|
||||||
|
self.lock.RLock()
|
||||||
|
defer self.lock.RUnlock()
|
||||||
|
return self.podByFullName, self.mirrorPodByFullName
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete all mirror pods which do not have associated static pods. This method
|
||||||
|
// sends deletion requets to the API server, but does NOT modify the internal
|
||||||
|
// pod storage in basicPodManager.
|
||||||
|
func (self *basicPodManager) DeleteOrphanedMirrorPods() {
|
||||||
|
podByFullName, mirrorPodByFullName := self.getFullNameMaps()
|
||||||
|
|
||||||
|
for podFullName := range mirrorPodByFullName {
|
||||||
|
if _, ok := podByFullName[podFullName]; !ok {
|
||||||
|
self.mirrorClient.DeleteMirrorPod(podFullName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all orphaned mirror pods. This method doesn't acquire the lock
|
// Creates a mirror pod for the given pod.
|
||||||
// because it assumes the a copy of the mirrorPod is passed as an argument.
|
|
||||||
func (self *basicPodManager) DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) {
|
|
||||||
podFullNames := mirrorPods.GetOrphanedMirrorPodNames()
|
|
||||||
for _, podFullName := range podFullNames {
|
|
||||||
self.mirrorManager.DeleteMirrorPod(podFullName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error {
|
func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error {
|
||||||
return self.mirrorManager.CreateMirrorPod(pod, hostname)
|
return self.mirrorClient.CreateMirrorPod(pod, hostname)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete a mirror pod by name.
|
||||||
func (self *basicPodManager) DeleteMirrorPod(podFullName string) error {
|
func (self *basicPodManager) DeleteMirrorPod(podFullName string) error {
|
||||||
return self.mirrorManager.DeleteMirrorPod(podFullName)
|
return self.mirrorClient.DeleteMirrorPod(podFullName)
|
||||||
}
|
}
|
||||||
|
@ -16,10 +16,84 @@ limitations under the License.
|
|||||||
|
|
||||||
package kubelet
|
package kubelet
|
||||||
|
|
||||||
// Stub out mirror manager for testing purpose.
|
import (
|
||||||
func newFakePodManager() (*basicPodManager, *fakeMirrorManager) {
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Stub out mirror client for testing purpose.
|
||||||
|
func newFakePodManager() (*basicPodManager, *fakeMirrorClient) {
|
||||||
podManager := newBasicPodManager(nil)
|
podManager := newBasicPodManager(nil)
|
||||||
fakeMirrorManager := newFakeMirrorMananger()
|
fakeMirrorClient := newFakeMirrorClient()
|
||||||
podManager.mirrorManager = fakeMirrorManager
|
podManager.mirrorClient = fakeMirrorClient
|
||||||
return podManager, fakeMirrorManager
|
return podManager, fakeMirrorClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that pods/maps are properly set after the pod update, and the basic
|
||||||
|
// methods work correctly.
|
||||||
|
func TestGetSetPods(t *testing.T) {
|
||||||
|
mirrorPod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "987654321",
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
Annotations: map[string]string{
|
||||||
|
ConfigSourceAnnotationKey: "api",
|
||||||
|
ConfigMirrorAnnotationKey: "mirror",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
staticPod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "123456789",
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
Annotations: map[string]string{ConfigSourceAnnotationKey: "file"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedPods := []api.Pod{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "999999999",
|
||||||
|
Name: "taco",
|
||||||
|
Namespace: "default",
|
||||||
|
Annotations: map[string]string{ConfigSourceAnnotationKey: "api"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
staticPod,
|
||||||
|
}
|
||||||
|
updates := append(expectedPods, mirrorPod)
|
||||||
|
podManager, _ := newFakePodManager()
|
||||||
|
podManager.SetPods(updates)
|
||||||
|
actualPods := podManager.GetPods()
|
||||||
|
if !reflect.DeepEqual(expectedPods, actualPods) {
|
||||||
|
t.Errorf("pods are not set correctly; expected %#v, got %#v", expectedPods, actualPods)
|
||||||
|
}
|
||||||
|
actualPod, ok := podManager.mirrorPodByUID[mirrorPod.UID]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("mirror pod %q is not found in the mirror pod map by UID", mirrorPod.UID)
|
||||||
|
} else if !reflect.DeepEqual(&mirrorPod, actualPod) {
|
||||||
|
t.Errorf("mirror pod is recorded incorrectly. expect: %v, got: %v", mirrorPod, actualPod)
|
||||||
|
}
|
||||||
|
actualPod, ok = podManager.mirrorPodByFullName[GetPodFullName(&mirrorPod)]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("mirror pod %q is not found in the mirror pod map by full name", GetPodFullName(&mirrorPod))
|
||||||
|
} else if !reflect.DeepEqual(&mirrorPod, actualPod) {
|
||||||
|
t.Errorf("mirror pod is recorded incorrectly. expect: %v, got: %v", mirrorPod, actualPod)
|
||||||
|
}
|
||||||
|
if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != staticPod.UID {
|
||||||
|
t.Errorf("unable to translate UID %q to the static POD's UID %q; %#v", mirrorPod.UID, staticPod.UID, podManager.mirrorPodByUID)
|
||||||
|
}
|
||||||
|
actualPod, ok = podManager.GetPodByFullName("bar_default")
|
||||||
|
if !ok || !reflect.DeepEqual(actualPod, &staticPod) {
|
||||||
|
t.Errorf("unable to get pod by full name; expected: %#v, got: %#v", staticPod, actualPod)
|
||||||
|
}
|
||||||
|
actualPod, ok = podManager.GetPodByName("default", "bar")
|
||||||
|
if !ok || !reflect.DeepEqual(actualPod, &staticPod) {
|
||||||
|
t.Errorf("unable to get pod by name; expected: %#v, got: %#v", staticPod, actualPod)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ type HostInterface interface {
|
|||||||
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||||
GetDockerVersion() ([]uint, error)
|
GetDockerVersion() ([]uint, error)
|
||||||
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
|
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
|
||||||
GetPods() ([]api.Pod, mirrorPods)
|
GetPods() []api.Pod
|
||||||
GetPodByName(namespace, name string) (*api.Pod, bool)
|
GetPodByName(namespace, name string) (*api.Pod, bool)
|
||||||
GetPodStatus(name string) (api.PodStatus, error)
|
GetPodStatus(name string) (api.PodStatus, error)
|
||||||
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
|
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
|
||||||
@ -260,7 +260,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
|||||||
|
|
||||||
// handlePods returns a list of pod bound to the Kubelet and their spec
|
// handlePods returns a list of pod bound to the Kubelet and their spec
|
||||||
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
|
||||||
pods, _ := s.host.GetPods()
|
pods := s.host.GetPods()
|
||||||
podList := &api.PodList{
|
podList := &api.PodList{
|
||||||
Items: pods,
|
Items: pods,
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,7 @@ type fakeKubelet struct {
|
|||||||
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||||
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||||
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
|
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
|
||||||
podsFunc func() ([]api.Pod, mirrorPods)
|
podsFunc func() []api.Pod
|
||||||
logFunc func(w http.ResponseWriter, req *http.Request)
|
logFunc func(w http.ResponseWriter, req *http.Request)
|
||||||
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
|
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
|
||||||
dockerVersionFunc func() ([]uint, error)
|
dockerVersionFunc func() ([]uint, error)
|
||||||
@ -79,7 +79,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
|
|||||||
return fk.machineInfoFunc()
|
return fk.machineInfoFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fk *fakeKubelet) GetPods() ([]api.Pod, mirrorPods) {
|
func (fk *fakeKubelet) GetPods() []api.Pod {
|
||||||
return fk.podsFunc()
|
return fk.podsFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user