Merge pull request #16545 from timstclair/mirrorpods

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-11-05 02:56:23 -08:00
commit fb571e3e2e
7 changed files with 131 additions and 102 deletions

View File

@ -255,7 +255,6 @@ func NewMainKubelet(
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
statusManager := status.NewManager(kubeClient)
containerRefManager := kubecontainer.NewRefManager()
volumeManager := newVolumeManager()
@ -287,7 +286,6 @@ func NewMainKubelet(
recorder: recorder,
cadvisor: cadvisorInterface,
diskSpaceManager: diskSpaceManager,
statusManager: statusManager,
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
@ -417,6 +415,7 @@ func NewMainKubelet(
klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(
klet.resyncInterval,
@ -1436,27 +1435,18 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
// Before returning, regenerate status and store it in the cache.
defer func() {
if kubepod.IsStaticPod(pod) && mirrorPod == nil {
// No need to cache the status because the mirror pod does not
// exist yet.
return
}
status, err := kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
// Propagate the error upstream.
syncErr = err
} else {
podToUpdate := pod
if mirrorPod != nil {
podToUpdate = mirrorPod
}
existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(podToUpdate, status)
kl.statusManager.SetPodStatus(pod, status)
}
}()

View File

@ -111,7 +111,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
@ -120,6 +119,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.cadvisor = mockCadvisor
fakeMirrorClient := kubepod.NewFakeMirrorClient()
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager)
kubelet.containerRefManager = kubecontainer.NewRefManager()
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
if err != nil {
@ -160,6 +160,7 @@ func newTestPods(count int) []*api.Pod {
},
},
ObjectMeta: api.ObjectMeta{
UID: types.UID(10000 + i),
Name: fmt.Sprintf("pod%d", i),
},
}
@ -3249,39 +3250,6 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
mockCadvisor.AssertExpectations(t)
}
func TestDoNotCacheStatusForStaticPods(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kubelet := testKubelet.kubelet
pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "staticFoo",
Namespace: "new",
Annotations: map[string]string{
kubetypes.ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar"},
},
},
},
}
kubelet.podManager.SetPods(pods)
kubelet.HandlePodSyncs(kubelet.podManager.GetPods())
status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
if ok {
t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
}
}
func TestHostNetworkAllowed(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
@ -268,7 +269,7 @@ func newTestManager() *manager {
const probePeriod = 1
m := NewManager(
probePeriod,
status.NewManager(&testclient.Fake{}),
status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
results.NewManager(),
results.NewManager(),
nil, // runner

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
@ -119,7 +120,7 @@ func TestDoProbe(t *testing.T) {
}
// Clean up.
m.statusManager = status.NewManager(&testclient.Fake{})
m.statusManager = status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil))
resultsManager(m, probeType).Remove(containerID)
}
}

View File

