Merge pull request #19579 from Random-Liu/add-new-reason-cache

Auto commit by PR queue bot
k8s-merge-robot 2016-01-30 02:06:49 -08:00
commit 29c640d5ba
10 changed files with 346 additions and 27 deletions


@@ -174,7 +174,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
return f.PodList, f.Err
}
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *util.Backoff) error {
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *util.Backoff) (result PodSyncResult) {
f.Lock()
defer f.Unlock()
@@ -183,7 +183,11 @@ func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []a
for _, c := range pod.Spec.Containers {
f.StartedContainers = append(f.StartedContainers, c.Name)
}
return f.Err
// TODO(random-liu): Add SyncResult for starting and killing containers
if f.Err != nil {
result.Fail(f.Err)
}
return
}
func (f *FakeRuntime) KillPod(pod *api.Pod, runningPod Pod) error {


@@ -65,8 +65,9 @@ type Runtime interface {
// GarbageCollect removes dead containers using the specified container gc policy
GarbageCollect(gcPolicy ContainerGCPolicy) error
// Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error
SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) PodSyncResult
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
// TODO(random-liu): Return PodSyncResult in KillPod.
KillPod(pod *api.Pod, runningPod Pod) error
// GetAPIPodStatus retrieves the api.PodStatus of the pod, including the information of
// all containers in the pod. Clients of this interface assume the

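Since SyncPod now returns a kubecontainer.PodSyncResult instead of a bare error, callers can inspect the outcome of each sync step before collapsing everything into a single error. Below is a minimal sketch of such a caller; it is not part of this diff, syncAndLog is a hypothetical helper, and the import paths are assumed from elsewhere in this tree.

package example

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/util"
)

// syncAndLog wraps the updated Runtime interface: it logs every failed sync
// step and still hands back a single aggregated error.
func syncAndLog(rt kubecontainer.Runtime, pod *api.Pod, apiStatus api.PodStatus,
	status *kubecontainer.PodStatus, secrets []api.Secret, backOff *util.Backoff) error {
	result := rt.SyncPod(pod, apiStatus, status, secrets, backOff)
	for _, r := range result.SyncResults {
		if r.Error != nil {
			// Each SyncResult records the action, its target (e.g. a
			// container name), the typed error and a free-form message.
			glog.Errorf("sync step %v on %v failed: %v (%s)", r.Action, r.Target, r.Error, r.Message)
		}
	}
	// Error() collapses the per-step results back into one error value.
	return result.Error()
}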

@@ -57,9 +57,9 @@ func (r *Mock) GetPods(all bool) ([]*Pod, error) {
return args.Get(0).([]*Pod), args.Error(1)
}
func (r *Mock) SyncPod(pod *api.Pod, apiStatus api.PodStatus, status *PodStatus, secrets []api.Secret, backOff *util.Backoff) error {
func (r *Mock) SyncPod(pod *api.Pod, apiStatus api.PodStatus, status *PodStatus, secrets []api.Secret, backOff *util.Backoff) PodSyncResult {
args := r.Called(pod, apiStatus, status, secrets, backOff)
return args.Error(0)
return args.Get(0).(PodSyncResult)
}
func (r *Mock) KillPod(pod *api.Pod, runningPod Pod) error {


@@ -1813,13 +1813,7 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container
}
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
result := dm.syncPodWithSyncResult(pod, apiPodStatus, podStatus, pullSecrets, backOff)
return result.Error()
}
// (random-liu) This is just a temporary function; it will be removed when we actually add PodSyncEvent
func (dm *DockerManager) syncPodWithSyncResult(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) {
func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))


@@ -551,7 +551,9 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
if backOff == nil {
backOff = util.NewBackOff(time.Second, time.Minute)
}
err = dm.SyncPod(pod, *apiPodStatus, podStatus, []api.Secret{}, backOff)
// TODO(random-liu): Add test for PodSyncResult
result := dm.SyncPod(pod, *apiPodStatus, podStatus, []api.Secret{}, backOff)
err = result.Error()
if err != nil && !expectErr {
t.Errorf("unexpected error: %v", err)
} else if err == nil && expectErr {


@@ -441,6 +441,7 @@ func NewMainKubelet(
return nil, err
}
klet.runtimeCache = runtimeCache
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue()
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
@@ -563,6 +564,10 @@ type Kubelet struct {
// Container runtime.
containerRuntime kubecontainer.Runtime
// reasonCache caches the failure reason of the most recent creation attempt for each container,
// which is used when generating ContainerStatus.
reasonCache *ReasonCache
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
@@ -1675,8 +1680,9 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
return err
}
err = kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
if err != nil {
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err = result.Error(); err != nil {
return err
}
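The result handed to reasonCache.Update above is the aggregate that the runtime builds from its individual sync steps. A rough sketch of that shape, using only the constructors exercised by the new test in this PR (the container names are made up):

package example

import (
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// buildResult assembles a PodSyncResult the way a runtime implementation might.
func buildResult() kubecontainer.PodSyncResult {
	result := kubecontainer.PodSyncResult{}

	// One SyncResult per step; for StartContainer the target is the container name.
	failed := kubecontainer.NewSyncResult(kubecontainer.StartContainer, "app")
	// A failed step records both a typed error and a human-readable message.
	failed.Fail(kubecontainer.ErrRunContainer, "command not found")

	// A successful step carries no error; Update will clear any stale cache
	// entry for its container.
	started := kubecontainer.NewSyncResult(kubecontainer.StartContainer, "sidecar")

	result.AddSyncResult(failed, started)
	return result
}

Given this result, Update records ErrRunContainer for "app" and removes any cached entry for "sidecar"; results with other actions (for example KillContainer) are ignored.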
@@ -3087,12 +3093,8 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
}, nil
}
// Ask the runtime to convert the internal PodStatus to api.PodStatus.
s, err := kl.containerRuntime.ConvertPodStatusToAPIPodStatus(pod, podStatus)
if err != nil {
glog.Infof("Failed to convert PodStatus to api.PodStatus for %q: %v", format.Pod(pod), err)
return api.PodStatus{}, err
}
// Convert the internal PodStatus to api.PodStatus.
s := kl.convertStatusToAPIStatus(pod, podStatus)
// Assume info is ready to process
spec := &pod.Spec
@@ -3115,6 +3117,142 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
return *s, nil
}
// TODO(random-liu): Move this to some better place.
// TODO(random-liu): Add test for convertStatusToAPIStatus()
func (kl *Kubelet) convertStatusToAPIStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) *api.PodStatus {
var apiPodStatus api.PodStatus
uid := pod.UID
convertContainerStatus := func(cs *kubecontainer.ContainerStatus) *api.ContainerStatus {
cid := cs.ID.String()
status := &api.ContainerStatus{
Name: cs.Name,
RestartCount: cs.RestartCount,
Image: cs.Image,
ImageID: cs.ImageID,
ContainerID: cid,
}
switch cs.State {
case kubecontainer.ContainerStateRunning:
status.State.Running = &api.ContainerStateRunning{StartedAt: unversioned.NewTime(cs.StartedAt)}
case kubecontainer.ContainerStateExited:
status.State.Terminated = &api.ContainerStateTerminated{
ExitCode: cs.ExitCode,
Reason: cs.Reason,
Message: cs.Message,
StartedAt: unversioned.NewTime(cs.StartedAt),
FinishedAt: unversioned.NewTime(cs.FinishedAt),
ContainerID: cid,
}
default:
status.State.Waiting = &api.ContainerStateWaiting{}
}
return status
}
statuses := make(map[string]*api.ContainerStatus, len(pod.Spec.Containers))
// Create a map of expected containers based on the pod spec.
expectedContainers := make(map[string]api.Container)
for _, container := range pod.Spec.Containers {
expectedContainers[container.Name] = container
}
containerDone := sets.NewString()
apiPodStatus.PodIP = podStatus.IP
for _, containerStatus := range podStatus.ContainerStatuses {
cName := containerStatus.Name
if _, ok := expectedContainers[cName]; !ok {
// This would also ignore the infra container.
continue
}
if containerDone.Has(cName) {
continue
}
status := convertContainerStatus(containerStatus)
if existing, found := statuses[cName]; found {
existing.LastTerminationState = status.State
containerDone.Insert(cName)
} else {
statuses[cName] = status
}
}
// Handle the containers that have no associated active or dead container, or that are in restart backoff.
// Fetch the old container statuses from the old pod status.
// TODO(random-liu): Maybe it's better to get the status from the status manager, because it holds the newest
// status and there is no status in the api.Pod of a static pod.
oldStatuses := make(map[string]api.ContainerStatus, len(pod.Spec.Containers))
for _, status := range pod.Status.ContainerStatuses {
oldStatuses[status.Name] = status
}
for _, container := range pod.Spec.Containers {
// TODO(random-liu): We should define the "Waiting" state better and clean up the following code.
if containerStatus, found := statuses[container.Name]; found {
reason, message, ok := kl.reasonCache.Get(uid, container.Name)
if ok && reason == kubecontainer.ErrCrashLoopBackOff {
containerStatus.LastTerminationState = containerStatus.State
containerStatus.State = api.ContainerState{
Waiting: &api.ContainerStateWaiting{
Reason: reason.Error(),
Message: message,
},
}
}
continue
}
var containerStatus api.ContainerStatus
containerStatus.Name = container.Name
containerStatus.Image = container.Image
if oldStatus, found := oldStatuses[container.Name]; found {
// Some states may be lost due to GC; apply the last observed
// values if possible.
containerStatus.RestartCount = oldStatus.RestartCount
containerStatus.LastTerminationState = oldStatus.LastTerminationState
}
reason, _, ok := kl.reasonCache.Get(uid, container.Name)
if !ok {
// Default position for a container: at this point there is no active or dead container and the
// reason cache has no entry (or the entry has expired), so it is reasonable to report the
// container as being created until a more accurate reason is recorded.
containerStatus.State = api.ContainerState{
Waiting: &api.ContainerStateWaiting{
Reason: "ContainerCreating",
Message: fmt.Sprintf("Image: %s is ready, container is creating", container.Image),
},
}
} else if reason == kubecontainer.ErrImagePullBackOff ||
reason == kubecontainer.ErrImageInspect ||
reason == kubecontainer.ErrImagePull ||
reason == kubecontainer.ErrImageNeverPull {
// Mark it as waiting; the reason will be filled in below.
containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
} else if reason == kubecontainer.ErrRunContainer {
// Mark it as waiting; the reason will be filled in below.
containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
}
statuses[container.Name] = &containerStatus
}
apiPodStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
for containerName, status := range statuses {
if status.State.Waiting != nil {
status.State.Running = nil
// For containers in the waiting state, fill in a specific reason if it is recorded.
if reason, message, ok := kl.reasonCache.Get(uid, containerName); ok {
status.State.Waiting.Reason = reason.Error()
status.State.Waiting.Message = message
}
}
apiPodStatus.ContainerStatuses = append(apiPodStatus.ContainerStatuses, *status)
}
// Sort the container statuses, since clients of this interface expect the list
// of containers in a pod to have a deterministic order.
sort.Sort(kubetypes.SortedContainerStatuses(apiPodStatus.ContainerStatuses))
return &apiPodStatus
}
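The crash-loop branch above can be summarized in isolation: the container's live state is demoted to LastTerminationState and replaced by a Waiting state built from the cached reason and message. A minimal sketch of just that step (the helper name is made up):

package example

import (
	"k8s.io/kubernetes/pkg/api"
)

// applyCrashLoopBackOff mirrors the crash-loop branch of convertStatusToAPIStatus.
func applyCrashLoopBackOff(cs *api.ContainerStatus, reason error, message string) {
	// Preserve whatever the container was last doing.
	cs.LastTerminationState = cs.State
	// Surface the cached failure as a Waiting state, e.g. "CrashLoopBackOff".
	cs.State = api.ContainerState{
		Waiting: &api.ContainerStateWaiting{
			Reason:  reason.Error(),
			Message: message,
		},
	}
}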
// Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve


@@ -148,6 +148,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.containerRuntime = fakeRuntime
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime)
kubelet.reasonCache = NewReasonCache()
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: kubelet.syncPod,
runtimeCache: kubelet.runtimeCache,

pkg/kubelet/reason_cache.go (new file, 104 lines)

@@ -0,0 +1,104 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"sync"
"github.com/golang/groupcache/lru"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
)
// ReasonCache stores the failure reason of the latest container start
// in a string, keyed by <pod_UID>_<container_name>. The goal is to
// propagate this reason to the container status. This endeavor is
// "best-effort" for two reasons:
// 1. The cache is not persisted.
// 2. We use an LRU cache to avoid extra garbage collection work. This
// means that some entries may be recycled before a pod has been
// deleted.
// TODO(random-liu): Use a more reliable cache that can garbage-collect entries for failed pods.
// TODO(random-liu): Move reason cache to somewhere better.
type ReasonCache struct {
lock sync.RWMutex
cache *lru.Cache
}
// reasonInfo is the cached item in ReasonCache
type reasonInfo struct {
reason error
message string
}
// maxReasonCacheEntries is the number of entries in the LRU cache. 1000 is adequate for our
// current target of 100 pods per node. If we support more pods per node in the future, we may
// want to increase it.
const maxReasonCacheEntries = 1000
func NewReasonCache() *ReasonCache {
return &ReasonCache{cache: lru.New(maxReasonCacheEntries)}
}
func (c *ReasonCache) composeKey(uid types.UID, name string) string {
return fmt.Sprintf("%s_%s", uid, name)
}
// add adds an error reason and message to the cache.
func (c *ReasonCache) add(uid types.UID, name string, reason error, message string) {
c.lock.Lock()
defer c.lock.Unlock()
c.cache.Add(c.composeKey(uid, name), reasonInfo{reason, message})
}
// Update updates the reason cache with the PodSyncResult. Only SyncResults with a
// StartContainer action change the cache.
func (c *ReasonCache) Update(uid types.UID, result kubecontainer.PodSyncResult) {
for _, r := range result.SyncResults {
if r.Action != kubecontainer.StartContainer {
continue
}
name := r.Target.(string)
if r.Error != nil {
c.add(uid, name, r.Error, r.Message)
} else {
c.Remove(uid, name)
}
}
}
// Remove removes the error reason for a container from the cache.
func (c *ReasonCache) Remove(uid types.UID, name string) {
c.lock.Lock()
defer c.lock.Unlock()
c.cache.Remove(c.composeKey(uid, name))
}
// Get returns the error reason, the error message, and whether a reason was found in the
// cache. If no reason is found, the returned error is nil and the message is empty.
func (c *ReasonCache) Get(uid types.UID, name string) (error, string, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
value, ok := c.cache.Get(c.composeKey(uid, name))
if !ok {
return nil, "", ok
}
info := value.(reasonInfo)
return info.reason, info.message, ok
}
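Putting the pieces together, a typical lifecycle of a cache entry looks roughly like the following sketch. The pod UID and container name are made up, and the import paths are the ones used elsewhere in this tree.

package example

import (
	"k8s.io/kubernetes/pkg/kubelet"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/types"
)

func reasonCacheLifecycle() {
	cache := kubelet.NewReasonCache()
	uid := types.UID("pod-uid")

	// A sync attempt that failed to start container "app".
	result := kubecontainer.PodSyncResult{}
	start := kubecontainer.NewSyncResult(kubecontainer.StartContainer, "app")
	start.Fail(kubecontainer.ErrRunContainer, "failed to start container")
	result.AddSyncResult(start)

	// Record the outcome; only StartContainer results affect the cache.
	cache.Update(uid, result)

	// Later, while generating api.PodStatus, the cached reason is read back.
	if reason, message, ok := cache.Get(uid, "app"); ok {
		_, _ = reason, message // e.g. ErrRunContainer, "failed to start container"
	}

	// A later successful start clears the entry via Update; it can also be
	// dropped explicitly.
	cache.Remove(uid, "app")
}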


@@ -0,0 +1,69 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
)
func TestReasonCache(t *testing.T) {
// Create test sync result
syncResult := kubecontainer.PodSyncResult{}
results := []*kubecontainer.SyncResult{
// reason cache should be set for SyncResult with StartContainer action and error
kubecontainer.NewSyncResult(kubecontainer.StartContainer, "container_1"),
// reason cache should not be set for SyncResult with StartContainer action but without error
kubecontainer.NewSyncResult(kubecontainer.StartContainer, "container_2"),
// reason cache should not be set for SyncResult with other actions
kubecontainer.NewSyncResult(kubecontainer.KillContainer, "container_3"),
}
results[0].Fail(kubecontainer.ErrRunContainer, "message_1")
results[2].Fail(kubecontainer.ErrKillContainer, "message_3")
syncResult.AddSyncResult(results...)
uid := types.UID("pod_1")
reasonCache := NewReasonCache()
reasonCache.Update(uid, syncResult)
assertReasonInfo(t, reasonCache, uid, results[0], true)
assertReasonInfo(t, reasonCache, uid, results[1], false)
assertReasonInfo(t, reasonCache, uid, results[2], false)
reasonCache.Remove(uid, results[0].Target.(string))
assertReasonInfo(t, reasonCache, uid, results[0], false)
}
func assertReasonInfo(t *testing.T, cache *ReasonCache, uid types.UID, result *kubecontainer.SyncResult, found bool) {
name := result.Target.(string)
actualReason, actualMessage, ok := cache.Get(uid, name)
if ok && !found {
t.Fatalf("unexpected cache hit: %v, %q", actualReason, actualMessage)
}
if !ok && found {
t.Fatalf("corresponding reason info not found")
}
if !found {
return
}
reason := result.Error
message := result.Message
if actualReason != reason || actualMessage != message {
t.Errorf("expected %v %q, got %v %q", reason, message, actualReason, actualMessage)
}
}


@@ -976,7 +976,13 @@ func (r *Runtime) APIVersion() (kubecontainer.Version, error) {
}
// SyncPod syncs the running pod to match the specified desired pod.
func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) {
var err error
defer func() {
if err != nil {
result.Fail(err)
}
}()
// TODO: (random-liu) Stop using running pod in SyncPod()
// TODO: (random-liu) Rename podStatus to apiPodStatus, rename internalPodStatus to podStatus, and use the new pod status as much as possible;
// we may stop using apiPodStatus someday.
@@ -1031,15 +1037,15 @@ func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStat
if restartPod {
// Kill the pod only if the pod is actually running.
if len(runningPod.Containers) > 0 {
if err := r.KillPod(pod, runningPod); err != nil {
return err
if err = r.KillPod(pod, runningPod); err != nil {
return
}
}
if err := r.RunPod(pod, pullSecrets); err != nil {
return err
if err = r.RunPod(pod, pullSecrets); err != nil {
return
}
}
return nil
return
}
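The deferred call to result.Fail above is what lets the existing error-returning flow adopt the new PodSyncResult signature with minimal churn. In isolation the pattern looks like the sketch below; doStep stands in for the runtime's existing logic.

package example

import (
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// syncSketch shows the error-funnelling pattern: the body keeps assigning to
// err and returning early, and the deferred closure converts any pending
// error into a failed PodSyncResult on the way out.
func syncSketch(doStep func() error) (result kubecontainer.PodSyncResult) {
	var err error
	defer func() {
		if err != nil {
			result.Fail(err)
		}
	}()

	if err = doStep(); err != nil {
		return
	}
	// ... further steps would follow the same shape ...
	return
}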
// GarbageCollect collects the pods/containers.