Revert "Enable kubecontainer.Cache in kubelet"

Piotr Szczesniak 2016-01-18 13:35:41 +01:00
parent 1534672b4b
commit 9659057986
3 changed files with 110 additions and 100 deletions

View File

@@ -115,14 +115,8 @@ const (
 	plegChannelCapacity = 1000

 	// Generic PLEG relies on relisting for discovering container events.
-	// A longer period means that kubelet will take longer to detect container
-	// changes and to update pod status. On the other hand, a shorter period
-	// will cause more frequent relisting (e.g., container runtime operations),
-	// leading to higher cpu usage.
-	// Note that even though we set the period to 1s, the relisting itself can
-	// take more than 1s to finish if the container runtime responds slowly
-	// and/or when there are many container changes in one cycle.
-	plegRelistPeriod = time.Second * 1
+	// The period directly affects the response time of kubelet.
+	plegRelistPeriod = time.Second * 3

 	// backOffPeriod is the period to back off when pod syncing resulting in an
 	// error. It is also used as the base period for the exponential backoff
@@ -346,8 +340,6 @@ func NewMainKubelet(
 	klet.livenessManager = proberesults.NewManager()

-	klet.podCache = kubecontainer.NewCache()
-
 	// Initialize the runtime.
 	switch containerRuntime {
 	case "docker":
@@ -373,6 +365,8 @@ func NewMainKubelet(
 			imageBackOff,
 			serializeImagePulls,
 		)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -393,13 +387,14 @@ func NewMainKubelet(
 			return nil, err
 		}
 		klet.containerRuntime = rktRuntime
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)

 		// No Docker daemon to put in a container.
 		dockerDaemonContainer = ""
 	default:
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}

-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)

 	// setup containerGC
@@ -446,7 +441,7 @@ func NewMainKubelet(
 	}
 	klet.runtimeCache = runtimeCache
 	klet.workQueue = queue.NewBasicWorkQueue()
-	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
+	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
 	klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
 	klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
@@ -583,9 +578,6 @@ type Kubelet struct {
 	// Generates pod events.
 	pleg pleg.PodLifecycleEventGenerator

-	// Store kubecontainer.PodStatus for all pods.
-	podCache kubecontainer.Cache
-
 	// The name of the resource-only container to run the Kubelet in (empty for no container).
 	// Name must be absolute.
 	resourceContainer string
@@ -1572,42 +1564,31 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 	return nil
 }

-// TODO: Remove runningPod from the arguments.
-func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
+func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
+	start := kl.clock.Now()
 	var firstSeenTime time.Time
-	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
+	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok {
+		glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
+	} else {
 		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
 	}

-	if updateType == kubetypes.SyncPodCreate {
-		if !firstSeenTime.IsZero() {
-			// This is the first time we are syncing the pod. Record the latency
-			// since kubelet first saw the pod if firstSeenTime is set.
-			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
-		} else {
-			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
-		}
-	}
-
-	// Query the container runtime (or cache) to retrieve the pod status, and
-	// update it in the status manager.
-	podStatus, statusErr := kl.getRuntimePodStatus(pod)
-	apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr)
-	if err != nil {
-		return err
-	}
-	// Record the time it takes for the pod to become running.
-	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
-	// TODO: The logic seems wrong since the pod phase can become pending when
-	// the container runtime is temporarily not available.
-	if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
-		!firstSeenTime.IsZero() {
-		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
-	}
-	kl.statusManager.SetPodStatus(pod, apiPodStatus)
-	if statusErr != nil {
-		return statusErr
-	}
+	// Before returning, regenerate status and store it in the cache.
+	defer func() {
+		status, err := kl.generatePodStatus(pod)
+		if err != nil {
+			glog.Errorf("Unable to generate status for pod %q with error(%v)", format.Pod(pod), err)
+			// Propagate the error upstream.
+			syncErr = err
+		} else {
+			existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
+			if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
+				!firstSeenTime.IsZero() {
+				metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
+			}
+			kl.statusManager.SetPodStatus(pod, status)
+		}
+	}()

 	// Kill pods we can't run.
 	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
@@ -1658,6 +1639,51 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	}
 	kl.volumeManager.SetVolumes(pod.UID, podVolumes)

+	// The kubelet is the source of truth for pod status. It ignores the status sent from
+	// the apiserver and regenerates status for every pod update, incrementally updating
+	// the status it received at pod creation time.
+	//
+	// The container runtime needs 2 pieces of information from the status to sync a pod:
+	// The terminated state of containers (to restart them) and the podIp (for liveness probes).
+	// New pods don't have either, so we skip the expensive status generation step.
+	//
+	// If we end up here with a create event for an already running pod, it could result in a
+	// restart of its containers. This cannot happen unless the kubelet restarts, because the
+	// delete before the second create would cancel this pod worker.
+	//
+	// If the kubelet restarts, we have a bunch of running containers for which we get create
+	// events. This is ok, because the pod status for these will include the podIp and terminated
+	// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
+	// state of a newly started container with the apiserver before the kubelet restarted, so
+	// it's OK to pretend like the kubelet started them after it restarted.
+
+	var apiPodStatus api.PodStatus
+	var podStatus *kubecontainer.PodStatus
+	// Always generate the kubecontainer.PodStatus to know whether there are
+	// running containers associated with the pod.
+	podStatusPtr, apiPodStatusPtr, err := kl.containerRuntime.GetPodStatusAndAPIPodStatus(pod)
+	if err != nil {
+		glog.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
+		return err
+	}
+	apiPodStatus = *apiPodStatusPtr
+	podStatus = podStatusPtr
+
+	if updateType == kubetypes.SyncPodCreate {
+		// This is the first time we are syncing the pod. Record the latency
+		// since kubelet first saw the pod if firstSeenTime is set.
+		if !firstSeenTime.IsZero() {
+			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
+		}
+
+		// kubelet may have just been restarted. Re-use the last known
+		// apiPodStatus.
+		apiPodStatus = pod.Status
+		apiPodStatus.StartTime = &unversioned.Time{Time: start}
+		kl.statusManager.SetPodStatus(pod, apiPodStatus)
+		glog.V(3).Infof("Reusing api pod status for new pod %q", format.Pod(pod))
+	}
+
 	pullSecrets, err := kl.getPullSecretsForPod(pod)
 	if err != nil {
 		glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
@@ -2259,6 +2285,10 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
 	case e := <-plegCh:
+		// Filter out started events since we don't use them now.
+		if e.Type == pleg.ContainerStarted {
+			break
+		}
 		pod, ok := kl.podManager.GetPodByUID(e.ID)
 		if !ok {
 			// If the pod no longer exists, ignore the event.
@@ -3054,21 +3084,19 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
 	}
 }

-// Get the internal PodStatus from the cache if the cache exists;
-// otherwise, query the runtime directly.
-func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus, error) {
+// By passing the pod directly, this method avoids pod lookup, which requires
+// grabbing a lock.
+// TODO(random-liu): api.PodStatus is named as podStatus, this maybe confusing, this may happen in other functions
+// after refactoring, modify them later.
+func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	start := kl.clock.Now()
 	defer func() {
 		metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
-	if kl.podCache != nil {
-		return kl.podCache.Get(pod.UID)
-	}
-	return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
-}
-
-func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) {
+
 	glog.V(3).Infof("Generating status for %q", format.Pod(pod))
 	// TODO: Consider include the container information.
 	if kl.pastActiveDeadline(pod) {
 		reason := "DeadlineExceeded"
@@ -3079,41 +3107,44 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
 			Message: "Pod was active on the node longer than specified deadline"}, nil
 	}

-	if statusErr != nil {
-		// TODO: Re-evaluate whether we should set the status to "Pending".
-		glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr)
-		return api.PodStatus{
+	spec := &pod.Spec
+	podStatus, err := kl.containerRuntime.GetAPIPodStatus(pod)
+
+	if err != nil {
+		// Error handling
+		glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), err)
+		if strings.Contains(err.Error(), "resource temporarily unavailable") {
+			// Leave upstream layer to decide what to do
+			return api.PodStatus{}, err
+		}
+
+		pendingStatus := api.PodStatus{
 			Phase: api.PodPending,
 			Reason: "GeneralError",
-			Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
-		}, nil
-	}
-	// Ask the runtime to convert the internal PodStatus to api.PodStatus.
-	s, err := kl.containerRuntime.ConvertPodStatusToAPIPodStatus(pod, podStatus)
-	if err != nil {
-		glog.Infof("Failed to convert PodStatus to api.PodStatus for %q: %v", format.Pod(pod), err)
-		return api.PodStatus{}, err
+			Message: fmt.Sprintf("Query container info failed with error (%v)", err),
+		}
+		return pendingStatus, nil
 	}

 	// Assume info is ready to process
-	spec := &pod.Spec
-	s.Phase = GetPhase(spec, s.ContainerStatuses)
-	kl.probeManager.UpdatePodStatus(pod.UID, s)
-	s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
+	podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
+	kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
+	podStatus.Conditions = append(podStatus.Conditions, status.GeneratePodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase))

 	if !kl.standaloneMode {
 		hostIP, err := kl.GetHostIP()
 		if err != nil {
 			glog.V(4).Infof("Cannot get host IP: %v", err)
 		} else {
-			s.HostIP = hostIP.String()
-			if podUsesHostNetwork(pod) && s.PodIP == "" {
-				s.PodIP = hostIP.String()
+			podStatus.HostIP = hostIP.String()
+			if podUsesHostNetwork(pod) && podStatus.PodIP == "" {
+				podStatus.PodIP = hostIP.String()
 			}
 		}
 	}

-	return *s, nil
+	return *podStatus, nil
 }

 // Returns logs of current machine.
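
Aside (not part of the commit): the restored syncPod above relies on a named return value (syncErr) plus a deferred closure, so status is regenerated and stored on every exit path and a failure inside the deferred block can still be propagated to the caller. Below is a minimal, standalone Go sketch of just that pattern; the helper names (generateStatus, syncOnce) are made up for illustration and are not kubelet APIs.

package main

import (
	"errors"
	"fmt"
)

// generateStatus is a made-up stand-in for kl.generatePodStatus.
func generateStatus() (string, error) {
	return "", errors.New("container runtime not ready")
}

// syncOnce mirrors the shape of the restored syncPod: the named return value
// syncErr lets the deferred closure both record status and propagate an error
// that only surfaces after the function body has finished.
func syncOnce() (syncErr error) {
	defer func() {
		status, err := generateStatus()
		if err != nil {
			// Propagate the error upstream, like the deferred block in syncPod.
			syncErr = err
			return
		}
		fmt.Println("recorded status:", status)
	}()

	// The body can succeed, yet the caller still sees the deferred error.
	return nil
}

func main() {
	fmt.Println("syncOnce returned:", syncOnce())
}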

View File

@@ -71,9 +71,6 @@ type podWorkers struct {
 	// resyncInterval is the duration to wait until the next sync.
 	resyncInterval time.Duration
-
-	// podCache stores kubecontainer.PodStatus for all pods.
-	podCache kubecontainer.Cache
 }

 type workUpdate struct {
@@ -91,7 +88,7 @@ type workUpdate struct {
 }

 func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
-	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
+	recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
 	return &podWorkers{
 		podUpdates: map[types.UID]chan workUpdate{},
 		isWorking: map[types.UID]bool{},
@@ -102,27 +99,13 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
 		workQueue: workQueue,
 		resyncInterval: resyncInterval,
 		backOffPeriod: backOffPeriod,
-		podCache: podCache,
 	}
 }

 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 	var minRuntimeCacheTime time.Time
 	for newWork := range podUpdates {
 		err := func() (err error) {
-			podID := newWork.pod.UID
-			if p.podCache != nil {
-				// This is a blocking call that would return only if the cache
-				// has an entry for the pod that is newer than minRuntimeCache
-				// Time. This ensures the worker doesn't start syncing until
-				// after the cache is at least newer than the finished time of
-				// the previous sync.
-				// TODO: We don't consume the return PodStatus yet, but we
-				// should pass it to syncPod() eventually.
-				p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
-			}
-
-			// TODO: Deprecate the runtime cache.
 			// We would like to have the state of the containers from at least
 			// the moment when we finished the previous processing of that pod.
 			if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
@@ -223,13 +206,10 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
 func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
 	// Requeue the last update if the last sync returned error.
-	switch {
-	case syncErr == nil:
-		// No error; requeue at the regular resync interval.
-		p.workQueue.Enqueue(uid, p.resyncInterval)
-	default:
-		// Error occurred during the sync; back off and then retry.
+	if syncErr != nil {
 		p.workQueue.Enqueue(uid, p.backOffPeriod)
+	} else {
+		p.workQueue.Enqueue(uid, p.resyncInterval)
 	}
 	p.checkForUpdates(uid)
 }
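
Aside (not part of the commit): the block removed from managePodLoop above called p.podCache.GetNewerThan(podID, minRuntimeCacheTime), which, per its own comment, blocks until the cache holds an entry for the pod newer than the finish time of the previous sync. The sketch below illustrates only that "wait until a newer entry" semantics; it is not the kubecontainer.Cache implementation, just a made-up newerThanCache built on sync.Cond.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is an illustrative record: a status string plus the time it was observed.
type entry struct {
	status  string
	modTime time.Time
}

// newerThanCache is a simplified, hypothetical stand-in for kubecontainer.Cache.
// GetNewerThan blocks until an entry newer than minTime has been stored, which
// is the behaviour the removed pod-worker code depended on.
type newerThanCache struct {
	mu      sync.Mutex
	cond    *sync.Cond
	entries map[string]entry
}

func newCache() *newerThanCache {
	c := &newerThanCache{entries: map[string]entry{}}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// Set stores a status for id and wakes up any blocked readers.
func (c *newerThanCache) Set(id, status string, modTime time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[id] = entry{status: status, modTime: modTime}
	c.cond.Broadcast()
}

// GetNewerThan waits for an entry for id that is strictly newer than minTime.
func (c *newerThanCache) GetNewerThan(id string, minTime time.Time) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	for {
		if e, ok := c.entries[id]; ok && e.modTime.After(minTime) {
			return e.status
		}
		// Wait releases the lock while blocked and re-acquires it before returning.
		c.cond.Wait()
	}
}

func main() {
	c := newCache()
	lastSync := time.Now()

	// A writer (the PLEG relist, in the real kubelet) updates the cache later.
	go func() {
		time.Sleep(50 * time.Millisecond)
		c.Set("pod-1", "running", time.Now())
	}()

	// The worker blocks here until the cache is newer than the previous sync.
	fmt.Println("status:", c.GetNewerThan("pod-1", lastSync))
}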

View File

@@ -60,7 +60,6 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 		queue.NewBasicWorkQueue(),
 		time.Second,
 		time.Second,
-		nil,
 	)
 	return podWorkers, processed
 }
@@ -191,7 +190,7 @@ func TestFakePodWorkers(t *testing.T) {
 	kubeletForRealWorkers := &simpleFakeKubelet{}
 	kubeletForFakeWorkers := &simpleFakeKubelet{}

-	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
+	realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
 	fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}

 	tests := []struct {