Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-09 03:57:41 +00:00)

Merge pull request #128518 from tallclair/pleg-watch-conditions

[FG:InPlacePodVerticalScaling] PLEG watch conditions: rapid polling for expected changes

Commit 25101d33bc
@@ -66,6 +66,9 @@ type RuntimeHelper interface {
 
     // UnprepareDynamicResources unprepares resources for a a pod.
     UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error
+
+    // SetPodWatchCondition flags a pod to be inspected until the condition is met.
+    SetPodWatchCondition(types.UID, string, func(*PodStatus) bool)
 }
 
 // ShouldContainerBeRestarted checks whether a container needs to be restarted.
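For illustration only (not part of this PR): a caller-side sketch of the new RuntimeHelper hook. The RuntimeHelper interface, PodStatus, FindContainerStatusByName, and the container State/ExitCode fields all appear elsewhere in this diff; the wrapper function and condition key are hypothetical, and the snippet assumes it is compiled inside the kubernetes module where these packages live.

```go
package example

import (
	"k8s.io/apimachinery/pkg/types"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// watchForCleanExit registers a keyed watch condition so the PLEG keeps
// re-inspecting the pod until the named container has exited with code 0.
// The key ("<container>:clean-exit") lets a later call replace the condition.
func watchForCleanExit(helper kubecontainer.RuntimeHelper, podUID types.UID, containerName string) {
	helper.SetPodWatchCondition(podUID, containerName+":clean-exit", func(status *kubecontainer.PodStatus) bool {
		s := status.FindContainerStatusByName(containerName)
		return s != nil && s.State == kubecontainer.ContainerStateExited && s.ExitCode == 0
	})
}
```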
@@ -114,3 +114,7 @@ func (f *FakeRuntimeHelper) PrepareDynamicResources(ctx context.Context, pod *v1
 func (f *FakeRuntimeHelper) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
     return nil
 }
+
+func (f *FakeRuntimeHelper) SetPodWatchCondition(_ kubetypes.UID, _ string, _ func(*kubecontainer.PodStatus) bool) {
+    // Not implemented.
+}
@@ -2028,17 +2028,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
         return false, nil
     }
 
-    if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) {
-        // While resize is in progress, periodically request the latest status from the runtime via
-        // the PLEG. This is necessary since ordinarily pod status is only fetched when a container
-        // undergoes a state transition.
-        runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
-        if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
-            klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod))
-            return false, err
-        }
-    }
-
     return false, nil
 }
 
@@ -3174,3 +3163,7 @@ func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
         kl.tryReconcileMirrorPods(staticPod, mirrorPod)
     }
 }
+
+func (kl *Kubelet) SetPodWatchCondition(podUID types.UID, conditionKey string, condition pleg.WatchCondition) {
+    kl.pleg.SetPodWatchCondition(podUID, conditionKey, condition)
+}
@@ -57,6 +57,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/logs"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
+    "k8s.io/kubernetes/pkg/kubelet/pleg"
     proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
     "k8s.io/kubernetes/pkg/kubelet/runtimeclass"
     "k8s.io/kubernetes/pkg/kubelet/sysctl"
@@ -804,6 +805,26 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res
                 "pod", format.Pod(pod), "resourceName", resourceName)
             return err
         }
+
+        resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName)
+
+        // Watch (poll) the container for the expected resources update. Stop watching once the resources
+        // match the desired values.
+        resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool {
+            if status.Resources == nil {
+                return false
+            }
+            switch resourceName {
+            case v1.ResourceMemory:
+                return status.Resources.MemoryLimit.Equal(*container.Resources.Limits.Memory())
+            case v1.ResourceCPU:
+                return status.Resources.CPURequest.Equal(*container.Resources.Requests.Cpu()) &&
+                    status.Resources.CPULimit.Equal(*container.Resources.Limits.Cpu())
+            default:
+                return true // Shouldn't happen.
+            }
+        })
+        m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition)
+
         // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime.
         // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources.
         // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted.
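The resize condition above compares desired and reported values with resource.Quantity.Equal rather than comparing strings. A minimal standalone sketch of why that matters (not from the PR; it only uses the public apimachinery resource package, and Equal compares numeric values):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// "500m" and "0.5" are different spellings of the same CPU quantity.
	desired := resource.MustParse("500m")
	actual := resource.MustParse("0.5")

	// Equal compares the numeric value, so equivalent representations match.
	fmt.Println(desired.Equal(actual)) // true

	// A container reporting a full CPU would not satisfy a 500m condition.
	fmt.Println(desired.Equal(resource.MustParse("1"))) // false
}
```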
@@ -427,6 +427,6 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon
     metrics.EventedPLEGConnLatency.Observe(duration.Seconds())
 }
 
-func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-    return fmt.Errorf("not implemented"), false
+func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
+    e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition)
 }
@@ -80,6 +80,16 @@ type GenericPLEG struct
     podCacheMutex sync.Mutex
     // logger is used for contextual logging
     logger klog.Logger
+    // watchConditions tracks pod watch conditions, guarded by watchConditionsLock
+    // watchConditions is a map of pod UID -> condition key -> condition
+    watchConditions     map[types.UID]map[string]versionedWatchCondition
+    watchConditionsLock sync.Mutex
+}
+
+type versionedWatchCondition struct {
+    key       string
+    condition WatchCondition
+    version   uint32
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -132,6 +142,7 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan
         podRecords:      make(podRecords),
         cache:           cache,
         clock:           clock,
+        watchConditions: make(map[types.UID]map[string]versionedWatchCondition),
     }
 }
 
@@ -252,28 +263,29 @@ func (g *GenericPLEG) Relist() {
     // update running pod and container count
     updateRunningPodAndContainerMetrics(pods)
     g.podRecords.setCurrent(pods)
+    g.cleanupOrphanedWatchConditions()
 
-    // Compare the old and the current pods, and generate events.
-    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
+    needsReinspection := make(map[types.UID]*kubecontainer.Pod)
+
     for pid := range g.podRecords {
+        // Compare the old and the current pods, and generate events.
         oldPod := g.podRecords.getOld(pid)
         pod := g.podRecords.getCurrent(pid)
         // Get all containers in the old and the new pod.
         allContainers := getContainersFromPods(oldPod, pod)
+        var events []*PodLifecycleEvent
         for _, container := range allContainers {
-            events := computeEvents(g.logger, oldPod, pod, &container.ID)
-            for _, e := range events {
-                updateEvents(eventsByPodID, e)
-            }
-        }
+            containerEvents := computeEvents(g.logger, oldPod, pod, &container.ID)
+            events = append(events, containerEvents...)
         }
 
-    needsReinspection := make(map[types.UID]*kubecontainer.Pod)
+        watchConditions := g.getPodWatchConditions(pid)
+        _, reinspect := g.podsToReinspect[pid]
 
-    // If there are events associated with a pod, we should update the
-    // podCache.
-    for pid, events := range eventsByPodID {
-        pod := g.podRecords.getCurrent(pid)
+        if len(events) == 0 && len(watchConditions) == 0 && !reinspect {
+            // Nothing else needed for this pod.
+            continue
+        }
 
         // updateCache() will inspect the pod and update the cache. If an
         // error occurs during the inspection, we want PLEG to retry again
@@ -284,7 +296,8 @@ func (g *GenericPLEG) Relist() {
         // inspecting the pod and getting the PodStatus to update the cache
         // serially may take a while. We should be aware of this and
         // parallelize if needed.
-        if err, updated := g.updateCache(ctx, pod, pid); err != nil {
+        status, updated, err := g.updateCache(ctx, pod, pid)
+        if err != nil {
             // Rely on updateCache calling GetPodStatus to log the actual error.
             g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
 
@@ -292,16 +305,25 @@ func (g *GenericPLEG) Relist() {
             needsReinspection[pid] = pod
 
             continue
-        } else {
-            // this pod was in the list to reinspect and we did so because it had events, so remove it
-            // from the list (we don't want the reinspection code below to inspect it a second time in
-            // this relist execution)
-            delete(g.podsToReinspect, pid)
-            if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+        } else if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
             if !updated {
                 continue
             }
         }
+
+        var completedConditions []versionedWatchCondition
+        for _, condition := range watchConditions {
+            if condition.condition(status) {
+                // condition was met: add it to the list of completed conditions.
+                completedConditions = append(completedConditions, condition)
+            }
+        }
+        if len(completedConditions) > 0 {
+            g.completeWatchConditions(pid, completedConditions)
+            // If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync.
+            // We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates
+            // all containers in the pod with the latest status.
+            events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet})
         }
 
         // Update the internal storage and send out the events.
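A condensed, self-contained model of the per-pod logic in the Relist hunks above. The types are toy stand-ins rather than the kubelet's; the point is that every registered condition is evaluated against the freshly cached status, and at most one ConditionMet-style event is emitted per pod no matter how many conditions complete:

```go
package main

import "fmt"

// Toy stand-ins for the kubelet types used in the hunks above.
type podStatus struct{ resizeActuated bool }

type watchCondition func(*podStatus) bool

type versionedWatchCondition struct {
	key       string
	condition watchCondition
	version   uint32
}

// evaluateConditions mirrors the shape of the Relist loop: it collects the
// conditions that completed and reports whether a single "ConditionMet"
// event should be emitted for the pod.
func evaluateConditions(status *podStatus, conditions []versionedWatchCondition) (completed []versionedWatchCondition, emitEvent bool) {
	for _, c := range conditions {
		if c.condition(status) {
			completed = append(completed, c)
		}
	}
	return completed, len(completed) > 0
}

func main() {
	status := &podStatus{resizeActuated: true}
	conditions := []versionedWatchCondition{
		{key: "c1:resize:cpu", condition: func(s *podStatus) bool { return s.resizeActuated }},
		{key: "c1:resize:memory", condition: func(s *podStatus) bool { return s.resizeActuated }},
		{key: "c1:still-watching", condition: func(s *podStatus) bool { return false }},
	}
	completed, emit := evaluateConditions(status, conditions)
	// Two conditions completed, but only one event would be generated.
	fmt.Println(len(completed), emit) // 2 true
}
```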
@@ -325,8 +347,6 @@ func (g *GenericPLEG) Relist() {
         if events[i].Type == ContainerDied {
             // Fill up containerExitCode map for ContainerDied event when first time appeared
             if len(containerExitCode) == 0 && pod != nil {
-                // Get updated podStatus
-                status, err := g.cache.Get(pod.ID)
                 if err == nil {
                     for _, containerStatus := range status.ContainerStatuses {
                         containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
@@ -342,18 +362,6 @@ func (g *GenericPLEG) Relist() {
         }
     }
 
-    // reinspect any pods that failed inspection during the previous relist
-    if len(g.podsToReinspect) > 0 {
-        g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
-        for pid, pod := range g.podsToReinspect {
-            if err, _ := g.updateCache(ctx, pod, pid); err != nil {
-                // Rely on updateCache calling GetPodStatus to log the actual error.
-                g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
-                needsReinspection[pid] = pod
-            }
-        }
-    }
-
     // Update the cache timestamp. This needs to happen *after*
     // all pods have been properly updated in the cache.
     g.cache.UpdateTime(timestamp)
@@ -427,13 +435,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
 // updateCache tries to update the pod status in the kubelet cache and returns true if the
 // pod status was actually updated in the cache. It will return false if the pod status
 // was ignored by the cache.
-func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
+func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (*kubecontainer.PodStatus, bool, error) {
     if pod == nil {
         // The pod is missing in the current relist. This means that
         // the pod has no visible (active or inactive) containers.
         g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
         g.cache.Delete(pid)
-        return nil, true
+        return nil, true, nil
     }
 
     g.podCacheMutex.Lock()
@@ -477,22 +485,90 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
         timestamp = status.TimeStamp
     }
 
-    return err, g.cache.Set(pod.ID, status, err, timestamp)
+    return status, g.cache.Set(pod.ID, status, err, timestamp), err
 }
 
-func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-    ctx := context.Background()
-    if pod == nil {
-        return fmt.Errorf("pod cannot be nil"), false
+// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
+// condition is met. The condition is keyed so it can be updated before the condition
+// is met.
+func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
+    g.watchConditionsLock.Lock()
+    defer g.watchConditionsLock.Unlock()
+
+    conditions, ok := g.watchConditions[podUID]
+    if !ok {
+        conditions = make(map[string]versionedWatchCondition)
     }
-    return g.updateCache(ctx, pod, pid)
+
+    versioned, found := conditions[conditionKey]
+    if found {
+        // Watch condition was already set. Increment its version & update the condition function.
+        versioned.version++
+        versioned.condition = condition
+        conditions[conditionKey] = versioned
+    } else {
+        conditions[conditionKey] = versionedWatchCondition{
+            key:       conditionKey,
+            condition: condition,
+        }
+    }
+
+    g.watchConditions[podUID] = conditions
 }
 
-func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
-    if e == nil {
+// getPodWatchConditions returns a list of the active watch conditions for the pod.
+func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCondition {
+    g.watchConditionsLock.Lock()
+    defer g.watchConditionsLock.Unlock()
+
+    podConditions, ok := g.watchConditions[podUID]
+    if !ok {
+        return nil
+    }
+
+    // Flatten the map into a list of conditions. This also serves to create a copy, so the lock can
+    // be released.
+    conditions := make([]versionedWatchCondition, 0, len(podConditions))
+    for _, condition := range podConditions {
+        conditions = append(conditions, condition)
+    }
+    return conditions
+}
+
+// completeWatchConditions removes the completed watch conditions, unless they have been updated
+// since the condition was checked.
+func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) {
+    g.watchConditionsLock.Lock()
+    defer g.watchConditionsLock.Unlock()
+
+    conditions, ok := g.watchConditions[podUID]
+    if !ok {
+        // Pod was deleted, nothing to do.
         return
     }
-    eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
+
+    for _, completed := range completedConditions {
+        condition := conditions[completed.key]
+        // Only clear the condition if it has not been updated.
+        if condition.version == completed.version {
+            delete(conditions, completed.key)
+        }
+    }
+    g.watchConditions[podUID] = conditions
+}
+
+// cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from
+// the pod records. Events are not emitted for removed pods.
+func (g *GenericPLEG) cleanupOrphanedWatchConditions() {
+    g.watchConditionsLock.Lock()
+    defer g.watchConditionsLock.Unlock()
+
+    for podUID := range g.watchConditions {
+        if g.podRecords.getCurrent(podUID) == nil {
+            // Pod was deleted, remove it from the watch conditions.
+            delete(g.watchConditions, podUID)
+        }
+    }
 }
 
 func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
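The version field is what keeps completeWatchConditions from clearing a condition that was re-registered between evaluation and completion. A small self-contained model of that bookkeeping (toy types, no locking; the real code guards the map with watchConditionsLock):

```go
package main

import "fmt"

type condition func() bool

type versionedCondition struct {
	key       string
	condition condition
	version   uint32
}

type registry map[string]versionedCondition

// set mirrors SetPodWatchCondition for a single pod: re-setting an existing
// key bumps the version and swaps in the new condition function.
func (r registry) set(key string, c condition) {
	v, ok := r[key]
	if ok {
		v.version++
		v.condition = c
	} else {
		v = versionedCondition{key: key, condition: c}
	}
	r[key] = v
}

// complete mirrors completeWatchConditions: a completed condition is only
// dropped if its version still matches the registered one.
func (r registry) complete(done versionedCondition) {
	if current, ok := r[done.key]; ok && current.version == done.version {
		delete(r, done.key)
	}
}

func main() {
	r := registry{}
	r.set("c1:resize:cpu", func() bool { return true })
	evaluated := r["c1:resize:cpu"] // snapshot taken during a relist

	// The condition is replaced (e.g. a second resize) before completion lands.
	r.set("c1:resize:cpu", func() bool { return false })

	r.complete(evaluated)
	fmt.Println(len(r)) // 1: the newer condition survives the stale completion
}
```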
@@ -28,6 +28,7 @@ import (
 
     "github.com/google/go-cmp/cmp"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
 
     "k8s.io/apimachinery/pkg/types"
@@ -82,9 +83,10 @@ func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
     return events
 }
 
-func createTestContainer(ID string, state kubecontainer.State) *kubecontainer.Container {
+func createTestContainer(id string, state kubecontainer.State) *kubecontainer.Container {
     return &kubecontainer.Container{
-        ID:    kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
+        ID:    kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: id},
+        Name:  id,
         State: state,
     }
 }
@@ -317,14 +319,14 @@ func testReportMissingPods(t *testing.T, numRelists int) {
 }
 
 func newTestGenericPLEGWithRuntimeMock(runtimeMock kubecontainer.Runtime) *GenericPLEG {
-    pleg := &GenericPLEG{
-        relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour},
-        runtime:        runtimeMock,
-        eventChannel:   make(chan *PodLifecycleEvent, 1000),
-        podRecords:     make(podRecords),
-        cache:          kubecontainer.NewCache(),
-        clock:          clock.RealClock{},
-    }
+    pleg := NewGenericPLEG(
+        klog.Logger{},
+        runtimeMock,
+        make(chan *PodLifecycleEvent, 1000),
+        &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour},
+        kubecontainer.NewCache(),
+        clock.RealClock{},
+    ).(*GenericPLEG)
     return pleg
 }
 
@@ -736,3 +738,260 @@ kubelet_running_pods 2
         })
     }
 }
+
+func TestWatchConditions(t *testing.T) {
+    pods := []*kubecontainer.Pod{{
+        Name: "running-pod",
+        ID:   "running",
+        Sandboxes: []*kubecontainer.Container{
+            createTestContainer("s", kubecontainer.ContainerStateRunning),
+        },
+        Containers: []*kubecontainer.Container{
+            createTestContainer("c", kubecontainer.ContainerStateRunning),
+        },
+    }, {
+        Name: "running-pod-2",
+        ID:   "running-2",
+        Sandboxes: []*kubecontainer.Container{
+            createTestContainer("s", kubecontainer.ContainerStateRunning),
+        },
+        Containers: []*kubecontainer.Container{
+            createTestContainer("c-exited", kubecontainer.ContainerStateExited),
+            createTestContainer("c-running", kubecontainer.ContainerStateRunning),
+        },
+    }, {
+        Name: "terminating-pod",
+        ID:   "terminating",
+        Sandboxes: []*kubecontainer.Container{
+            createTestContainer("s", kubecontainer.ContainerStateExited),
+        },
+    }, {
+        Name: "reinspect-pod",
+        ID:   "reinspect",
+        Sandboxes: []*kubecontainer.Container{
+            createTestContainer("s", kubecontainer.ContainerStateRunning),
+        },
+    }}
+    initialPods := pods
+    initialPods = append(initialPods, &kubecontainer.Pod{
+        Name: "terminated-pod",
+        ID:   "terminated",
+        Sandboxes: []*kubecontainer.Container{
+            createTestContainer("s", kubecontainer.ContainerStateExited),
+        },
+    })
+
+    alwaysComplete := func(_ *kubecontainer.PodStatus) bool {
+        return true
+    }
+    neverComplete := func(_ *kubecontainer.PodStatus) bool {
+        return false
+    }
+
+    var pleg *GenericPLEG
+    var updatingCond WatchCondition
+    // updatingCond always completes, but updates the condition first.
+    updatingCond = func(_ *kubecontainer.PodStatus) bool {
+        pleg.SetPodWatchCondition("running", "updating", updatingCond)
+        return true
+    }
+
+    // resettingCond decrements the version before it completes.
+    var resettingCond = func(_ *kubecontainer.PodStatus) bool {
+        versioned := pleg.watchConditions["running"]["resetting"]
+        versioned.version = 0
+        pleg.watchConditions["running"]["resetting"] = versioned
+        return true
+    }
+
+    // makeContainerCond returns a RunningContainerWatchCondition that asserts the expected container status
+    makeContainerCond := func(expectedContainerName string, complete bool) WatchCondition {
+        return RunningContainerWatchCondition(expectedContainerName, func(status *kubecontainer.Status) bool {
+            if status.Name != expectedContainerName {
+                panic(fmt.Sprintf("unexpected container name: got %q, want %q", status.Name, expectedContainerName))
+            }
+            return complete
+        })
+    }
+
+    testCases := []struct {
+        name                    string
+        podUID                  types.UID
+        watchConditions         map[string]WatchCondition
+        incrementInitialVersion bool // Whether to call SetPodWatchCondition multiple times to increment the version
+        expectEvaluated         bool // Whether the watch conditions should be evaluated
+        expectRemoved           bool // Whether podUID should be present in the watch conditions map
+        expectWatchConditions   map[string]versionedWatchCondition // The expected watch conditions for the podUID (only key & version checked)
+    }{{
+        name:   "no watch conditions",
+        podUID: "running",
+    }, {
+        name:   "running pod with conditions",
+        podUID: "running",
+        watchConditions: map[string]WatchCondition{
+            "completing": alwaysComplete,
+            "watching":   neverComplete,
+            "updating":   updatingCond,
+        },
+        expectEvaluated: true,
+        expectWatchConditions: map[string]versionedWatchCondition{
+            "watching": {version: 0},
+            "updating": {version: 1},
+        },
+    }, {
+        name:                    "conditions with incremented versions",
+        podUID:                  "running",
+        incrementInitialVersion: true,
+        watchConditions: map[string]WatchCondition{
+            "completing": alwaysComplete,
+            "watching":   neverComplete,
+            "updating":   updatingCond,
+        },
+        expectEvaluated: true,
+        expectWatchConditions: map[string]versionedWatchCondition{
+            "watching": {version: 1},
+            "updating": {version: 2},
+        },
+    }, {
+        name:                    "completed watch condition with older version",
+        podUID:                  "running",
+        incrementInitialVersion: true,
+        watchConditions: map[string]WatchCondition{
+            "resetting": resettingCond,
+        },
+        expectEvaluated: true,
+        expectWatchConditions: map[string]versionedWatchCondition{
+            "resetting": {version: 0},
+        },
+    }, {
+        name:   "non-existent pod",
+        podUID: "non-existent",
+        watchConditions: map[string]WatchCondition{
+            "watching": neverComplete,
+        },
+        expectEvaluated: false,
+        expectRemoved:   true,
+    }, {
+        name:   "terminated pod",
+        podUID: "terminated",
+        watchConditions: map[string]WatchCondition{
+            "watching": neverComplete,
+        },
+        expectEvaluated: false,
+        expectRemoved:   true,
+    }, {
+        name:   "reinspecting pod",
+        podUID: "reinspect",
+        watchConditions: map[string]WatchCondition{
+            "watching": neverComplete,
+        },
+        expectEvaluated: true,
+        expectWatchConditions: map[string]versionedWatchCondition{
+            "watching": {version: 0},
+        },
+    }, {
+        name:   "single container conditions",
+        podUID: "running",
+        watchConditions: map[string]WatchCondition{
+            "completing": makeContainerCond("c", true),
+            "watching":   makeContainerCond("c", false),
+        },
+        expectEvaluated: true,
+        expectWatchConditions: map[string]versionedWatchCondition{
+            "watching": {version: 0},
+        },
+    }, {
+        name:   "multi-container conditions",
+        podUID: "running-2",
+        watchConditions: map[string]WatchCondition{
+            "completing:exited":  makeContainerCond("c-exited", true),
+            "watching:exited":    makeContainerCond("c-exited", false),
+            "completing:running": makeContainerCond("c-running", true),
+            "watching:running":   makeContainerCond("c-running", false),
+            "completing:dne":     makeContainerCond("c-dne", true),
+            "watching:dne":       makeContainerCond("c-dne", false),
+        },
+        expectEvaluated: true,
+        expectWatchConditions: map[string]versionedWatchCondition{
+            "watching:running": {version: 0},
+        },
+    }}
+
+    for _, test := range testCases {
+        t.Run(test.name, func(t *testing.T) {
+            runtimeMock := containertest.NewMockRuntime(t)
+            pleg = newTestGenericPLEGWithRuntimeMock(runtimeMock)
+
+            // Mock pod statuses
+            for _, pod := range initialPods {
+                podStatus := &kubecontainer.PodStatus{
+                    ID:        pod.ID,
+                    Name:      pod.Name,
+                    Namespace: pod.Namespace,
+                }
+                for _, c := range pod.Containers {
+                    podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &kubecontainer.Status{
+                        ID:    c.ID,
+                        Name:  c.Name,
+                        State: c.State,
+                    })
+                }
+                runtimeMock.EXPECT().
+                    GetPodStatus(mock.Anything, pod.ID, pod.Name, pod.Namespace).
+                    Return(podStatus, nil).Maybe()
+            }
+
+            // Setup initial pod records.
+            runtimeMock.EXPECT().GetPods(mock.Anything, true).Return(initialPods, nil).Once()
+            pleg.Relist()
+            pleg.podsToReinspect["reinspect"] = nil
+
+            // Remove "terminated" pod.
+            runtimeMock.EXPECT().GetPods(mock.Anything, true).Return(pods, nil).Once()
+
+            var evaluatedConditions []string
+            for key, condition := range test.watchConditions {
+                wrappedCondition := func(status *kubecontainer.PodStatus) bool {
+                    defer func() {
+                        if r := recover(); r != nil {
+                            require.Fail(t, "condition error", r)
+                        }
+                    }()
+                    assert.Equal(t, test.podUID, status.ID, "podUID")
+                    if !test.expectEvaluated {
+                        assert.Fail(t, "conditions should not be evaluated")
+                    } else {
+                        evaluatedConditions = append(evaluatedConditions, key)
+                    }
+                    return condition(status)
+                }
+                pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition)
+                if test.incrementInitialVersion {
+                    // Set the watch condition a second time to increment the version.
+                    pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition)
+                }
+            }
+            pleg.Relist()
+
+            if test.expectEvaluated {
+                assert.Len(t, evaluatedConditions, len(test.watchConditions), "all conditions should be evaluated")
+            }
+
+            if test.expectRemoved {
+                assert.NotContains(t, pleg.watchConditions, test.podUID, "Pod should be removed from watch conditions")
+            } else {
+                actualConditions := pleg.watchConditions[test.podUID]
+                assert.Len(t, actualConditions, len(test.expectWatchConditions), "expected number of conditions")
+                for key, expected := range test.expectWatchConditions {
+                    if !assert.Contains(t, actualConditions, key) {
+                        continue
+                    }
+                    actual := actualConditions[key]
+                    assert.Equal(t, key, actual.key)
+                    assert.Equal(t, expected.version, actual.version)
+                }
+            }
+
+        })
+    }
+}
@@ -47,6 +47,8 @@ const (
     PodSync PodLifeCycleEventType = "PodSync"
     // ContainerChanged - event type when the new state of container is unknown.
     ContainerChanged PodLifeCycleEventType = "ContainerChanged"
+    // ConditionMet - event type triggered when any number of watch conditions are met.
+    ConditionMet PodLifeCycleEventType = "ConditionMet"
 )
 
 // PodLifecycleEvent is an event that reflects the change of the pod state.
@@ -66,7 +68,10 @@ type PodLifecycleEventGenerator interface {
     Start()
     Watch() chan *PodLifecycleEvent
     Healthy() (bool, error)
-    UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
+    // SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
+    // condition is met. The condition is keyed so it can be updated before the condition
+    // is met.
+    SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition)
 }
 
 // podLifecycleEventGeneratorHandler contains functions that are useful for different PLEGs
@@ -77,3 +82,19 @@ type podLifecycleEventGeneratorHandler interface {
     Update(relistDuration *RelistDuration)
     Relist()
 }
+
+// WatchCondition takes the latest PodStatus, and returns whether the condition is met.
+type WatchCondition = func(*kubecontainer.PodStatus) bool
+
+// RunningContainerWatchCondition wraps a condition on the container status to make a pod
+// WatchCondition. If the container is no longer running, the condition is implicitly cleared.
+func RunningContainerWatchCondition(containerName string, condition func(*kubecontainer.Status) bool) WatchCondition {
+    return func(podStatus *kubecontainer.PodStatus) bool {
+        status := podStatus.FindContainerStatusByName(containerName)
+        if status == nil || status.State != kubecontainer.ContainerStateRunning {
+            // Container isn't running. Consider the condition "completed" so it is cleared.
+            return true
+        }
+        return condition(status)
+    }
+}
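For illustration only (not part of the PR): wiring the two helpers above together. The package paths, types, and fields are the ones used elsewhere in this diff; the wrapper function and condition key are hypothetical.

```go
package example

import (
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// watchCPULimit flags the pod for reinspection on every relist until the named
// container reports the desired CPU limit. If the container stops running, the
// wrapper returns true and the condition is cleared automatically.
func watchCPULimit(g pleg.PodLifecycleEventGenerator, podUID types.UID, containerName string, want resource.Quantity) {
	cond := pleg.RunningContainerWatchCondition(containerName, func(status *kubecontainer.Status) bool {
		return status.Resources != nil &&
			status.Resources.CPULimit != nil &&
			status.Resources.CPULimit.Equal(want)
	})
	// The key lets a later call replace this condition before it completes.
	g.SetPodWatchCondition(podUID, containerName+":cpu-limit", cond)
}
```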