Treat first-seen, non-pending pods as updates

Kubelet doesn't perform checkpointing and loses all its internal states after
restarts. It'd then mistaken pods from the api server as new pods and attempt
to go through the admission process. This may result in pods being rejected
even though they are running on the node (e.g., out of disk situation). This
change adds a condition to check whether the pod was seen before and categorize
such pods as updates. The change also removes freeze/unfreeze mechanism used to
work around such cases, since it is no longer needed and it stopped working
correctly ever since we switched to incremental updates.
This commit is contained in:
Yu-Ju Hong 2015-12-10 15:42:56 -08:00
parent bd9d980243
commit 712612c2dc
7 changed files with 43 additions and 58 deletions

View File

@ -229,10 +229,18 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
// this is a no-op
continue
}
// this is an add
recordFirstSeenTime(ref)
pods[name] = ref
addPods = append(addPods, ref)
// If a pod is not found in the cache, and it's also not in the
// pending phase, it implies that kubelet may have restarted.
// Treat this pod as update so that kubelet wouldn't reject the
// pod in the admission process.
if ref.Status.Phase != api.PodPending {
updatePods = append(updatePods, ref)
} else {
// this is an add
addPods = append(addPods, ref)
}
}
case kubetypes.REMOVE:
@ -275,7 +283,16 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
recordFirstSeenTime(ref)
pods[name] = ref
addPods = append(addPods, ref)
// If a pod is not found in the cache, and it's also not in the
// pending phase, it implies that kubelet may have restarted.
// Treat this pod as update so that kubelet wouldn't reject the
// pod in the admission process.
if ref.Status.Phase != api.PodPending {
updatePods = append(updatePods, ref)
} else {
// this is an add
addPods = append(addPods, ref)
}
}
for name, existing := range oldPods {

View File

@ -71,6 +71,9 @@ func CreateValidPod(name, namespace string) *api.Pod {
},
},
},
Status: api.PodStatus{
Phase: api.PodPending,
},
}
}

View File

