Dont generatePodStatus twice for new pods

This commit is contained in:
Prashanth Balasubramanian 2015-06-09 17:50:15 -07:00
parent 5a02fc07d8
commit b5ed0e9b13
8 changed files with 163 additions and 81 deletions

View File

@ -37,7 +37,7 @@ func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateCompl
if err != nil { if err != nil {
f.t.Errorf("Unexpected error: %v", err) f.t.Errorf("Unexpected error: %v", err)
} }
if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID)); err != nil { if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID), SyncPodUpdate); err != nil {
f.t.Errorf("Unexpected error: %v", err) f.t.Errorf("Unexpected error: %v", err)
} }
} }

View File

@ -88,7 +88,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what // Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which // type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring). // syncing began (for use in monitoring).
SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod, SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod,
startTime time.Time) error startTime time.Time) error
} }
@ -1078,7 +1078,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil return nil
} }
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID uid := pod.UID
@ -1130,11 +1130,39 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} }
kl.volumeManager.SetVolumes(pod.UID, podVolumes) kl.volumeManager.SetVolumes(pod.UID, podVolumes)
podStatus, err := kl.generatePodStatus(pod) // The kubelet is the source of truth for pod status. It ignores the status sent from
// the apiserver and regenerates status for every pod update, incrementally updating
// the status it received at pod creation time.
//
// The container runtime needs 2 pieces of information from the status to sync a pod:
// The terminated state of containers (to restart them) and the podIp (for liveness probes).
// New pods don't have either, so we skip the expensive status generation step.
//
// If we end up here with a create event for an already running pod, it could result in a
// restart of its containers. This cannot happen unless the kubelet restarts, because the
// delete before the second create is processed by SyncPods, which cancels this pod worker.
//
// If the kubelet restarts, we have a bunch of running containers for which we get create
// events. This is ok, because the pod status for these will include the podIp and terminated
// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
// state of a newly started container with the apiserver before the kubelet restarted, so
// it's OK to pretend like the kubelet started them after it restarted.
//
// Also note that deletes currently have an updateType of `create` set in UpdatePods.
// This, again, does not matter because deletes are not processed by this method.
var podStatus api.PodStatus
if updateType == SyncPodCreate {
podStatus = pod.Status
glog.V(3).Infof("Not generating pod status for new pod %v", podFullName)
} else {
var err error
podStatus, err = kl.generatePodStatus(pod)
if err != nil { if err != nil {
glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err) glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
return err return err
} }
}
pullSecrets, err := kl.getPullSecretsForPod(pod) pullSecrets, err := kl.getPullSecretsForPod(pod)
if err != nil { if err != nil {
@ -1306,7 +1334,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
} }
// SyncPods synchronizes the configured list of pods (desired state) with the host current state. // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) error { mirrorPods map[string]*api.Pod, start time.Time) error {
defer func() { defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
@ -1344,7 +1372,7 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
}) })
// Note the number of containers for new pods. // Note the number of containers for new pods.
if val, ok := podSyncTypes[pod.UID]; ok && (val == metrics.SyncPodCreate) { if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
} }
} }
@ -1486,7 +1514,7 @@ func (kl *Kubelet) checkCapacityExceeded(pods []*api.Pod) (fitting []*api.Pod, n
} }
// handleOutOfDisk detects if pods can't fit due to lack of disk space. // handleOutOfDisk detects if pods can't fit due to lack of disk space.
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod { func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
if len(podSyncTypes) == 0 { if len(podSyncTypes) == 0 {
// regular sync. no new pods // regular sync. no new pods
return pods return pods
@ -1519,7 +1547,7 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]m
for i := range pods { for i := range pods {
pod := pods[i] pod := pods[i]
// Only reject pods that didn't start yet. // Only reject pods that didn't start yet.
if podSyncTypes[pod.UID] == metrics.SyncPodCreate { if podSyncTypes[pod.UID] == SyncPodCreate {
kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.") kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{ kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed, Phase: api.PodFailed,
@ -1578,7 +1606,7 @@ func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod {
// admitPods handles pod admission. It filters out terminated pods, and pods // admitPods handles pod admission. It filters out terminated pods, and pods
// that don't fit on the node, and may reject pods if node is overcommitted. // that don't fit on the node, and may reject pods if node is overcommitted.
func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod { func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
// Pod phase progresses monotonically. Once a pod has reached a final state, // Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave irregardless of the restart policy. The statuses // it should never leave irregardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them. // of such pods should not be changed, and there is no need to sync them.
@ -1616,7 +1644,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.") glog.Info("Starting kubelet main sync loop.")
for { for {
unsyncedPod := false unsyncedPod := false
podSyncTypes := make(map[types.UID]metrics.SyncPodType) podSyncTypes := make(map[types.UID]SyncPodType)
select { select {
case u, ok := <-updates: case u, ok := <-updates:
if !ok { if !ok {

View File

@ -42,7 +42,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -427,7 +426,7 @@ func apiContainerToContainer(c docker.APIContainers) container.Container {
} }
} }
var emptyPodUIDs map[types.UID]metrics.SyncPodType var emptyPodUIDs map[types.UID]SyncPodType
// TODO: Remove this function after all docker-specifc tests have been migrated // TODO: Remove this function after all docker-specifc tests have been migrated
// to dockertools. // to dockertools.
@ -2774,6 +2773,7 @@ func TestUpdateNodeStatusError(t *testing.T) {
} }
func TestCreateMirrorPod(t *testing.T) { func TestCreateMirrorPod(t *testing.T) {
for _, updateType := range []SyncPodType{SyncPodCreate, SyncPodUpdate} {
testKubelet := newTestKubeletWithFakeRuntime(t) testKubelet := newTestKubeletWithFakeRuntime(t)
kl := testKubelet.kubelet kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorClient manager := testKubelet.fakeMirrorClient
@ -2789,7 +2789,7 @@ func TestCreateMirrorPod(t *testing.T) {
} }
pods := []*api.Pod{pod} pods := []*api.Pod{pod}
kl.podManager.SetPods(pods) kl.podManager.SetPods(pods)
err := kl.syncPod(pod, nil, container.Pod{}) err := kl.syncPod(pod, nil, container.Pod{}, updateType)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -2800,6 +2800,7 @@ func TestCreateMirrorPod(t *testing.T) {
if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) { if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) {
t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods()) t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods())
} }
}
} }
func TestDeleteOutdatedMirrorPod(t *testing.T) { func TestDeleteOutdatedMirrorPod(t *testing.T) {
@ -2844,7 +2845,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*api.Pod{pod, mirrorPod} pods := []*api.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods) kl.podManager.SetPods(pods)
err := kl.syncPod(pod, mirrorPod, container.Pod{}) err := kl.syncPod(pod, mirrorPod, container.Pod{}, SyncPodUpdate)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -3044,7 +3045,7 @@ func TestHostNetworkAllowed(t *testing.T) {
}, },
} }
kubelet.podManager.SetPods([]*api.Pod{pod}) kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.syncPod(pod, nil, container.Pod{}) err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate)
if err != nil { if err != nil {
t.Errorf("expected pod infra creation to succeed: %v", err) t.Errorf("expected pod infra creation to succeed: %v", err)
} }
@ -3073,7 +3074,7 @@ func TestHostNetworkDisallowed(t *testing.T) {
HostNetwork: true, HostNetwork: true,
}, },
} }
err := kubelet.syncPod(pod, nil, container.Pod{}) err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate)
if err == nil { if err == nil {
t.Errorf("expected pod infra creation to fail") t.Errorf("expected pod infra creation to fail")
} }
@ -3100,7 +3101,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) {
}, },
} }
kubelet.podManager.SetPods([]*api.Pod{pod}) kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.syncPod(pod, nil, container.Pod{}) err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate)
if err != nil { if err != nil {
t.Errorf("expected pod infra creation to succeed: %v", err) t.Errorf("expected pod infra creation to succeed: %v", err)
} }
@ -3126,7 +3127,7 @@ func TestPrivilegeContainerDisallowed(t *testing.T) {
}, },
}, },
} }
err := kubelet.syncPod(pod, nil, container.Pod{}) err := kubelet.syncPod(pod, nil, container.Pod{}, SyncPodUpdate)
if err == nil { if err == nil {
t.Errorf("expected pod infra creation to fail") t.Errorf("expected pod infra creation to fail")
} }