@ -48,7 +48,7 @@ func TestRunOnce(t *testing.T) {
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil),
statusManager: status.NewManager(nil, podManager),
containerRefManager: kubecontainer.NewRefManager(),
podManager: podManager,
os: kubecontainer.FakeOS{},

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types"
@ -53,6 +54,7 @@ type podStatusSyncRequest struct {
// All methods are thread-safe.
type manager struct {
kubeClient client.Interface
podManager kubepod.Manager
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
@ -87,9 +89,10 @@ type Manager interface {
const syncPeriod = 10 * time.Second
func NewManager(kubeClient client.Interface) Manager {
func NewManager(kubeClient client.Interface, podManager kubepod.Manager) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
apiStatusVersions: make(map[types.UID]uint64),
@ -131,49 +134,41 @@ func (m *manager) Start() {
func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
status, ok := m.podStatuses[uid]
status, ok := m.podStatuses[m.podManager.TranslatePodUID(uid)]
return status.status, ok
}
func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
oldStatus, found := m.podStatuses[pod.UID]
// ensure that the start time does not change across updates.
if found && oldStatus.status.StartTime != nil {
status.StartTime = oldStatus.status.StartTime
var oldStatus api.PodStatus
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
oldStatus = cachedStatus.status
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
} else {
oldStatus = pod.Status
}
// Set ReadyCondition.LastTransitionTime.
// Note we cannot do this while generating the status since we do not have oldStatus
// at that time for mirror pods.
if readyCondition := api.GetPodReadyCondition(status); readyCondition != nil {
// Need to set LastTransitionTime.
lastTransitionTime := unversioned.Now()
if found {
oldReadyCondition := api.GetPodReadyCondition(oldStatus.status)
if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
lastTransitionTime = oldReadyCondition.LastTransitionTime
}
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
lastTransitionTime = oldReadyCondition.LastTransitionTime
}
readyCondition.LastTransitionTime = lastTransitionTime
}
// if the status has no start time, we need to set an initial time
// TODO(yujuhong): Consider setting StartTime when generating the pod
// status instead, which would allow manager to become a simple cache
// again.
if status.StartTime.IsZero() {
if pod.Status.StartTime.IsZero() {
// the pod did not have a previously recorded value so set to now
now := unversioned.Now()
status.StartTime = &now
} else {
// the pod had a recorded value, but the kubelet restarted so we need to rebuild cache
// based on last observed value
status.StartTime = pod.Status.StartTime
}
// ensure that the start time does not change across updates.
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
status.StartTime = oldStatus.StartTime
} else if status.StartTime.IsZero() {
// if the status has no start time, we need to set an initial time
now := unversioned.Now()
status.StartTime = &now
}
newStatus := m.updateStatusInternal(pod, status)
@ -288,14 +283,14 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
// TODO: make me easier to express from client code
pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName)
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q was deleted on the server", status.podName)
glog.V(3).Infof("Pod %q (%s) was deleted on the server", status.podName, uid)
m.deletePodStatus(uid)
return
}
if err == nil {
if len(pod.UID) > 0 && pod.UID != uid {
glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update",
kubeletutil.FormatPodName(pod))
translatedUID := m.podManager.TranslatePodUID(pod.UID)
if len(translatedUID) > 0 && translatedUID != uid {
glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletutil.FormatPodName(pod))
m.deletePodStatus(uid)
return
}
@ -310,12 +305,12 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
return
}
if !notRunning(pod.Status.ContainerStatuses) {
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", pod.Name)
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", kubeletutil.FormatPodName(pod))
return
}
if err := m.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", pod.Name)
m.deletePodStatus(pod.UID)
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", kubeletutil.FormatPodName(pod))
m.deletePodStatus(uid)
return
}
}

View File

