mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-13 21:25:09 +00:00
Merge pull request #5748 from yujuhong/refactor
Kubelet: add podManager for managing internal pod storage
This commit is contained in:
@@ -229,6 +229,8 @@ func NewMainKubelet(
|
||||
imageManager: imageManager,
|
||||
}
|
||||
|
||||
klet.podManager = newBasicPodManager(klet.kubeClient)
|
||||
|
||||
dockerCache, err := dockertools.NewDockerCache(dockerClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -253,8 +255,6 @@ func NewMainKubelet(
|
||||
|
||||
klet.podStatuses = make(map[string]api.PodStatus)
|
||||
|
||||
klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)
|
||||
|
||||
return klet, nil
|
||||
}
|
||||
|
||||
@@ -285,20 +285,10 @@ type Kubelet struct {
|
||||
podStatusUpdateFrequency time.Duration
|
||||
sourcesReady SourcesReadyFn
|
||||
|
||||
// Protects the pods array
|
||||
// We make complete array copies out of this while locked, which is OK because once added to this array,
|
||||
// pods are immutable
|
||||
podLock sync.RWMutex
|
||||
pods []api.Pod
|
||||
// Record the set of mirror pods (see mirror_manager.go for more details);
|
||||
// similar to pods, this is not immutable and is protected by the same podLock.
|
||||
// Note that Kubelet.pods do not contain mirror pods as they are filtered
|
||||
// out beforehand.
|
||||
mirrorPods mirrorPods
|
||||
|
||||
podManager podManager
|
||||
// A pod status cache stores statuses for pods (both rejected and synced).
|
||||
// Note that currently no thread attempts to acquire podStatusesLock while
|
||||
// holding podLock, and vice versa. If you intend to change this usage
|
||||
// accessing podManager, and vice versa. If you intend to change this usage
|
||||
// pattern, please explicitly impose an acquiring order to avoid deadlocks
|
||||
// and document such an order in the comment.
|
||||
podStatusesLock sync.RWMutex
|
||||
@@ -353,9 +343,6 @@ type Kubelet struct {
|
||||
// the EventRecorder to use
|
||||
recorder record.EventRecorder
|
||||
|
||||
// A mirror pod manager which provides helper functions.
|
||||
mirrorManager mirrorManager
|
||||
|
||||
// Policy for handling garbage collection of dead containers.
|
||||
containerGC containerGC
|
||||
|
||||
@@ -1445,7 +1432,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
|
||||
|
||||
if !hasMirrorPod && isStaticPod(pod) {
|
||||
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
|
||||
if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
|
||||
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
|
||||
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
|
||||
}
|
||||
}
|
||||
@@ -1572,7 +1559,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
||||
}
|
||||
|
||||
// Run the sync in an async manifest worker.
|
||||
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
|
||||
kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() {
|
||||
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
|
||||
})
|
||||
|
||||
@@ -1641,33 +1628,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
||||
}
|
||||
|
||||
// Remove any orphaned mirror pods.
|
||||
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)
|
||||
kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
|
||||
updated := []api.Pod{}
|
||||
m := map[types.UID]*api.Pod{}
|
||||
for i := range changed {
|
||||
pod := &changed[i]
|
||||
m[pod.UID] = pod
|
||||
}
|
||||
|
||||
for i := range current {
|
||||
pod := ¤t[i]
|
||||
if m[pod.UID] != nil {
|
||||
updated = append(updated, *m[pod.UID])
|
||||
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
|
||||
} else {
|
||||
updated = append(updated, *pod)
|
||||
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
|
||||
}
|
||||
}
|
||||
|
||||
return updated
|
||||
}
|
||||
|
||||
type podsByCreationTime []api.Pod
|
||||
|
||||
func (s podsByCreationTime) Len() int {
|
||||
@@ -1771,7 +1736,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
|
||||
select {
|
||||
case u := <-updates:
|
||||
kl.updatePods(u, podSyncTypes)
|
||||
kl.podManager.UpdatePods(u, podSyncTypes)
|
||||
unsyncedPod = true
|
||||
case <-time.After(kl.resyncInterval):
|
||||
glog.V(4).Infof("Periodic sync")
|
||||
@@ -1782,7 +1747,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
for unsyncedPod {
|
||||
select {
|
||||
case u := <-updates:
|
||||
kl.updatePods(u, podSyncTypes)
|
||||
kl.podManager.UpdatePods(u, podSyncTypes)
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
// Break the for loop.
|
||||
unsyncedPod = false
|
||||
@@ -1830,52 +1795,6 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
|
||||
t.Stop()
|
||||
}
|
||||
|
||||
// Update the Kubelet's internal pods with those provided by the update.
|
||||
// Records new and updated pods in newPods and updatedPods.
|
||||
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
||||
kl.podLock.Lock()
|
||||
defer kl.podLock.Unlock()
|
||||
switch u.Op {
|
||||
case SET:
|
||||
glog.V(3).Infof("SET: Containers changed")
|
||||
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
|
||||
|
||||
// Store the new pods. Don't worry about filtering host ports since those
|
||||
// pods will never be looked up.
|
||||
existingPods := make(map[types.UID]struct{})
|
||||
for i := range kl.pods {
|
||||
existingPods[kl.pods[i].UID] = struct{}{}
|
||||
}
|
||||
for _, pod := range newPods {
|
||||
if _, ok := existingPods[pod.UID]; !ok {
|
||||
podSyncTypes[pod.UID] = metrics.SyncPodCreate
|
||||
}
|
||||
}
|
||||
// Actually update the pods.
|
||||
kl.pods = newPods
|
||||
kl.mirrorPods = newMirrorPods
|
||||
case UPDATE:
|
||||
glog.V(3).Infof("Update: Containers changed")
|
||||
|
||||
// Store the updated pods. Don't worry about filtering host ports since those
|
||||
// pods will never be looked up.
|
||||
for i := range u.Pods {
|
||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
|
||||
}
|
||||
allPods := updatePods(u.Pods, kl.pods)
|
||||
kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods)
|
||||
default:
|
||||
panic("syncLoop does not support incremental changes")
|
||||
}
|
||||
|
||||
// Mark all remaining pods as sync.
|
||||
for i := range kl.pods {
|
||||
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
|
||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns Docker version for this Kubelet.
|
||||
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
|
||||
if kl.dockerClient == nil {
|
||||
@@ -1937,31 +1856,17 @@ func (kl *Kubelet) GetHostname() string {
|
||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||
// pod map.
|
||||
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
|
||||
kl.podLock.RLock()
|
||||
defer kl.podLock.RUnlock()
|
||||
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
|
||||
return kl.podManager.GetPods()
|
||||
}
|
||||
|
||||
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
|
||||
name, namespace, err := ParsePodFullName(podFullName)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
return kl.GetPodByName(namespace, name)
|
||||
return kl.podManager.GetPodByFullName(podFullName)
|
||||
}
|
||||
|
||||
// GetPodByName provides the first pod that matches namespace and name, as well
|
||||
// as whether the pod was found.
|
||||
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
|
||||
kl.podLock.RLock()
|
||||
defer kl.podLock.RUnlock()
|
||||
for i := range kl.pods {
|
||||
pod := kl.pods[i]
|
||||
if pod.Namespace == namespace && pod.Name == name {
|
||||
return &pod, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
return kl.podManager.GetPodByName(namespace, name)
|
||||
}
|
||||
|
||||
// updateNodeStatus updates node status to master with retries.
|
||||
@@ -2108,7 +2013,7 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
|
||||
|
||||
// GetPodStatus returns information from Docker about the containers in a pod
|
||||
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
|
||||
uid = kl.translatePodUID(uid)
|
||||
uid = kl.podManager.TranslatePodUID(uid)
|
||||
|
||||
// Check to see if we have a cached version of the status.
|
||||
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
|
||||
@@ -2172,7 +2077,7 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
|
||||
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
|
||||
uid = kl.translatePodUID(uid)
|
||||
uid = kl.podManager.TranslatePodUID(uid)
|
||||
|
||||
if kl.runner == nil {
|
||||
return nil, fmt.Errorf("no runner specified.")
|
||||
@@ -2191,7 +2096,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
|
||||
// ExecInContainer executes a command in a container, connecting the supplied
|
||||
// stdin/stdout/stderr to the command's IO streams.
|
||||
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
|
||||
uid = kl.translatePodUID(uid)
|
||||
uid = kl.podManager.TranslatePodUID(uid)
|
||||
|
||||
if kl.runner == nil {
|
||||
return fmt.Errorf("no runner specified.")
|
||||
@@ -2210,7 +2115,7 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
|
||||
// PortForward connects to the pod's port and copies data between the port
|
||||
// and the stream.
|
||||
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
|
||||
uid = kl.translatePodUID(uid)
|
||||
uid = kl.podManager.TranslatePodUID(uid)
|
||||
|
||||
if kl.runner == nil {
|
||||
return fmt.Errorf("no runner specified.")
|
||||
@@ -2244,29 +2149,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
|
||||
return kl.streamingConnectionIdleTimeout
|
||||
}
|
||||
|
||||
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
|
||||
// Otherwise, return the original UID. All public-facing functions should
|
||||
// perform this translation for UIDs because user may provide a mirror pod UID,
|
||||
// which is not recognized by internal Kubelet functions.
|
||||
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
|
||||
if uid == "" {
|
||||
return uid
|
||||
}
|
||||
|
||||
kl.podLock.RLock()
|
||||
defer kl.podLock.RUnlock()
|
||||
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
|
||||
if ok {
|
||||
return staticUID
|
||||
} else {
|
||||
return uid
|
||||
}
|
||||
}
|
||||
|
||||
// GetContainerInfo returns stats (from Cadvisor) for a container.
|
||||
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
|
||||
|
||||
uid = kl.translatePodUID(uid)
|
||||
uid = kl.podManager.TranslatePodUID(uid)
|
||||
|
||||
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
|
||||
if err != nil {
|
||||
|
@@ -105,9 +105,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
||||
}
|
||||
mockCadvisor := &cadvisor.Mock{}
|
||||
kubelet.cadvisor = mockCadvisor
|
||||
mirrorManager := newFakeMirrorMananger()
|
||||
kubelet.mirrorManager = mirrorManager
|
||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, mirrorManager}
|
||||
podManager, fakeMirrorManager := newFakePodManager()
|
||||
kubelet.podManager = podManager
|
||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorManager}
|
||||
}
|
||||
|
||||
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
|
||||
@@ -434,7 +434,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
|
||||
ID: "9876",
|
||||
},
|
||||
}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -448,8 +448,9 @@ func TestSyncPodsDoesNothing(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -468,7 +469,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
||||
TerminationMessagePath: "/dev/somepath",
|
||||
}
|
||||
fakeDocker.ContainerList = []docker.APIContainers{}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -482,8 +483,9 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -518,7 +520,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
||||
waitGroup := testKubelet.waitGroup
|
||||
kubelet.podInfraContainerImage = "custom_image_name"
|
||||
fakeDocker.ContainerList = []docker.APIContainers{}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -532,8 +534,9 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -572,7 +575,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
||||
puller.HasImages = []string{}
|
||||
kubelet.podInfraContainerImage = "custom_image_name"
|
||||
fakeDocker.ContainerList = []docker.APIContainers{}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -587,7 +590,8 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -623,7 +627,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
||||
ID: "9876",
|
||||
},
|
||||
}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -638,7 +642,8 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -670,7 +675,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
||||
ID: "9876",
|
||||
},
|
||||
}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -696,7 +701,8 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -739,7 +745,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
||||
ID: "8765",
|
||||
},
|
||||
}
|
||||
kubelet.pods = []api.Pod{
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
@@ -766,7 +772,8 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(2)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -907,7 +914,8 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
@@ -948,7 +956,8 @@ func TestSyncPodBadHash(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
@@ -1002,7 +1011,8 @@ func TestSyncPodUnhealthy(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
@@ -1692,7 +1702,8 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
@@ -2868,7 +2879,7 @@ func TestHandlePortConflicts(t *testing.T) {
|
||||
}
|
||||
|
||||
// Check if we can retrieve the pod status from GetPodStatus().
|
||||
kl.pods = pods
|
||||
kl.podManager.SetPods(pods)
|
||||
status, err := kl.GetPodStatus(conflictedPodName, "")
|
||||
if err != nil {
|
||||
t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err)
|
||||
@@ -2921,7 +2932,7 @@ func TestHandleNodeSelector(t *testing.T) {
|
||||
}
|
||||
|
||||
// Check if we can retrieve the pod status from GetPodStatus().
|
||||
kl.pods = pods
|
||||
kl.podManager.SetPods(pods)
|
||||
status, err := kl.GetPodStatus(notfittingPodName, "")
|
||||
if err != nil {
|
||||
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
|
||||
@@ -2980,7 +2991,7 @@ func TestHandleMemExceeded(t *testing.T) {
|
||||
}
|
||||
|
||||
// Check if we can retrieve the pod status from GetPodStatus().
|
||||
kl.pods = pods
|
||||
kl.podManager.SetPods(pods)
|
||||
status, err := kl.GetPodStatus(notfittingPodName, "")
|
||||
if err != nil {
|
||||
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
|
||||
@@ -3232,7 +3243,8 @@ func TestCreateMirrorPod(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
kl.pods = append(kl.pods, pod)
|
||||
pods := []api.Pod{pod}
|
||||
kl.podManager.SetPods(pods)
|
||||
hasMirrorPod := false
|
||||
err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{})
|
||||
if err != nil {
|
||||
@@ -3357,7 +3369,7 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
|
||||
kubelet.podManager.SetPods(pods)
|
||||
// Use the mirror pod UID to retrieve the stats.
|
||||
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
|
||||
if err != nil {
|
||||
|
@@ -85,14 +85,6 @@ func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete all orphaned mirror pods.
|
||||
func deleteOrphanedMirrorPods(mirrorPods mirrorPods, manager mirrorManager) {
|
||||
podFullNames := mirrorPods.GetOrphanedMirrorPodNames()
|
||||
for _, podFullName := range podFullNames {
|
||||
manager.DeleteMirrorPod(podFullName)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper functions.
|
||||
func getPodSource(pod *api.Pod) (string, error) {
|
||||
if pod.Annotations != nil {
|
||||
|
199
pkg/kubelet/pod_manager.go
Normal file
199
pkg/kubelet/pod_manager.go
Normal file
@@ -0,0 +1,199 @@
|
||||
/*
|
||||
Copyright 2015 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type podManager interface {
|
||||
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType)
|
||||
GetPods() ([]api.Pod, mirrorPods)
|
||||
GetPodByName(namespace, name string) (*api.Pod, bool)
|
||||
GetPodByFullName(podFullName string) (*api.Pod, bool)
|
||||
TranslatePodUID(uid types.UID) types.UID
|
||||
DeleteOrphanedMirrorPods(mirrorPods *mirrorPods)
|
||||
SetPods(pods []api.Pod)
|
||||
mirrorManager
|
||||
}
|
||||
|
||||
type basicPodManager struct {
|
||||
// Protects all internal pod storage/mappings.
|
||||
lock sync.RWMutex
|
||||
pods []api.Pod
|
||||
// Record the set of mirror pods (see mirror_manager.go for more details);
|
||||
// similar to pods, this is not immutable and is protected by the same podLock.
|
||||
// Note that basicPodManager.pods do not contain mirror pods as they are
|
||||
// filtered out beforehand.
|
||||
mirrorPods mirrorPods
|
||||
|
||||
// A mirror pod manager which provides helper functions.
|
||||
mirrorManager mirrorManager
|
||||
}
|
||||
|
||||
func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
|
||||
podManager := &basicPodManager{}
|
||||
podManager.mirrorManager = newBasicMirrorManager(apiserverClient)
|
||||
podManager.mirrorPods = *newMirrorPods()
|
||||
podManager.pods = []api.Pod{}
|
||||
return podManager
|
||||
}
|
||||
|
||||
// This method is used only for testing to quickly set the internal pods.
|
||||
func (self *basicPodManager) SetPods(pods []api.Pod) {
|
||||
self.pods, self.mirrorPods = filterAndCategorizePods(pods)
|
||||
}
|
||||
|
||||
// Update the internal pods with those provided by the update.
|
||||
// Records new and updated pods in newPods and updatedPods.
|
||||
func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
switch u.Op {
|
||||
case SET:
|
||||
glog.V(3).Infof("SET: Containers changed")
|
||||
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
|
||||
|
||||
// Store the new pods. Don't worry about filtering host ports since those
|
||||
// pods will never be looked up.
|
||||
existingPods := make(map[types.UID]struct{})
|
||||
for i := range self.pods {
|
||||
existingPods[self.pods[i].UID] = struct{}{}
|
||||
}
|
||||
for _, pod := range newPods {
|
||||
if _, ok := existingPods[pod.UID]; !ok {
|
||||
podSyncTypes[pod.UID] = metrics.SyncPodCreate
|
||||
}
|
||||
}
|
||||
// Actually update the pods.
|
||||
self.pods = newPods
|
||||
self.mirrorPods = newMirrorPods
|
||||
case UPDATE:
|
||||
glog.V(3).Infof("Update: Containers changed")
|
||||
|
||||
// Store the updated pods. Don't worry about filtering host ports since those
|
||||
// pods will never be looked up.
|
||||
for i := range u.Pods {
|
||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
|
||||
}
|
||||
allPods := updatePods(u.Pods, self.pods)
|
||||
self.pods, self.mirrorPods = filterAndCategorizePods(allPods)
|
||||
default:
|
||||
panic("syncLoop does not support incremental changes")
|
||||
}
|
||||
|
||||
// Mark all remaining pods as sync.
|
||||
for i := range self.pods {
|
||||
if _, ok := podSyncTypes[self.pods[i].UID]; !ok {
|
||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
|
||||
updated := []api.Pod{}
|
||||
m := map[types.UID]*api.Pod{}
|
||||
for i := range changed {
|
||||
pod := &changed[i]
|
||||
m[pod.UID] = pod
|
||||
}
|
||||
|
||||
for i := range current {
|
||||
pod := ¤t[i]
|
||||
if m[pod.UID] != nil {
|
||||
updated = append(updated, *m[pod.UID])
|
||||
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
|
||||
} else {
|
||||
updated = append(updated, *pod)
|
||||
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
|
||||
}
|
||||
}
|
||||
|
||||
return updated
|
||||
}
|
||||
|
||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||
// pod map.
|
||||
func (self *basicPodManager) GetPods() ([]api.Pod, mirrorPods) {
|
||||
self.lock.RLock()
|
||||
defer self.lock.RUnlock()
|
||||
return append([]api.Pod{}, self.pods...), self.mirrorPods
|
||||
}
|
||||
|
||||
// GetPodByName provides the first pod that matches namespace and name, as well
|
||||
// as whether the pod was found.
|
||||
func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
|
||||
self.lock.RLock()
|
||||
defer self.lock.RUnlock()
|
||||
for i := range self.pods {
|
||||
pod := self.pods[i]
|
||||
if pod.Namespace == namespace && pod.Name == name {
|
||||
return &pod, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
|
||||
name, namespace, err := ParsePodFullName(podFullName)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
return self.GetPodByName(namespace, name)
|
||||
}
|
||||
|
||||
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
|
||||
// Otherwise, return the original UID. All public-facing functions should
|
||||
// perform this translation for UIDs because user may provide a mirror pod UID,
|
||||
// which is not recognized by internal Kubelet functions.
|
||||
func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID {
|
||||
if uid == "" {
|
||||
return uid
|
||||
}
|
||||
|
||||
self.lock.RLock()
|
||||
defer self.lock.RUnlock()
|
||||
staticUID, ok := self.mirrorPods.GetStaticUID(uid)
|
||||
if ok {
|
||||
return staticUID
|
||||
} else {
|
||||
return uid
|
||||
}
|
||||
}
|
||||
|
||||
// Delete all orphaned mirror pods. This method doesn't acquire the lock
|
||||
// because it assumes the a copy of the mirrorPod is passed as an argument.
|
||||
func (self *basicPodManager) DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) {
|
||||
podFullNames := mirrorPods.GetOrphanedMirrorPodNames()
|
||||
for _, podFullName := range podFullNames {
|
||||
self.mirrorManager.DeleteMirrorPod(podFullName)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error {
|
||||
return self.mirrorManager.CreateMirrorPod(pod, hostname)
|
||||
}
|
||||
|
||||
func (self *basicPodManager) DeleteMirrorPod(podFullName string) error {
|
||||
return self.mirrorManager.DeleteMirrorPod(podFullName)
|
||||
}
|
25
pkg/kubelet/pod_manager_test.go
Normal file
25
pkg/kubelet/pod_manager_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
Copyright 2015 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
// Stub out mirror manager for testing purpose.
|
||||
func newFakePodManager() (*basicPodManager, *fakeMirrorManager) {
|
||||
podManager := newBasicPodManager(nil)
|
||||
fakeMirrorManager := newFakeMirrorMananger()
|
||||
podManager.mirrorManager = fakeMirrorManager
|
||||
return podManager, fakeMirrorManager
|
||||
}
|
Reference in New Issue
Block a user