@ -36,9 +36,6 @@ type diskSpaceManager interface {
// Checks the available disk space
IsRootDiskSpaceAvailable() (bool, error)
IsDockerDiskSpaceAvailable() (bool, error)
// Always returns sufficient space till Unfreeze() is called.
// This is a signal from caller that its internal initialization is done.
Unfreeze()
}
type DiskSpacePolicy struct {
@ -58,9 +55,8 @@ type fsInfo struct {
type realDiskSpaceManager struct {
cadvisor cadvisor.Interface
cachedInfo map[string]fsInfo // cache of filesystem info.
lock sync.Mutex // protecting cachedInfo and frozen.
lock sync.Mutex // protecting cachedInfo.
policy DiskSpacePolicy // thresholds. Set at creation time.
frozen bool // space checks always return ok when frozen is set. True on creation.
}
func (dm *realDiskSpaceManager) getFsInfo(fsType string, f func() (cadvisorapi.FsInfo, error)) (fsInfo, error) {
@ -96,9 +92,6 @@ func (dm *realDiskSpaceManager) IsRootDiskSpaceAvailable() (bool, error) {
}
func (dm *realDiskSpaceManager) isSpaceAvailable(fsType string, threshold int, f func() (cadvisorapi.FsInfo, error)) (bool, error) {
if dm.frozen {
return true, nil
}
fsInfo, err := dm.getFsInfo(fsType, f)
if err != nil {
return true, fmt.Errorf("failed to get fs info for %q: %v", fsType, err)
@ -117,12 +110,6 @@ func (dm *realDiskSpaceManager) isSpaceAvailable(fsType string, threshold int, f
return true, nil
}
func (dm *realDiskSpaceManager) Unfreeze() {
dm.lock.Lock()
defer dm.lock.Unlock()
dm.frozen = false
}
func validatePolicy(policy DiskSpacePolicy) error {
if policy.DockerFreeDiskMB < 0 {
return fmt.Errorf("free disk space should be non-negative. Invalid value %d for docker disk space threshold.", policy.DockerFreeDiskMB)
@ -144,7 +131,6 @@ func newDiskSpaceManager(cadvisorInterface cadvisor.Interface, policy DiskSpaceP
cadvisor: cadvisorInterface,
policy: policy,
cachedInfo: map[string]fsInfo{},
frozen: true,
}
return dm, nil

View File

@ -71,8 +71,6 @@ func TestSpaceAvailable(t *testing.T) {
Capacity: 10 * mb,
}, nil)
dm.Unfreeze()
ok, err := dm.IsDockerDiskSpaceAvailable()
assert.NoError(err)
assert.True(ok)
@ -96,8 +94,6 @@ func TestIsDockerDiskSpaceAvailableWithSpace(t *testing.T) {
Available: 500 * mb,
}, nil)
dm.Unfreeze()
ok, err := dm.IsDockerDiskSpaceAvailable()
assert.NoError(err)
assert.True(ok)
@ -117,8 +113,6 @@ func TestIsDockerDiskSpaceAvailableWithoutSpace(t *testing.T) {
dm, err := newDiskSpaceManager(mockCadvisor, policy)
require.NoError(t, err)
dm.Unfreeze()
ok, err := dm.IsDockerDiskSpaceAvailable()
assert.NoError(err)
assert.False(ok)
@ -139,8 +133,6 @@ func TestIsRootDiskSpaceAvailableWithSpace(t *testing.T) {
Available: 999 * mb,
}, nil)
dm.Unfreeze()
ok, err := dm.IsRootDiskSpaceAvailable()
assert.NoError(err)
assert.True(ok)
@ -161,8 +153,6 @@ func TestIsRootDiskSpaceAvailableWithoutSpace(t *testing.T) {
Available: 9 * mb,
}, nil)
dm.Unfreeze()
ok, err := dm.IsRootDiskSpaceAvailable()
assert.NoError(err)
assert.False(ok)
@ -174,8 +164,6 @@ func TestCache(t *testing.T) {
dm, err := newDiskSpaceManager(mockCadvisor, policy)
assert.NoError(err)
dm.Unfreeze()
mockCadvisor.On("DockerImagesFsInfo").Return(cadvisorapi.FsInfo{
Usage: 400 * mb,
Capacity: 1000 * mb,
@ -220,7 +208,6 @@ func TestFsInfoError(t *testing.T) {
dm, err := newDiskSpaceManager(mockCadvisor, policy)
assert.NoError(err)
dm.Unfreeze()
mockCadvisor.On("DockerImagesFsInfo").Return(cadvisorapi.FsInfo{}, fmt.Errorf("can't find fs"))
mockCadvisor.On("RootFsInfo").Return(cadvisorapi.FsInfo{}, fmt.Errorf("EBUSY"))
ok, err := dm.IsDockerDiskSpaceAvailable()
@ -246,7 +233,6 @@ func Test_getFsInfo(t *testing.T) {
cadvisor: mockCadvisor,
policy: policy,
cachedInfo: map[string]fsInfo{},
frozen: false,
}
available, err := dm.isSpaceAvailable("root", 10, dm.cadvisor.RootFsInfo)
@ -265,7 +251,6 @@ func Test_getFsInfo(t *testing.T) {
cadvisor: mockCadvisor,
policy: policy,
cachedInfo: map[string]fsInfo{},
frozen: false,
}
available, err = dm.isSpaceAvailable("root", 10, dm.cadvisor.RootFsInfo)
assert.False(available)
@ -282,7 +267,6 @@ func Test_getFsInfo(t *testing.T) {
cadvisor: mockCadvisor,
policy: policy,
cachedInfo: map[string]fsInfo{},
frozen: true,
}
available, err = dm.isSpaceAvailable("root", 10, dm.cadvisor.RootFsInfo)
assert.True(available)
@ -300,7 +284,6 @@ func Test_getFsInfo(t *testing.T) {
cadvisor: mockCadvisor,
policy: policy,
cachedInfo: map[string]fsInfo{},
frozen: false,
}
available, err = dm.isSpaceAvailable("root", 10, dm.cadvisor.RootFsInfo)
assert.True(available)
@ -310,20 +293,3 @@ func Test_getFsInfo(t *testing.T) {
// Available error case skipped as v2.FSInfo uses uint64 and this
// can not be less than 0
}
// TestUnfreeze verifies that Unfreze does infact change the frozen
// private field in master
func TestUnfreeze(t *testing.T) {
dm := &realDiskSpaceManager{
cadvisor: new(cadvisor.Mock),
policy: testPolicy(),
cachedInfo: map[string]fsInfo{},
frozen: true,
}
dm.Unfreeze()
if dm.frozen {
t.Errorf("DiskSpaceManager did not unfreeze: %+v", dm)
}
}

View File

@ -2240,11 +2240,6 @@ func (kl *Kubelet) isOutOfDisk() bool {
if err == nil && !withinBounds {
outOfRootDisk = true
}
// Kubelet would indicate all pods as newly created on the first run after restart.
// We ignore the first disk check to ensure that running pods are not killed.
// Disk manager will only declare out of disk problems if unfreeze has been called.
kl.diskSpaceManager.Unfreeze()
return outOfDockerDisk || outOfRootDisk
}

View File

@ -2482,7 +2482,6 @@ func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisor.Mock, rootCa
if err != nil {
return err
}
diskSpaceManager.Unfreeze()
kubelet.diskSpaceManager = diskSpaceManager
return nil
}
@ -3612,6 +3611,15 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) {
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
mockCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 400 * mb,
Capacity: 1000 * mb,
Available: 600 * mb,
}, nil)
mockCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 9 * mb,
Capacity: 10 * mb,
}, nil)
done := make(chan struct{})
go func() {

View File

@ -23,6 +23,7 @@ import (
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@ -36,6 +37,15 @@ import (
func TestRunOnce(t *testing.T) {
cadvisor := &cadvisor.Mock{}
cadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
cadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 400 * mb,
Capacity: 1000 * mb,
Available: 600 * mb,
}, nil)
cadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 9 * mb,
Capacity: 10 * mb,
}, nil)
podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient())
diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
fakeRuntime := &kubecontainer.FakeRuntime{}