Merge pull request #13003 from yujuhong/decouple_workers

kubelet: trigger pod workers independently
Yu-Ju Hong 2015-08-26 09:53:25 -07:00
commit c237ac4c84
10 changed files with 402 additions and 399 deletions

View File

@@ -692,7 +692,7 @@ func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfi
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates, kc.Recorder)
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
// define file config source
if kc.ConfigFile != "" {

View File

@@ -504,8 +504,6 @@ func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) {
util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone)
//TODO(jdef) revisit this if/when executor failover lands
err := kl.SyncPods([]*api.Pod{}, nil, nil, time.Now())
if err != nil {
log.Errorf("failed to cleanly remove all pods and associated state: %v", err)
}
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
}

View File

@@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
case kubelet.SET:
glog.V(4).Infof("Setting pods for source %s : %v", source, update)
glog.V(4).Infof("Setting pods for source %s", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods

View File

@@ -42,6 +42,8 @@ func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateCompl
func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
type TestingInterface interface {
Errorf(format string, args ...interface{})
}

View File

@@ -82,6 +82,11 @@ const (
// max backoff period
maxContainerBackOff = 300 * time.Second
// Capacity of the channel for storing pods to kill. A small number should
// suffice because a goroutine is dedicated to checking the channel and does
// not block on anything else.
podKillingChannelCapacity = 50
)
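The small capacity works only because a single dedicated goroutine drains the channel. A minimal, self-contained sketch of that producer/drainer pattern (all names here are illustrative, not kubelet code):

package main

import (
	"fmt"
	"sync"
)

const killChannelCapacity = 50 // small buffer; one dedicated goroutine keeps it drained

func main() {
	killCh := make(chan string, killChannelCapacity)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The only reader: it blocks on nothing else, so the buffer rarely fills.
		for pod := range killCh {
			fmt.Println("killing", pod)
		}
	}()
	for i := 0; i < 10; i++ {
		killCh <- fmt.Sprintf("pod-%d", i) // senders block only if the buffer is full
	}
	close(killCh)
	wg.Wait()
}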
var (
@@ -92,11 +97,11 @@ var (
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specifies what
// type of sync is occurring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod,
startTime time.Time) error
HandlePodAdditions(pods []*api.Pod)
HandlePodUpdates(pods []*api.Pod)
HandlePodDeletions(pods []*api.Pod)
HandlePodSyncs(pods []*api.Pod)
HandlePodCleanups() error
}
type SourcesReadyFn func() bool
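Because the handlers form an interface, tests can substitute a stub for the kubelet. A hypothetical recorder (not part of this commit), assuming it lives in the kubelet package so api.Pod is in scope:

// recordingHandler is a hypothetical SyncHandler stub that records which
// handlers the sync loop invoked.
type recordingHandler struct {
	calls []string
}

func (h *recordingHandler) HandlePodAdditions(pods []*api.Pod) { h.calls = append(h.calls, "add") }
func (h *recordingHandler) HandlePodUpdates(pods []*api.Pod)   { h.calls = append(h.calls, "update") }
func (h *recordingHandler) HandlePodDeletions(pods []*api.Pod) { h.calls = append(h.calls, "delete") }
func (h *recordingHandler) HandlePodSyncs(pods []*api.Pod)     { h.calls = append(h.calls, "sync") }
func (h *recordingHandler) HandlePodCleanups() error           { return nil }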
@@ -377,6 +382,8 @@ func NewMainKubelet(
}
klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
return klet, nil
}
@@ -532,6 +539,9 @@ type Kubelet struct {
// Container restart Backoff
backOff *util.Backoff
// Channel for sending pods to kill.
podKillingCh chan *kubecontainer.Pod
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -745,6 +755,10 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go util.Until(kl.podKiller, 1*time.Second, util.NeverStop)
// Run the system oom watcher forever.
kl.statusManager.Start()
kl.syncLoop(updates, kl)
@@ -1227,7 +1241,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
//
// If we end up here with a create event for an already running pod, it could result in a
// restart of its containers. This cannot happen unless the kubelet restarts, because the
// delete before the second create is processed by SyncPods, which cancels this pod worker.
// delete before the second create would cancel this pod worker.
//
// If the kubelet restarts, we have a bunch of running containers for which we get create
// events. This is ok, because the pod status for these will include the podIp and terminated
@@ -1421,58 +1435,38 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
return false
}
// podIsTerminated returns true if the status is in one of the terminated states.
func podIsTerminated(status *api.PodStatus) bool {
// Returns true if pod is in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
status = pod.Status
}
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
return true
}
return false
}
// Filter out pods in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
var pods []*api.Pod
for _, pod := range allPods {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
status = pod.Status
}
if podIsTerminated(&status) {
func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod {
var filteredPods []*api.Pod
for _, p := range pods {
if kl.podIsTerminated(p) {
continue
}
pods = append(pods, pod)
filteredPods = append(filteredPods, p)
}
return pods
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
// Send updates to pod workers.
kl.dispatchWork(pods, podSyncTypes, mirrorPods, start)
// Clean up unwanted/orphaned resources.
if err := kl.cleanupPods(allPods, pods); err != nil {
return err
}
return nil
return filteredPods
}
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podUIDs[pod.UID] = true
@@ -1483,53 +1477,79 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[str
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// dispatchWork dispatches pod updates to workers.
func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) {
// Check for any containers that need starting
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
}
// cleanupPods performs a series of cleanup tasks, including terminating pod
// workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error {
desiredPods := make(map[types.UID]empty)
for _, pod := range admittedPods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
func (kl *Kubelet) deletePod(uid types.UID) error {
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
kl.podWorkers.ForgetWorker(uid)
// Runtime cache may not have been updated with the pod, but it's okay
// because the periodic cleanup routine will attempt to delete again later.
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
return fmt.Errorf("error listing containers: %v", err)
}
pod := kubecontainer.Pods(runningPods).FindPod("", uid)
if pod.IsEmpty() {
return fmt.Errorf("pod not found")
}
kl.podKillingCh <- &pod
// TODO: delete the mirror pod here?
// We leave the volume/directory cleanup to the periodic cleanup routine.
return nil
}
// HandlePodCleanups performs a series of cleanup tasks, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
// TODO(yujuhong): This function is executed by the main sync loop, so it
// should not contain any blocking calls. Re-examine the function and decide
// whether or not we should move it into a separate goroutine.
func (kl *Kubelet) HandlePodCleanups() error {
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
return nil
}
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods)
desiredPods := make(map[types.UID]empty)
for _, pod := range activePods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
// TODO: is this the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {
glog.Errorf("Failed killing unwanted containers: %v", err)
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; !found {
kl.podKillingCh <- pod
}
}
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Note that we just killed the unwanted pods. This may not yet be reflected
// in the cache. We need to bypass the cache to get the latest set of
// running pods to clean up the volumes.
@@ -1571,42 +1591,38 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
return err
}
// killUnwantedPods kills the unwanted, running pods in parallel.
func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
runningPods []*kubecontainer.Pod) error {
ch := make(chan error, len(runningPods))
defer close(ch)
numWorkers := 0
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; found {
// Per-pod workers will handle the desired pods.
continue
}
numWorkers++
go func(pod *kubecontainer.Pod, ch chan error) {
var err error = nil
defer func() {
ch <- err
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
err = kl.killPod(nil, *pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
// podKiller launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
killing := util.NewStringSet()
resultCh := make(chan types.UID)
defer close(resultCh)
for {
select {
case pod, ok := <-kl.podKillingCh:
if !ok {
return
}
}(pod, ch)
}
if killing.Has(string(pod.ID)) {
// The pod is already being killed.
break
}
killing.Insert(string(pod.ID))
go func(pod *kubecontainer.Pod, ch chan types.UID) {
defer func() {
ch <- pod.ID
}()
glog.V(2).Infof("Killing unwanted pod %q", pod.Name)
err := kl.killPod(nil, *pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
}
}(pod, resultCh)
// Aggregate errors from the pod killing workers.
var errs []error
for i := 0; i < numWorkers; i++ {
err := <-ch
if err != nil {
errs = append(errs, err)
case podID := <-resultCh:
killing.Delete(string(podID))
}
}
return utilErrors.NewAggregate(errs)
}
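The loop above is a general "at most one in-flight kill per pod ID" pattern: a set tracks in-flight IDs, and a result channel clears entries as workers finish. A self-contained sketch with simplified types (nothing here is kubelet API):

package main

import (
	"fmt"
	"time"
)

// killer starts at most one kill goroutine per ID; duplicate requests for an
// ID already being killed are dropped.
func killer(requests <-chan string) {
	inFlight := map[string]bool{}
	done := make(chan string)
	for {
		select {
		case id, ok := <-requests:
			if !ok {
				return
			}
			if inFlight[id] {
				break // already being killed; ignore the duplicate
			}
			inFlight[id] = true
			go func(id string) {
				time.Sleep(10 * time.Millisecond) // stand-in for the actual kill
				fmt.Println("killed", id)
				done <- id
			}(id)
		case id := <-done:
			delete(inFlight, id) // allow future kills of the same ID
		}
	}
}

func main() {
	reqs := make(chan string, 4)
	go killer(reqs)
	reqs <- "pod-a"
	reqs <- "pod-a" // dropped: pod-a is already in flight
	reqs <- "pod-b"
	time.Sleep(100 * time.Millisecond)
	close(reqs)
}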
type podsByCreationTime []*api.Pod
@@ -1624,44 +1640,34 @@ func (s podsByCreationTime) Less(i, j int) bool {
}
// checkHostPortConflicts detects pods with conflicted host ports.
func checkHostPortConflicts(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
func hasHostPortConflicts(pods []*api.Pod) bool {
ports := util.StringSet{}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
for _, pod := range pods {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) != 0 {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) > 0 {
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs)
notFitting = append(notFitting, pod)
continue
return true
}
fitting = append(fitting, pod)
}
return
return false
}
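The check reduces to accumulating host ports into a set and flagging the first duplicate. A standalone approximation, keyed on port number only (a simplification of the real validation helper):

// hasPortConflicts reports whether any nonzero host port appears twice.
// Pods are assumed pre-sorted by creation time, so earlier pods win.
func hasPortConflicts(podPorts [][]int) bool {
	seen := map[int]bool{}
	for _, ports := range podPorts {
		for _, p := range ports {
			if p == 0 {
				continue // no host port requested
			}
			if seen[p] {
				return true
			}
			seen[p] = true
		}
	}
	return false
}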
// checkSufficientfFreeResources detects pods that exceed the node's resources.
func (kl *Kubelet) checkSufficientfFreeResources(pods []*api.Pod) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) {
// hasInsufficientfFreeResources detects pods that exceed the node's resources.
// TODO: Consider integrating disk space into this function, and returning a
// suitable reason and message per resource type.
func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) {
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
return pods, nil, nil
// TODO: Should we admit the pod when machine info is unavailable?
return false, false
}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
capacity := CapacityFromMachineInfo(info)
return predicates.CheckPodsExceedingFreeResources(pods, capacity)
_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, capacity)
return len(notFittingCPU) > 0, len(notFittingMemory) > 0
}
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
if len(podSyncTypes) == 0 {
// regular sync. no new pods
return pods
}
func (kl *Kubelet) isOutOfDisk() bool {
outOfDockerDisk := false
outOfRootDisk := false
// Check disk space once globally and reject or accept all new pods.
@@ -1681,120 +1687,53 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]S
// Disk manager will only declare out of disk problems if unfreeze has been called.
kl.diskSpaceManager.Unfreeze()
if !outOfDockerDisk && !outOfRootDisk {
// Disk space is fine.
return pods
}
var fitting []*api.Pod
for i := range pods {
pod := pods[i]
// Only reject pods that didn't start yet.
if podSyncTypes[pod.UID] == SyncPodCreate {
reason := "OutOfDisk"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to lack of disk space.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to lack of disk space."})
continue
}
fitting = append(fitting, pod)
}
return fitting
return outOfDockerDisk || outOfRootDisk
}
// checkNodeSelectorMatching detects pods that do not match the node's labels.
func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
// matchesNodeSelector returns true if the pod matches the node's labels.
func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool {
if kl.standaloneMode {
return pods, notFitting
return true
}
node, err := kl.GetNode()
if err != nil {
glog.Errorf("error getting node: %v", err)
return pods, nil
return true
}
for _, pod := range pods {
if !predicates.PodMatchesNodeLabels(pod, node) {
notFitting = append(notFitting, pod)
continue
}
fitting = append(fitting, pod)
}
return
return predicates.PodMatchesNodeLabels(pod, node)
}
// handleNotFittingPods handles pods that do not fit on the node and returns
// the pods that fit. It currently checks host port conflicts, node selector
// mismatches, and exceeded node capacity.
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod {
fitting, notFitting := checkHostPortConflicts(pods)
for _, pod := range notFitting {
reason := "HostPortConflict"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to host port conflict.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to host port conflict"})
}
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
for _, pod := range notFitting {
reason := "NodeSelectorMismatching"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to node selector mismatch.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to node selector mismatch"})
}
fitting, notFittingCPU, notFittingMemory := kl.checkSufficientfFreeResources(fitting)
for _, pod := range notFittingCPU {
reason := "InsufficientFreeCPU"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free CPU.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to insufficient free CPU"})
}
for _, pod := range notFittingMemory {
reason := "InsufficientFreeMemory"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free memory.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to insufficient free memory"})
}
return fitting
func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
kl.recorder.Eventf(pod, reason, message)
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod " + message})
}
// admitPods handles pod admission. It filters out terminated pods, and pods
// that don't fit on the node, and may reject pods if the node is overcommitted.
func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
pods := kl.filterOutTerminatedPods(allPods)
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" include all admitted pods plus the
// new pod. The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
if hasHostPortConflicts(pods) {
return false, "HostPortConflict", "cannot be started due to host port conflict."
}
if !kl.matchesNodeSelector(pod) {
return false, "NodeSelectorMismatching", "cannot be started due to node selector mismatch."
}
cpu, memory := kl.hasInsufficientfFreeResources(pods)
if cpu {
return false, "InsufficientFreeCPU", "cannot be started due to insufficient free CPU."
} else if memory {
return false, "InsufficientFreeMemory", "cannot be started due to insufficient free memory."
}
if kl.isOutOfDisk() {
return false, "OutOfDisk", "cannot be started due to lack of disk space."
}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
// Reject pods that we cannot run.
// handleNotFittingPods relies on static information (e.g. immutable fields
// in the pod specs or machine information that doesn't change without
// rebooting), and the pods are sorted by immutable creation time. Hence it
// should only reject new pods without checking the pod sync types.
fitting := kl.handleNotFittingPods(pods)
// Reject new creation requests if diskspace is running low.
admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes)
return admittedPods
return true, "", ""
}
// syncLoop is the main loop for processing changes. It watches for changes from
@@ -1821,39 +1760,124 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
glog.Infof("Skipping pod synchronization, network is not configured")
return
}
unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType)
select {
case u, ok := <-updates:
if !ok {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return
}
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
kl.syncLoopMonitor.Store(time.Now())
switch u.Op {
case ADD:
glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodAdditions(u.Pods)
case UPDATE:
glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodUpdates(u.Pods)
case REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodDeletions(u.Pods)
case SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.podManager.UpdatePods(u, podSyncTypes)
kl.syncLoopMonitor.Store(time.Now())
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
// Periodically syncs all the pods and performs cleanup tasks.
glog.V(4).Infof("SyncLoop (periodic sync)")
handler.HandlePodSyncs(kl.podManager.GetPods())
if err := handler.HandlePodCleanups(); err != nil {
glog.Errorf("Failed cleaning pods: %v", err)
}
}
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
kl.syncLoopMonitor.Store(time.Now())
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
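Stripped of kubelet specifics, the new iteration is one select between the update channel and a resync timeout. A runnable sketch of just that loop shape (the update type is a stand-in):

package main

import (
	"fmt"
	"time"
)

type update struct {
	op   string // ADD, UPDATE, or REMOVE
	pods []string
}

// iterate performs one sync-loop step; it returns false when the update
// channel is closed, mirroring how the real loop exits.
func iterate(updates <-chan update, resync time.Duration) bool {
	select {
	case u, ok := <-updates:
		if !ok {
			return false
		}
		fmt.Printf("SyncLoop (%s): %v\n", u.op, u.pods)
	case <-time.After(resync):
		fmt.Println("SyncLoop (periodic sync)")
	}
	return true
}

func main() {
	updates := make(chan update, 1)
	updates <- update{op: "ADD", pods: []string{"foo"}}
	close(updates)
	for iterate(updates, 50*time.Millisecond) {
	}
}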
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) {
if kl.podIsTerminated(pod) {
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if syncType == SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
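The updateComplete closure is how per-worker latency is measured: the dispatch time is captured up front and observed when the worker finishes. A generic runnable sketch of the callback pattern (the observer stands in for the Prometheus metric):

package main

import (
	"fmt"
	"time"
)

// dispatch hands work to an async worker and reports the elapsed time,
// measured from the moment of dispatch, once the worker completes.
func dispatch(work func(), observe func(time.Duration)) {
	start := time.Now()
	go func() {
		work()
		observe(time.Since(start))
	}()
}

func main() {
	done := make(chan struct{})
	dispatch(
		func() { time.Sleep(20 * time.Millisecond) },
		func(d time.Duration) { fmt.Println("worker latency:", d); close(done) },
	)
	<-done
}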
// TODO: Consider handling all mirror pod updates in a separate component.
func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
// corresponding static pod. Send update to the pod worker if the static
// pod exists.
if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)
}
}
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
start := time.Now()
sort.Sort(podsByCreationTime(pods))
for _, pod := range pods {
kl.podManager.AddPod(pod)
if isMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Note that allPods includes the new pod since we added it at the
// beginning of the loop.
allPods := kl.podManager.GetPods()
// We marked rejected pods as failed, so activePods includes all admitted
// pods that are still alive, plus the new pod.
activePods := kl.filterOutTerminatedPods(allPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start)
}
}
func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
start := time.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if isMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// TODO: Evaluate if we need to validate and reject updates.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)
}
}
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
start := time.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
if isMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod.UID); err != nil {
glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)
}
}
}
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
start := time.Now()
for _, pod := range pods {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodSync, mirrorPod, start)
}
kl.syncLoopMonitor.Store(time.Now())
}
func (kl *Kubelet) LatestLoopEntryTime() time.Time {

View File

@@ -132,6 +132,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
fakeClock := &util.FakeClock{Time: time.Now()}
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}
@@ -348,10 +349,7 @@ func TestSyncPodsStartPod(t *testing.T) {
},
}
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodSyncs(pods)
fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
}
@@ -375,16 +373,12 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
},
},
}
if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodCleanups()
// Sources are not ready yet. Don't remove any pods.
fakeRuntime.AssertKilledPods([]string{})
ready = true
if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodCleanups()
// Sources are ready. Remove unwanted pods.
fakeRuntime.AssertKilledPods([]string{"12345678"})
@@ -2004,18 +1998,17 @@ func TestGetHostPortConflicts(t *testing.T) {
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
}
// Pods should not cause any conflict.
_, conflicts := checkHostPortConflicts(pods)
if len(conflicts) != 0 {
t.Errorf("expected no conflicts, Got %#v", conflicts)
if hasHostPortConflicts(pods) {
t.Errorf("expected no conflicts, Got conflicts")
}
// The new pod should cause conflict and be reported.
expected := &api.Pod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
// The new pod should cause conflict and be reported.
pods = append(pods, expected)
if _, actual := checkHostPortConflicts(pods); !reflect.DeepEqual(actual, []*api.Pod{expected}) {
t.Errorf("expected %#v, Got %#v", expected, actual)
if !hasHostPortConflicts(pods) {
t.Errorf("expected no conflict, Got no conflicts")
}
}
@@ -2052,7 +2045,7 @@ func TestHandlePortConflicts(t *testing.T) {
// The newer pod should be rejected.
conflictedPod := pods[0]
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
if !found {
@@ -2094,7 +2087,7 @@ func TestHandleNodeSelector(t *testing.T) {
// The first pod should be rejected.
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
@@ -2142,7 +2135,7 @@ func TestHandleMemExceeded(t *testing.T) {
// The newer pod should be rejected.
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
@@ -2167,12 +2160,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
}
podToTest := pods[1]
// Run once to populate the status map.
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
t.Fatalf("expected to have status cached for pod2")
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
kl.podManager.SetPods([]*api.Pod{})
kl.HandlePodCleanups()
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
t.Fatalf("expected to not have status cached for pod2")
}
@@ -2695,12 +2689,8 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
}
kl.podManager.SetPods(orphanPods)
pods, mirrorMap := kl.podManager.GetPodsAndMirrorMap()
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods(pods, emptyPodUIDs, mirrorMap, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.HandlePodCleanups()
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
@@ -2802,7 +2792,7 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "foo",
Name: "staticFoo",
Namespace: "new",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
@@ -2815,11 +2805,9 @@
},
},
}
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodSyncs(kubelet.podManager.GetPods())
status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
if ok {
t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
@@ -3151,10 +3139,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}
// Let the pod worker set the status to failed after this sync.
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodUpdates(pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", pods[0].UID)
@@ -3205,10 +3190,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
}
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodUpdates(pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", pods[0].UID)
@@ -3243,10 +3225,7 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) {
kl.podManager.SetPods(pods)
// Sync to create pod directories.
err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.HandlePodSyncs(kl.podManager.GetPods())
for i := range pods {
if !dirExists(kl.getPodDir(pods[i].UID)) {
t.Errorf("expected directory to exist for pod %d", i)
@@ -3254,10 +3233,8 @@
}
// Pod 1 has been deleted and no longer exists.
err = kl.SyncPods([]*api.Pod{pods[0]}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.podManager.SetPods([]*api.Pod{pods[0]})
kl.HandlePodCleanups()
if !dirExists(kl.getPodDir(pods[0].UID)) {
t.Errorf("expected directory to exist for pod 0")
}
@@ -3298,10 +3275,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) {
kl.podManager.SetPods(pods)
// Sync to create pod directories.
err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.HandlePodSyncs(pods)
for i := range pods {
if !dirExists(kl.getPodDir(pods[i].UID)) {
t.Errorf("expected directory to exist for pod %d", i)
@@ -3311,7 +3285,7 @@
// deleted.
kl.statusManager.SetPodStatus(pods[1], api.PodStatus{Phase: api.PodFailed})
kl.statusManager.SetPodStatus(pods[2], api.PodStatus{Phase: api.PodSucceeded})
err = kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
kl.HandlePodCleanups()
for i := range pods {
if !dirExists(kl.getPodDir(pods[i].UID)) {
t.Errorf("expected directory to exist for pod %d", i)

View File

@@ -19,7 +19,6 @@ package kubelet
import (
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -45,9 +44,19 @@ type podManager interface {
GetPods() []*api.Pod
GetPodByFullName(podFullName string) (*api.Pod, bool)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod)
GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
SetPods(pods []*api.Pod)
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType)
// Methods that modify a single pod.
AddPod(pod *api.Pod)
UpdatePod(pod *api.Pod)
DeletePod(pod *api.Pod)
DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
@@ -103,50 +112,6 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
return pm
}
// Update the internal pods with those provided by the update.
func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) {
pm.lock.Lock()
defer pm.lock.Unlock()
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for uid := range pm.podByUID {
existingPods[uid] = struct{}{}
}
// Update the internal pods.
pm.setPods(u.Pods)
for uid := range pm.podByUID {
if _, ok := existingPods[uid]; !ok {
podSyncTypes[uid] = SyncPodCreate
}
}
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = SyncPodUpdate
}
allPods := applyUpdates(u.Pods, pm.getAllPods())
pm.setPods(allPods)
default:
panic("syncLoop does not support incremental changes")
}
// Mark all remaining pods as sync.
for uid := range pm.podByUID {
if _, ok := podSyncTypes[uid]; !ok {
podSyncTypes[uid] = SyncPodSync
}
}
}
// Set the internal pods based on the new pods.
func (pm *basicPodManager) SetPods(newPods []*api.Pod) {
pm.lock.Lock()
@@ -177,24 +142,34 @@ func (pm *basicPodManager) setPods(newPods []*api.Pod) {
pm.mirrorPodByFullName = mirrorPodByFullName
}
func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod {
updated := []*api.Pod{}
m := map[types.UID]*api.Pod{}
for _, pod := range changed {
m[pod.UID] = pod
}
func (pm *basicPodManager) AddPod(pod *api.Pod) {
pm.UpdatePod(pod)
}
for _, pod := range current {
if m[pod.UID] != nil {
updated = append(updated, m[pod.UID])
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
} else {
updated = append(updated, pod)
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
}
func (pm *basicPodManager) UpdatePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
} else {
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
}
}
return updated
func (pm *basicPodManager) DeletePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID)
delete(pm.mirrorPodByFullName, podFullName)
} else {
delete(pm.podByUID, pod.UID)
delete(pm.podByFullName, podFullName)
}
}
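AddPod/UpdatePod/DeletePod keep two indices, by UID and by full name, in lockstep under one mutex. A reduced sketch of that invariant (podIndex is a stand-in, not the real type; it assumes sync and api.Pod are in scope, and the real full-name key is derived from the pod's name and namespace):

// podIndex mirrors basicPodManager's double bookkeeping in miniature.
type podIndex struct {
	mu         sync.RWMutex
	byUID      map[string]*api.Pod
	byFullName map[string]*api.Pod
}

func (ix *podIndex) upsert(uid, fullName string, pod *api.Pod) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	ix.byUID[uid] = pod // both maps always change together
	ix.byFullName[fullName] = pod
}

func (ix *podIndex) remove(uid, fullName string) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	delete(ix.byUID, uid)
	delete(ix.byFullName, fullName)
}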
// GetPods returns the regular pods bound to the kubelet and their spec.
@@ -204,23 +179,20 @@ func (pm *basicPodManager) GetPods() []*api.Pod {
return podsMapToPods(pm.podByUID)
}
// GetPodsAndMirrorPods returns both the regular and mirror pods.
func (pm *basicPodManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pods := podsMapToPods(pm.podByUID)
mirrorPods := podsMapToPods(pm.mirrorPodByUID)
return pods, mirrorPods
}
// Returns all pods (including mirror pods).
func (pm *basicPodManager) getAllPods() []*api.Pod {
return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
}
// GetPodsAndMirrorMap returns a copy of the regular pods and the mirror
// pods indexed by full name.
func (pm *basicPodManager) GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) {
pm.lock.RLock()
defer pm.lock.RUnlock()
mirrorPods := make(map[string]*api.Pod)
for key, pod := range pm.mirrorPodByFullName {
mirrorPods[key] = pod
}
return podsMapToPods(pm.podByUID), mirrorPods
}
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
// as well as whether the pod was found.
func (pm *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
@@ -295,3 +267,17 @@ func podsMapToPods(UIDMap map[types.UID]*api.Pod) []*api.Pod {
}
return pods
}
func (pm *basicPodManager) GetMirrorPodByPod(pod *api.Pod) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
return mirrorPod, ok
}
func (pm *basicPodManager) GetPodByMirrorPod(mirrorPod *api.Pod) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
return pod, ok
}

View File

@@ -32,6 +32,7 @@ import (
type PodWorkers interface {
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
ForgetWorker(uid types.UID)
}
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, SyncPodType) error
@@ -171,19 +172,30 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
}
}
func (p *podWorkers) removeWorker(uid types.UID) {
if ch, ok := p.podUpdates[uid]; ok {
close(ch)
delete(p.podUpdates, uid)
// If there is an undelivered work update for this pod we need to remove it,
// since the per-pod goroutine won't be able to put it on the already closed
// channel when it finishes processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
delete(p.lastUndeliveredWorkUpdate, uid)
}
}
}
func (p *podWorkers) ForgetWorker(uid types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
p.removeWorker(uid)
}
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
p.podLock.Lock()
defer p.podLock.Unlock()
for key, channel := range p.podUpdates {
for key := range p.podUpdates {
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
// If there is an undelivered work update for this pod we need to remove it,
// since the per-pod goroutine won't be able to put it on the already closed
// channel when it finishes processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
delete(p.lastUndeliveredWorkUpdate, key)
}
p.removeWorker(key)
}
}
}
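removeWorker captures the teardown rule: closing the update channel makes the per-pod goroutine exit, and any buffered follow-up update must be dropped because nothing can requeue it on a closed channel. The same pattern in isolation (illustrative types only):

// forget tears down one worker. Closing the channel stops the per-key
// goroutine; the pending update is discarded since it can no longer be
// delivered. delete on a missing key is a no-op, so no existence check
// is needed.
func forget(key string, updates map[string]chan string, pending map[string]string) {
	if ch, ok := updates[key]; ok {
		close(ch)
		delete(updates, key)
		delete(pending, key)
	}
}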

View File

@@ -53,10 +53,15 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(pods []*api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
kl.handleNotFittingPods(pods)
ch := make(chan RunPodResult)
admitted := []*api.Pod{}
for _, pod := range pods {
// Check if we can admit the pod.
if ok, reason, message := kl.canAdmitPod(append(admitted, pod), pod); !ok {
kl.rejectPod(pod, reason, message)
} else {
admitted = append(admitted, pod)
}
go func(pod *api.Pod) {
err := kl.runPod(pod, retryDelay)
ch <- RunPodResult{pod, err}

View File

@@ -76,6 +76,7 @@ func TestRunOnce(t *testing.T) {
cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
podManager, _ := newFakePodManager()
diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
kb := &Kubelet{
rootDirectory: "/tmp/kubelet",
@@ -88,6 +89,7 @@
podManager: podManager,
os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(),
diskSpaceManager: diskSpaceManager,
}
kb.containerManager, _ = newContainerManager(cadvisor, "", "", "")