diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index ad509c9435c..516cba76a45 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -799,6 +799,9 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res return err } resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName) + + // Watch (poll) the container for the expected resources update. Stop watching once the resources + // match the desired values. resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool { if status.Resources == nil { return false @@ -814,6 +817,7 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res } }) m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition) + // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime. // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources. // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted. diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 956ef60afc3..ce0abc6bd64 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -314,11 +314,15 @@ func (g *GenericPLEG) Relist() { var completedConditions []versionedWatchCondition for _, condition := range watchConditions { if condition.condition(status) { + // condition was met: add it to the list of completed conditions. completedConditions = append(completedConditions, condition) } } if len(completedConditions) > 0 { g.completeWatchConditions(pid, completedConditions) + // If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync. + // We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates + // all containers in the pod with the latest status. events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet}) } @@ -484,24 +488,25 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p return status, g.cache.Set(pod.ID, status, err, timestamp), err } +// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch +// condition is met. The condition is keyed so it can be updated before the condition +// is met. func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() conditions, ok := g.watchConditions[podUID] if !ok { - if condition == nil { - return // Condition isn't set, nothing to do. - } conditions = make(map[string]versionedWatchCondition) } versioned, found := conditions[conditionKey] if found { + // Watch condition was already set. Increment its version & update the condition function. versioned.version++ versioned.condition = condition conditions[conditionKey] = versioned - } else if condition != nil { + } else { conditions[conditionKey] = versionedWatchCondition{ key: conditionKey, condition: condition, @@ -516,19 +521,22 @@ func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCo g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() - conditions, ok := g.watchConditions[podUID] + podConditions, ok := g.watchConditions[podUID] if !ok { return nil } - filtered := make([]versionedWatchCondition, 0, len(conditions)) - for _, condition := range conditions { - filtered = append(filtered, condition) + // Flatten the map into a list of conditions. This also serves to create a copy, so the lock can + // be released. + conditions := make([]versionedWatchCondition, 0, len(podConditions)) + for _, condition := range podConditions { + conditions = append(conditions, condition) } - return filtered + return conditions } -// completeWatchConditions clears the completed watch conditions. +// completeWatchConditions removes the completed watch conditions, unless they have been updated +// since the condition was checked. func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() @@ -549,6 +557,8 @@ func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditi g.watchConditions[podUID] = conditions } +// cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from +// the pod records. Events are not emitted for removed pods. func (g *GenericPLEG) cleanupOrphanedWatchConditions() { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock()