mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #19889 from yujuhong/replace_cache
Auto commit by PR queue bot
This commit is contained in:
commit
71ae2736c0
48
pkg/kubelet/container/fake_cache.go
Normal file
48
pkg/kubelet/container/fake_cache.go
Normal file
@ -0,0 +1,48 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package container
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
type fakeCache struct {
|
||||
runtime Runtime
|
||||
}
|
||||
|
||||
func NewFakeCache(runtime Runtime) Cache {
|
||||
return &fakeCache{runtime: runtime}
|
||||
}
|
||||
|
||||
func (c *fakeCache) Get(id types.UID) (*PodStatus, error) {
|
||||
return c.runtime.GetPodStatus(id, "", "")
|
||||
}
|
||||
|
||||
func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
|
||||
return c.Get(id)
|
||||
}
|
||||
|
||||
func (c *fakeCache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
|
||||
}
|
||||
|
||||
func (c *fakeCache) Delete(id types.UID) {
|
||||
}
|
||||
|
||||
func (c *fakeCache) UpdateTime(_ time.Time) {
|
||||
}
|
@ -26,17 +26,17 @@ import (
|
||||
// fakePodWorkers runs sync pod function in serial, so we can have
|
||||
// deterministic behaviour in testing.
|
||||
type fakePodWorkers struct {
|
||||
syncPodFn syncPodFnType
|
||||
runtimeCache kubecontainer.RuntimeCache
|
||||
t TestingInterface
|
||||
syncPodFn syncPodFnType
|
||||
cache kubecontainer.Cache
|
||||
t TestingInterface
|
||||
}
|
||||
|
||||
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
|
||||
pods, err := f.runtimeCache.GetPods()
|
||||
status, err := f.cache.Get(pod.UID)
|
||||
if err != nil {
|
||||
f.t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID), kubetypes.SyncPodUpdate); err != nil {
|
||||
if err := f.syncPodFn(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
|
||||
f.t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -447,7 +447,7 @@ func NewMainKubelet(
|
||||
klet.runtimeCache = runtimeCache
|
||||
klet.reasonCache = NewReasonCache()
|
||||
klet.workQueue = queue.NewBasicWorkQueue()
|
||||
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
|
||||
klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
|
||||
|
||||
klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
|
||||
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
|
||||
@ -1571,9 +1571,16 @@ func parseResolvConf(reader io.Reader, dnsScrubber dnsScrubber) (nameservers []s
|
||||
return nameservers, searches, nil
|
||||
}
|
||||
|
||||
// Kill all running containers in a pod (includes the pod infra container).
|
||||
func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
|
||||
return kl.containerRuntime.KillPod(pod, runningPod)
|
||||
// One of the following aruguements must be non-nil: runningPod, status.
|
||||
// TODO: Modify containerRuntime.KillPod() to accept the right arguements.
|
||||
func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus) error {
|
||||
var p kubecontainer.Pod
|
||||
if runningPod != nil {
|
||||
p = *runningPod
|
||||
} else if status != nil {
|
||||
p = kubecontainer.ConvertPodStatusToRunningPod(status)
|
||||
}
|
||||
return kl.containerRuntime.KillPod(pod, p)
|
||||
}
|
||||
|
||||
type empty struct{}
|
||||
@ -1593,8 +1600,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Remove runningPod from the arguments.
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
|
||||
var firstSeenTime time.Time
|
||||
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
|
||||
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
|
||||
@ -1610,29 +1616,21 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
|
||||
}
|
||||
}
|
||||
|
||||
// Query the container runtime (or cache) to retrieve the pod status, and
|
||||
// update it in the status manager.
|
||||
podStatus, statusErr := kl.getRuntimePodStatus(pod)
|
||||
apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr)
|
||||
apiPodStatus, err := kl.generatePodStatus(pod, podStatus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Record the time it takes for the pod to become running.
|
||||
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
|
||||
// TODO: The logic seems wrong since the pod phase can become pending when
|
||||
// the container runtime is temporarily not available.
|
||||
if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
|
||||
if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
|
||||
!firstSeenTime.IsZero() {
|
||||
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
|
||||
}
|
||||
kl.statusManager.SetPodStatus(pod, apiPodStatus)
|
||||
if statusErr != nil {
|
||||
return statusErr
|
||||
}
|
||||
|
||||
// Kill pods we can't run.
|
||||
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
|
||||
if err := kl.killPod(pod, runningPod); err != nil {
|
||||
if err := kl.killPod(pod, nil, podStatus); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
return err
|
||||
@ -2098,7 +2096,7 @@ func (kl *Kubelet) podKiller() {
|
||||
ch <- pod.ID
|
||||
}()
|
||||
glog.V(2).Infof("Killing unwanted pod %q", pod.Name)
|
||||
err := kl.killPod(nil, *pod)
|
||||
err := kl.killPod(nil, pod, nil)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
|
||||
}
|
||||
@ -3090,7 +3088,7 @@ func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus,
|
||||
return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
}
|
||||
|
||||
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) {
|
||||
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) (api.PodStatus, error) {
|
||||
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
|
||||
// TODO: Consider include the container information.
|
||||
if kl.pastActiveDeadline(pod) {
|
||||
@ -3102,16 +3100,6 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
|
||||
Message: "Pod was active on the node longer than specified deadline"}, nil
|
||||
}
|
||||
|
||||
if statusErr != nil {
|
||||
// TODO: Re-evaluate whether we should set the status to "Pending".
|
||||
glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr)
|
||||
return api.PodStatus{
|
||||
Phase: api.PodPending,
|
||||
Reason: "GeneralError",
|
||||
Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
|
||||
}, nil
|
||||
}
|
||||
// Convert the internal PodStatus to api.PodStatus.
|
||||
s := kl.convertStatusToAPIStatus(pod, podStatus)
|
||||
|
||||
// Assume info is ready to process
|
||||
|
@ -150,10 +150,11 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
||||
kubelet.containerRuntime = fakeRuntime
|
||||
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime)
|
||||
kubelet.reasonCache = NewReasonCache()
|
||||
kubelet.podCache = kubecontainer.NewFakeCache(kubelet.containerRuntime)
|
||||
kubelet.podWorkers = &fakePodWorkers{
|
||||
syncPodFn: kubelet.syncPod,
|
||||
runtimeCache: kubelet.runtimeCache,
|
||||
t: t,
|
||||
syncPodFn: kubelet.syncPod,
|
||||
cache: kubelet.podCache,
|
||||
t: t,
|
||||
}
|
||||
|
||||
kubelet.probeManager = prober.FakeManager{}
|
||||
@ -3390,7 +3391,7 @@ func TestCreateMirrorPod(t *testing.T) {
|
||||
}
|
||||
pods := []*api.Pod{pod}
|
||||
kl.podManager.SetPods(pods)
|
||||
err := kl.syncPod(pod, nil, container.Pod{}, updateType)
|
||||
err := kl.syncPod(pod, nil, &container.PodStatus{}, updateType)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -3448,7 +3449,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
|
||||
|
||||
pods := []*api.Pod{pod, mirrorPod}
|
||||
kl.podManager.SetPods(pods)
|
||||
err := kl.syncPod(pod, mirrorPod, container.Pod{}, kubetypes.SyncPodUpdate)
|
||||
err := kl.syncPod(pod, mirrorPod, &container.PodStatus{}, kubetypes.SyncPodUpdate)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -3614,7 +3615,7 @@ func TestHostNetworkAllowed(t *testing.T) {
|
||||
},
|
||||
}
|
||||
kubelet.podManager.SetPods([]*api.Pod{pod})
|
||||
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
|
||||
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
|
||||
if err != nil {
|
||||
t.Errorf("expected pod infra creation to succeed: %v", err)
|
||||
}
|
||||
@ -3647,7 +3648,7 @@ func TestHostNetworkDisallowed(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
|
||||
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
|
||||
if err == nil {
|
||||
t.Errorf("expected pod infra creation to fail")
|
||||
}
|
||||
@ -3674,7 +3675,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) {
|
||||
},
|
||||
}
|
||||
kubelet.podManager.SetPods([]*api.Pod{pod})
|
||||
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
|
||||
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
|
||||
if err != nil {
|
||||
t.Errorf("expected pod infra creation to succeed: %v", err)
|
||||
}
|
||||
@ -3700,7 +3701,7 @@ func TestPrivilegeContainerDisallowed(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
|
||||
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
|
||||
if err == nil {
|
||||
t.Errorf("expected pod infra creation to fail")
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ type PodWorkers interface {
|
||||
ForgetWorker(uid types.UID)
|
||||
}
|
||||
|
||||
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, kubetypes.SyncPodType) error
|
||||
type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error
|
||||
|
||||
type podWorkers struct {
|
||||
// Protects all per worker fields.
|
||||
@ -53,8 +53,6 @@ type podWorkers struct {
|
||||
// Tracks the last undelivered work item for this pod - a work item is
|
||||
// undelivered if it comes in while the worker is working.
|
||||
lastUndeliveredWorkUpdate map[types.UID]workUpdate
|
||||
// runtimeCache is used for listing running containers.
|
||||
runtimeCache kubecontainer.RuntimeCache
|
||||
|
||||
workQueue queue.WorkQueue
|
||||
|
||||
@ -90,13 +88,12 @@ type workUpdate struct {
|
||||
updateType kubetypes.SyncPodType
|
||||
}
|
||||
|
||||
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
|
||||
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
|
||||
func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
|
||||
resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
|
||||
return &podWorkers{
|
||||
podUpdates: map[types.UID]chan workUpdate{},
|
||||
isWorking: map[types.UID]bool{},
|
||||
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
|
||||
runtimeCache: runtimeCache,
|
||||
syncPodFn: syncPodFn,
|
||||
recorder: recorder,
|
||||
workQueue: workQueue,
|
||||
@ -107,45 +104,31 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
|
||||
}
|
||||
|
||||
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
||||
var minRuntimeCacheTime time.Time
|
||||
|
||||
var lastSyncTime time.Time
|
||||
for newWork := range podUpdates {
|
||||
err := func() (err error) {
|
||||
err := func() error {
|
||||
podID := newWork.pod.UID
|
||||
if p.podCache != nil {
|
||||
// This is a blocking call that would return only if the cache
|
||||
// has an entry for the pod that is newer than minRuntimeCache
|
||||
// Time. This ensures the worker doesn't start syncing until
|
||||
// after the cache is at least newer than the finished time of
|
||||
// the previous sync.
|
||||
// TODO: We don't consume the return PodStatus yet, but we
|
||||
// should pass it to syncPod() eventually.
|
||||
p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
|
||||
}
|
||||
// TODO: Deprecate the runtime cache.
|
||||
// We would like to have the state of the containers from at least
|
||||
// the moment when we finished the previous processing of that pod.
|
||||
if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
|
||||
glog.Errorf("Error updating the container runtime cache: %v", err)
|
||||
// This is a blocking call that would return only if the cache
|
||||
// has an entry for the pod that is newer than minRuntimeCache
|
||||
// Time. This ensures the worker doesn't start syncing until
|
||||
// after the cache is at least newer than the finished time of
|
||||
// the previous sync.
|
||||
status, err := p.podCache.GetNewerThan(podID, lastSyncTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pods, err := p.runtimeCache.GetPods()
|
||||
err = p.syncPodFn(newWork.pod, newWork.mirrorPod, status, newWork.updateType)
|
||||
lastSyncTime = time.Now()
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pods while syncing pod: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
|
||||
kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)
|
||||
minRuntimeCacheTime = time.Now()
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
|
||||
p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
|
||||
return err
|
||||
}
|
||||
newWork.updateCompleteFn()
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
|
||||
p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
|
||||
}
|
||||
p.wrapUp(newWork.pod.UID, err)
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package kubelet
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -45,10 +44,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
||||
processed := make(map[types.UID][]string)
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
fakeRuntime := &kubecontainer.FakeRuntime{}
|
||||
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(fakeRuntime)
|
||||
fakeCache := kubecontainer.NewFakeCache(fakeRuntime)
|
||||
podWorkers := newPodWorkers(
|
||||
fakeRuntimeCache,
|
||||
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
|
||||
func(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
|
||||
func() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
@ -60,7 +58,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
||||
queue.NewBasicWorkQueue(),
|
||||
time.Second,
|
||||
time.Second,
|
||||
nil,
|
||||
fakeCache,
|
||||
)
|
||||
return podWorkers, processed
|
||||
}
|
||||
@ -151,19 +149,19 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
|
||||
}
|
||||
|
||||
type simpleFakeKubelet struct {
|
||||
pod *api.Pod
|
||||
mirrorPod *api.Pod
|
||||
runningPod kubecontainer.Pod
|
||||
wg sync.WaitGroup
|
||||
pod *api.Pod
|
||||
mirrorPod *api.Pod
|
||||
podStatus *kubecontainer.PodStatus
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
|
||||
kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
|
||||
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
|
||||
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
|
||||
kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
|
||||
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
|
||||
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
|
||||
kl.wg.Done()
|
||||
return nil
|
||||
}
|
||||
@ -186,25 +184,21 @@ func (b byContainerName) Less(i, j int) bool {
|
||||
func TestFakePodWorkers(t *testing.T) {
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
fakeRuntime := &kubecontainer.FakeRuntime{}
|
||||
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(fakeRuntime)
|
||||
fakeCache := kubecontainer.NewFakeCache(fakeRuntime)
|
||||
|
||||
kubeletForRealWorkers := &simpleFakeKubelet{}
|
||||
kubeletForFakeWorkers := &simpleFakeKubelet{}
|
||||
|
||||
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
|
||||
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
|
||||
realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, fakeCache)
|
||||
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t}
|
||||
|
||||
tests := []struct {
|
||||
pod *api.Pod
|
||||
mirrorPod *api.Pod
|
||||
podList []*kubecontainer.Pod
|
||||
containersInRunningPod int
|
||||
pod *api.Pod
|
||||
mirrorPod *api.Pod
|
||||
}{
|
||||
{
|
||||
&api.Pod{},
|
||||
&api.Pod{},
|
||||
[]*kubecontainer.Pod{},
|
||||
0,
|
||||
},
|
||||
{
|
||||
&api.Pod{
|
||||
@ -221,29 +215,6 @@ func TestFakePodWorkers(t *testing.T) {
|
||||
Namespace: "new",
|
||||
},
|
||||
},
|
||||
[]*kubecontainer.Pod{
|
||||
{
|
||||
ID: "12345678",
|
||||
Name: "foo",
|
||||
Namespace: "new",
|
||||
Containers: []*kubecontainer.Container{
|
||||
{
|
||||
Name: "fooContainer",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "12345678",
|
||||
Name: "fooMirror",
|
||||
Namespace: "new",
|
||||
Containers: []*kubecontainer.Container{
|
||||
{
|
||||
Name: "fooContainerMirror",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
1,
|
||||
},
|
||||
{
|
||||
&api.Pod{
|
||||
@ -260,42 +231,11 @@ func TestFakePodWorkers(t *testing.T) {
|
||||
Namespace: "new",
|
||||
},
|
||||
},
|
||||
[]*kubecontainer.Pod{
|
||||
{
|
||||
ID: "98765",
|
||||
Name: "bar",
|
||||
Namespace: "new",
|
||||
Containers: []*kubecontainer.Container{
|
||||
{
|
||||
Name: "barContainer0",
|
||||
},
|
||||
{
|
||||
Name: "barContainer1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "98765",
|
||||
Name: "barMirror",
|
||||
Namespace: "new",
|
||||
Containers: []*kubecontainer.Container{
|
||||
{
|
||||
Name: "barContainerMirror0",
|
||||
},
|
||||
{
|
||||
Name: "barContainerMirror1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
2,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
kubeletForRealWorkers.wg.Add(1)
|
||||
|
||||
fakeRuntime.PodList = tt.podList
|
||||
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
|
||||
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
|
||||
|
||||
@ -309,14 +249,8 @@ func TestFakePodWorkers(t *testing.T) {
|
||||
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
|
||||
}
|
||||
|
||||
if tt.containersInRunningPod != len(kubeletForFakeWorkers.runningPod.Containers) {
|
||||
t.Errorf("%d: Expected: %#v, Actual: %#v", i, tt.containersInRunningPod, len(kubeletForFakeWorkers.runningPod.Containers))
|
||||
}
|
||||
|
||||
sort.Sort(byContainerName(kubeletForRealWorkers.runningPod))
|
||||
sort.Sort(byContainerName(kubeletForFakeWorkers.runningPod))
|
||||
if !reflect.DeepEqual(kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) {
|
||||
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod)
|
||||
if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
|
||||
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -122,13 +122,17 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
|
||||
}
|
||||
glog.Infof("pod %q containers not running: syncing", pod.Name)
|
||||
|
||||
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to get status for pod %q: %v", pod.Name, err)
|
||||
}
|
||||
glog.Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
|
||||
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
|
||||
glog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
|
||||
|
||||
if err = kl.syncPod(pod, mirrorPod, p, kubetypes.SyncPodUpdate); err != nil {
|
||||
if err = kl.syncPod(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
|
||||
return fmt.Errorf("error syncing pod: %v", err)
|
||||
}
|
||||
if retry >= runOnceMaxRetries {
|
||||
|
Loading…
Reference in New Issue
Block a user