View File

@ -83,27 +83,6 @@ func Register(containerCache kubecontainer.RuntimeCache) {
}) })
} }
type SyncPodType int
const (
SyncPodSync SyncPodType = iota
SyncPodUpdate
SyncPodCreate
)
func (sp SyncPodType) String() string {
switch sp {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}
// Gets the time since the specified start in microseconds. // Gets the time since the specified start in microseconds.
func SinceInMicroseconds(start time.Time) float64 { func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())

View File

@ -22,7 +22,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -48,13 +47,35 @@ type podManager interface {
GetPodByName(namespace, name string) (*api.Pod, bool) GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod)
SetPods(pods []*api.Pod) SetPods(pods []*api.Pod)
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType)
DeleteOrphanedMirrorPods() DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID TranslatePodUID(uid types.UID) types.UID
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
mirrorClient mirrorClient
} }
// SyncPodType classifies pod updates, eg: create, update.
type SyncPodType int
const (
SyncPodSync SyncPodType = iota
SyncPodUpdate
SyncPodCreate
)
func (sp SyncPodType) String() string {
switch sp {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}
// All maps in basicPodManager should be set by calling UpdatePods(); // All maps in basicPodManager should be set by calling UpdatePods();
// individual arrays/maps are not immutable and no other methods should attempt // individual arrays/maps are not immutable and no other methods should attempt
// to modify them. // to modify them.
@ -83,7 +104,7 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
} }
// Update the internal pods with those provided by the update. // Update the internal pods with those provided by the update.
func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) {
pm.lock.Lock() pm.lock.Lock()
defer pm.lock.Unlock() defer pm.lock.Unlock()
switch u.Op { switch u.Op {
@ -101,7 +122,7 @@ func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]me
for uid := range pm.podByUID { for uid := range pm.podByUID {
if _, ok := existingPods[uid]; !ok { if _, ok := existingPods[uid]; !ok {
podSyncTypes[uid] = metrics.SyncPodCreate podSyncTypes[uid] = SyncPodCreate
} }
} }
case UPDATE: case UPDATE:
@ -110,7 +131,7 @@ func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]me
// Store the updated pods. Don't worry about filtering host ports since those // Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up. // pods will never be looked up.
for i := range u.Pods { for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate podSyncTypes[u.Pods[i].UID] = SyncPodUpdate
} }
allPods := applyUpdates(u.Pods, pm.getAllPods()) allPods := applyUpdates(u.Pods, pm.getAllPods())
pm.setPods(allPods) pm.setPods(allPods)
@ -121,7 +142,7 @@ func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]me
// Mark all remaining pods as sync. // Mark all remaining pods as sync.
for uid := range pm.podByUID { for uid := range pm.podByUID {
if _, ok := podSyncTypes[uid]; !ok { if _, ok := podSyncTypes[uid]; !ok {
podSyncTypes[uid] = metrics.SyncPodSync podSyncTypes[uid] = SyncPodSync
} }
} }
} }

View File

@ -34,7 +34,7 @@ type PodWorkers interface {
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
} }
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, SyncPodType) error
type podWorkers struct { type podWorkers struct {
// Protects all per worker fields. // Protects all per worker fields.
@ -71,6 +71,9 @@ type workUpdate struct {
// Function to call when the update is complete. // Function to call when the update is complete.
updateCompleteFn func() updateCompleteFn func()
// A string describing the type of this update, eg: create
updateType SyncPodType
} }
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType, func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
@ -103,7 +106,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
} }
err = p.syncPodFn(newWork.pod, newWork.mirrorPod, err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID)) kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)
if err != nil { if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
@ -122,6 +125,14 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
var podUpdates chan workUpdate var podUpdates chan workUpdate
var exists bool var exists bool
// TODO: Pipe this through from the kubelet. Currently kubelets operating with
// snapshot updates (PodConfigNotificationSnapshot) will send updates, creates
// and deletes as SET operations, which makes updates indistinguishable from
// creates. The intent here is to communicate to the pod worker that it can take
// certain liberties, like skipping status generation, when it receives a create
// event for a pod.
updateType := SyncPodUpdate
p.podLock.Lock() p.podLock.Lock()
defer p.podLock.Unlock() defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists { if podUpdates, exists = p.podUpdates[uid]; !exists {
@ -131,6 +142,12 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
// the channel is empty, so buffer of size 1 is enough. // the channel is empty, so buffer of size 1 is enough.
podUpdates = make(chan workUpdate, 1) podUpdates = make(chan workUpdate, 1)
p.podUpdates[uid] = podUpdates p.podUpdates[uid] = podUpdates
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
updateType = SyncPodCreate
go func() { go func() {
defer util.HandleCrash() defer util.HandleCrash()
p.managePodLoop(podUpdates) p.managePodLoop(podUpdates)
@ -142,12 +159,14 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
pod: pod, pod: pod,
mirrorPod: mirrorPod, mirrorPod: mirrorPod,
updateCompleteFn: updateComplete, updateCompleteFn: updateComplete,
updateType: updateType,
} }
} else { } else {
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{ p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
pod: pod, pod: pod,
mirrorPod: mirrorPod, mirrorPod: mirrorPod,
updateCompleteFn: updateComplete, updateCompleteFn: updateComplete,
updateType: updateType,
} }
} }
} }