@ -23,11 +23,15 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
)
@ -39,8 +43,10 @@ var testPod *api.Pod = &api.Pod{
},
}
func newTestManager() *manager {
return NewManager(&testclient.Fake{}).(*manager)
func newTestManager(kubeClient client.Interface) *manager {
podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient())
podManager.AddPod(testPod)
return NewManager(kubeClient, podManager).(*manager)
}
func generateRandomMessage() string {
@ -91,7 +97,7 @@ func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
}
func TestNewStatus(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
@ -102,7 +108,7 @@ func TestNewStatus(t *testing.T) {
}
func TestNewStatusPreservesPodStartTime(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -134,7 +140,7 @@ func getReadyPodStatus() api.PodStatus {
}
func TestNewStatusSetsReadyTransitionTime(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
podStatus := getReadyPodStatus()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -154,14 +160,14 @@ func TestNewStatusSetsReadyTransitionTime(t *testing.T) {
}
func TestChangedStatus(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2)
}
func TestChangedStatusKeepsStartTime(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
now := unversioned.Now()
firstStatus := getRandomPodStatus()
firstStatus.StartTime = &now
@ -178,7 +184,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
}
func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
podStatus := getReadyPodStatus()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -208,7 +214,7 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
}
func TestUnchangedStatus(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
podStatus := getRandomPodStatus()
syncer.SetPodStatus(testPod, podStatus)
syncer.SetPodStatus(testPod, podStatus)
@ -216,7 +222,7 @@ func TestUnchangedStatus(t *testing.T) {
}
func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
podStatus := getReadyPodStatus()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -247,21 +253,22 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
func TestSyncBatchIgnoresNotFound(t *testing.T) {
client := testclient.Fake{}
syncer := NewManager(&client).(*manager)
syncer := newTestManager(&client)
client.AddReactor("get", "pods", func(action testclient.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound("pods", "test-pod")
})
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.syncBatch()
verifyActions(t, syncer.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
})
_, found := syncer.GetPodStatus(testPod.UID)
assert.False(t, found, "Pod status should have been deleted")
}
func TestSyncBatch(t *testing.T) {
syncer := newTestManager()
syncer := newTestManager(&testclient.Fake{})
syncer.kubeClient = testclient.NewSimpleFake(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.syncBatch()
@ -273,11 +280,14 @@ func TestSyncBatch(t *testing.T) {
}
func TestSyncBatchChecksMismatchedUID(t *testing.T) {
syncer := newTestManager()
testPod.UID = "first"
syncer := newTestManager(&testclient.Fake{})
pod := *testPod
pod.UID = "first"
syncer.podManager.AddPod(&pod)
differentPod := *testPod
differentPod.UID = "second"
syncer.kubeClient = testclient.NewSimpleFake(testPod)
syncer.podManager.AddPod(&differentPod)
syncer.kubeClient = testclient.NewSimpleFake(&pod)
syncer.SetPodStatus(&differentPod, getRandomPodStatus())
syncer.syncBatch()
verifyActions(t, syncer.kubeClient, []testclient.Action{
@ -287,12 +297,20 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) {
func TestSyncBatchNoDeadlock(t *testing.T) {
client := &testclient.Fake{}
m := NewManager(client).(*manager)
m := newTestManager(client)
// Setup fake client.
var ret api.Pod
var err error
client.AddReactor("*", "pods", func(action testclient.Action) (bool, runtime.Object, error) {
switch action := action.(type) {
case testclient.GetAction:
assert.Equal(t, testPod.Name, action.GetName(), "Unexpeted GetAction: %+v", action)
case testclient.UpdateAction:
assert.Equal(t, testPod.Name, action.GetObject().(*api.Pod).Name, "Unexpeted UpdateAction: %+v", action)
default:
assert.Fail(t, "Unexpected Action: %+v", action)
}
return true, &ret, err
})
@ -352,7 +370,7 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
func TestStaleUpdates(t *testing.T) {
pod := *testPod
client := testclient.NewSimpleFake(&pod)
m := NewManager(client).(*manager)
m := newTestManager(client)
status := api.PodStatus{Message: "initial status"}
m.SetPodStatus(&pod, status)
@ -425,3 +443,59 @@ func TestStatusEquality(t *testing.T) {
}
}
}
func TestStaticPodStatus(t *testing.T) {
staticPod := *testPod
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
mirrorPod := *testPod
mirrorPod.UID = "mirror-12345678"
mirrorPod.Annotations = map[string]string{
kubetypes.ConfigSourceAnnotationKey: "api",
kubetypes.ConfigMirrorAnnotationKey: "mirror",
}
client := testclient.NewSimpleFake(&mirrorPod)
m := newTestManager(client)
m.podManager.AddPod(&staticPod)
m.podManager.AddPod(&mirrorPod)
// Verify setup.
assert.True(t, kubepod.IsStaticPod(&staticPod), "SetUp error: staticPod")
assert.True(t, kubepod.IsMirrorPod(&mirrorPod), "SetUp error: mirrorPod")
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID)
status := getRandomPodStatus()
now := unversioned.Now()
status.StartTime = &now
m.SetPodStatus(&staticPod, status)
retrievedStatus, _ := m.GetPodStatus(staticPod.UID)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
// Should translate mirrorPod / staticPod UID.
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
})
updateAction := client.Actions()[1].(testclient.UpdateActionImpl)
updatedPod := updateAction.Object.(*api.Pod)
assert.Equal(t, mirrorPod.UID, updatedPod.UID, "Expected mirrorPod (%q), but got %q", mirrorPod.UID, updatedPod.UID)
assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status)
client.ClearActions()
otherPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "other-87654321",
Name: "other",
Namespace: "new",
},
}
m.podManager.AddPod(otherPod)
m.SetPodStatus(otherPod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
})
_, found := m.GetPodStatus(otherPod.UID)
assert.False(t, found, "otherPod status should have been deleted")
}