mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 01:40:07 +00:00
More comments around PLEG WatchConditions
This commit is contained in:
parent
35bd1e6831
commit
7fce6f2317
@ -799,6 +799,9 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName)
|
resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName)
|
||||||
|
|
||||||
|
// Watch (poll) the container for the expected resources update. Stop watching once the resources
|
||||||
|
// match the desired values.
|
||||||
resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool {
|
resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool {
|
||||||
if status.Resources == nil {
|
if status.Resources == nil {
|
||||||
return false
|
return false
|
||||||
@ -814,6 +817,7 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition)
|
m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition)
|
||||||
|
|
||||||
// If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime.
|
// If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime.
|
||||||
// So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources.
|
// So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources.
|
||||||
// Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted.
|
// Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted.
|
||||||
|
@ -314,11 +314,15 @@ func (g *GenericPLEG) Relist() {
|
|||||||
var completedConditions []versionedWatchCondition
|
var completedConditions []versionedWatchCondition
|
||||||
for _, condition := range watchConditions {
|
for _, condition := range watchConditions {
|
||||||
if condition.condition(status) {
|
if condition.condition(status) {
|
||||||
|
// condition was met: add it to the list of completed conditions.
|
||||||
completedConditions = append(completedConditions, condition)
|
completedConditions = append(completedConditions, condition)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(completedConditions) > 0 {
|
if len(completedConditions) > 0 {
|
||||||
g.completeWatchConditions(pid, completedConditions)
|
g.completeWatchConditions(pid, completedConditions)
|
||||||
|
// If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync.
|
||||||
|
// We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates
|
||||||
|
// all containers in the pod with the latest status.
|
||||||
events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet})
|
events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -484,24 +488,25 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
|
|||||||
return status, g.cache.Set(pod.ID, status, err, timestamp), err
|
return status, g.cache.Set(pod.ID, status, err, timestamp), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
|
||||||
|
// condition is met. The condition is keyed so it can be updated before the condition
|
||||||
|
// is met.
|
||||||
func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
|
func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
|
||||||
g.watchConditionsLock.Lock()
|
g.watchConditionsLock.Lock()
|
||||||
defer g.watchConditionsLock.Unlock()
|
defer g.watchConditionsLock.Unlock()
|
||||||
|
|
||||||
conditions, ok := g.watchConditions[podUID]
|
conditions, ok := g.watchConditions[podUID]
|
||||||
if !ok {
|
if !ok {
|
||||||
if condition == nil {
|
|
||||||
return // Condition isn't set, nothing to do.
|
|
||||||
}
|
|
||||||
conditions = make(map[string]versionedWatchCondition)
|
conditions = make(map[string]versionedWatchCondition)
|
||||||
}
|
}
|
||||||
|
|
||||||
versioned, found := conditions[conditionKey]
|
versioned, found := conditions[conditionKey]
|
||||||
if found {
|
if found {
|
||||||
|
// Watch condition was already set. Increment its version & update the condition function.
|
||||||
versioned.version++
|
versioned.version++
|
||||||
versioned.condition = condition
|
versioned.condition = condition
|
||||||
conditions[conditionKey] = versioned
|
conditions[conditionKey] = versioned
|
||||||
} else if condition != nil {
|
} else {
|
||||||
conditions[conditionKey] = versionedWatchCondition{
|
conditions[conditionKey] = versionedWatchCondition{
|
||||||
key: conditionKey,
|
key: conditionKey,
|
||||||
condition: condition,
|
condition: condition,
|
||||||
@ -516,19 +521,22 @@ func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCo
|
|||||||
g.watchConditionsLock.Lock()
|
g.watchConditionsLock.Lock()
|
||||||
defer g.watchConditionsLock.Unlock()
|
defer g.watchConditionsLock.Unlock()
|
||||||
|
|
||||||
conditions, ok := g.watchConditions[podUID]
|
podConditions, ok := g.watchConditions[podUID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
filtered := make([]versionedWatchCondition, 0, len(conditions))
|
// Flatten the map into a list of conditions. This also serves to create a copy, so the lock can
|
||||||
for _, condition := range conditions {
|
// be released.
|
||||||
filtered = append(filtered, condition)
|
conditions := make([]versionedWatchCondition, 0, len(podConditions))
|
||||||
|
for _, condition := range podConditions {
|
||||||
|
conditions = append(conditions, condition)
|
||||||
}
|
}
|
||||||
return filtered
|
return conditions
|
||||||
}
|
}
|
||||||
|
|
||||||
// completeWatchConditions clears the completed watch conditions.
|
// completeWatchConditions removes the completed watch conditions, unless they have been updated
|
||||||
|
// since the condition was checked.
|
||||||
func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) {
|
func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) {
|
||||||
g.watchConditionsLock.Lock()
|
g.watchConditionsLock.Lock()
|
||||||
defer g.watchConditionsLock.Unlock()
|
defer g.watchConditionsLock.Unlock()
|
||||||
@ -549,6 +557,8 @@ func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditi
|
|||||||
g.watchConditions[podUID] = conditions
|
g.watchConditions[podUID] = conditions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from
|
||||||
|
// the pod records. Events are not emitted for removed pods.
|
||||||
func (g *GenericPLEG) cleanupOrphanedWatchConditions() {
|
func (g *GenericPLEG) cleanupOrphanedWatchConditions() {
|
||||||
g.watchConditionsLock.Lock()
|
g.watchConditionsLock.Lock()
|
||||||
defer g.watchConditionsLock.Unlock()
|
defer g.watchConditionsLock.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user