View File

@ -41,19 +41,21 @@ func newPod(uid, name string) *api.Pod {
} }
} }
func createPodWorkers() (*podWorkers, map[types.UID][]string) { func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache {
fakeDocker := &dockertools.FakeDockerClient{} fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder)) dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder))
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) return kubecontainer.NewFakeRuntimeCache(dockerManager)
}
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
lock := sync.Mutex{} lock := sync.Mutex{}
processed := make(map[types.UID][]string) processed := make(map[types.UID][]string)
fakeRecorder := &record.FakeRecorder{}
fakeRuntimeCache := createFakeRuntimeCache(fakeRecorder)
podWorkers := newPodWorkers( podWorkers := newPodWorkers(
fakeRuntimeCache, fakeRuntimeCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
func() { func() {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
@ -118,6 +120,38 @@ func TestUpdatePod(t *testing.T) {
} }
} }
func TestUpdateType(t *testing.T) {
syncType := make(chan SyncPodType)
fakeRecorder := &record.FakeRecorder{}
podWorkers := newPodWorkers(
createFakeRuntimeCache(fakeRecorder),
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
func() {
syncType <- updateType
}()
return nil
},
fakeRecorder,
)
cases := map[*api.Pod][]SyncPodType{
newPod("u1", "n1"): {SyncPodCreate, SyncPodUpdate},
newPod("u2", "n1"): {SyncPodCreate},
}
for p, expectedTypes := range cases {
for i := range expectedTypes {
podWorkers.UpdatePod(p, nil, func() {})
select {
case gotType := <-syncType:
if gotType != expectedTypes[i] {
t.Fatalf("Expected sync type %v got %v for pod with uid %v", expectedTypes[i], gotType, p.UID)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("Unexpected delay is running pod worker")
}
}
}
}
func TestForgetNonExistingPodWorkers(t *testing.T) { func TestForgetNonExistingPodWorkers(t *testing.T) {
podWorkers, _ := createPodWorkers() podWorkers, _ := createPodWorkers()
@ -159,12 +193,12 @@ type simpleFakeKubelet struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
return nil return nil
} }
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
kl.wg.Done() kl.wg.Done()
return nil return nil

View File

@ -104,7 +104,7 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
glog.Infof("pod %q containers not running: syncing", pod.Name) glog.Infof("pod %q containers not running: syncing", pod.Name)
// We don't create mirror pods in this mode; pass a dummy boolean value // We don't create mirror pods in this mode; pass a dummy boolean value
// to sycnPod. // to sycnPod.
if err = kl.syncPod(pod, nil, p); err != nil { if err = kl.syncPod(pod, nil, p, SyncPodUpdate); err != nil {
return fmt.Errorf("error syncing pod: %v", err) return fmt.Errorf("error syncing pod: %v", err)
} }
if retry >= RunOnceMaxRetries { if retry >= RunOnceMaxRetries {