Add WatchCondition concept to the PLEG

Tim Allclair 2024-10-30 21:07:17 -07:00
parent 07a9ab87bc
commit f4d36dd402
4 changed files with 284 additions and 18 deletions

pkg/kubelet/pleg/evented.go

@@ -430,3 +430,7 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon
func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
return fmt.Errorf("not implemented"), false
}
func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition)
}

pkg/kubelet/pleg/generic.go

@@ -80,6 +80,16 @@ type GenericPLEG struct {
podCacheMutex sync.Mutex
// logger is used for contextual logging
logger klog.Logger
// watchConditions tracks pod watch conditions, guarded by watchConditionsLock
// watchConditions is a map of pod UID -> condition key -> condition
watchConditions map[types.UID]map[string]versionedWatchCondition
watchConditionsLock sync.Mutex
}
type versionedWatchCondition struct {
key string
condition WatchCondition
version uint32
}
// plegContainerState has a one-to-one mapping to the
@@ -125,13 +135,14 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan
panic("cache cannot be nil")
}
return &GenericPLEG{
logger: logger,
relistDuration: relistDuration,
runtime: runtime,
eventChannel: eventChannel,
podRecords: make(podRecords),
cache: cache,
clock: clock,
watchConditions: make(map[types.UID]map[string]versionedWatchCondition),
}
}
@@ -252,6 +263,7 @@ func (g *GenericPLEG) Relist() {
// update running pod and container count
updateRunningPodAndContainerMetrics(pods)
g.podRecords.setCurrent(pods)
g.cleanupOrphanedWatchConditions()
needsReinspection := make(map[types.UID]*kubecontainer.Pod)
@@ -267,9 +279,10 @@ func (g *GenericPLEG) Relist() {
events = append(events, containerEvents...)
}
watchConditions := g.getPodWatchConditions(pid)
_, reinspect := g.podsToReinspect[pid]
if len(events) == 0 && len(watchConditions) == 0 && !reinspect {
// Nothing else needed for this pod.
continue
}
@@ -283,7 +296,8 @@ func (g *GenericPLEG) Relist() {
// inspecting the pod and getting the PodStatus to update the cache
// serially may take a while. We should be aware of this and
// parallelize if needed.
status, updated, err := g.updateCache(ctx, pod, pid)
if err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
@@ -299,6 +313,14 @@ func (g *GenericPLEG) Relist() {
}
}
var completedConditions []versionedWatchCondition
for _, condition := range watchConditions {
if condition.condition(status) {
completedConditions = append(completedConditions, condition)
}
}
g.completeWatchConditions(pid, completedConditions)
// Update the internal storage and send out the events.
g.podRecords.update(pid)
@@ -320,8 +342,6 @@ func (g *GenericPLEG) Relist() {
if events[i].Type == ContainerDied {
// Fill up containerExitCode map for ContainerDied event when first time appeared
if len(containerExitCode) == 0 && pod != nil {
if err == nil {
for _, containerStatus := range status.ContainerStatuses {
containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
@@ -410,13 +430,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
// updateCache tries to update the pod status in the kubelet cache and returns true if the
// pod status was actually updated in the cache. It will return false if the pod status
// was ignored by the cache.
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (*kubecontainer.PodStatus, bool, error) {
if pod == nil {
// The pod is missing in the current relist. This means that
// the pod has no visible (active or inactive) containers.
g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
g.cache.Delete(pid)
return nil, true, nil
}
g.podCacheMutex.Lock()
@@ -460,7 +480,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
timestamp = status.TimeStamp
}
return status, g.cache.Set(pod.ID, status, err, timestamp), err
}
func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
@@ -468,14 +488,85 @@ func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error,
if pod == nil {
return fmt.Errorf("pod cannot be nil"), false
}
_, updated, err := g.updateCache(ctx, pod, pid)
return err, updated
}
func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
g.watchConditionsLock.Lock()
defer g.watchConditionsLock.Unlock()
conditions, ok := g.watchConditions[podUID]
if !ok {
if condition == nil {
return // Condition isn't set, nothing to do.
}
conditions = make(map[string]versionedWatchCondition)
}
versioned, found := conditions[conditionKey]
if found {
versioned.version++
versioned.condition = condition
conditions[conditionKey] = versioned
} else if condition != nil {
conditions[conditionKey] = versionedWatchCondition{
key: conditionKey,
condition: condition,
}
}
g.watchConditions[podUID] = conditions
}
// getPodWatchConditions returns a list of the active watch conditions for the pod.
func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCondition {
g.watchConditionsLock.Lock()
defer g.watchConditionsLock.Unlock()
conditions, ok := g.watchConditions[podUID]
if !ok {
return nil
}
filtered := make([]versionedWatchCondition, 0, len(conditions))
for _, condition := range conditions {
filtered = append(filtered, condition)
}
return filtered
}
// completeWatchConditions clears the completed watch conditions.
func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) {
g.watchConditionsLock.Lock()
defer g.watchConditionsLock.Unlock()
conditions, ok := g.watchConditions[podUID]
if !ok {
// Pod was deleted, nothing to do.
return
}
for _, completed := range completedConditions {
condition := conditions[completed.key]
// Only clear the condition if it has not been updated.
if condition.version == completed.version {
delete(conditions, completed.key)
}
}
g.watchConditions[podUID] = conditions
}
func (g *GenericPLEG) cleanupOrphanedWatchConditions() {
g.watchConditionsLock.Lock()
defer g.watchConditionsLock.Unlock()
for podUID := range g.watchConditions {
if g.podRecords.getCurrent(podUID) == nil {
// Pod was deleted, remove it from the watch conditions.
delete(g.watchConditions, podUID)
}
}
}
func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
if e == nil {
return
}
eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
}
func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {

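Aside (not part of the diff): the version field on versionedWatchCondition exists so that a condition re-registered via SetPodWatchCondition while Relist is still evaluating a stale copy is not accidentally cleared; completeWatchConditions only deletes an entry whose version still matches the snapshot it evaluated. A minimal, self-contained sketch of that guard, using a simplified stand-in type:

package main

import "fmt"

// Simplified stand-in for the versionedWatchCondition type in the diff.
type versionedWatchCondition struct {
	key     string
	version uint32
}

func main() {
	conditions := map[string]versionedWatchCondition{
		"resize": {key: "resize", version: 0},
	}

	// Relist snapshots the condition before evaluating it (getPodWatchConditions).
	snapshot := conditions["resize"]

	// Concurrently, a caller re-registers the same key via SetPodWatchCondition,
	// which bumps the version.
	updated := conditions["resize"]
	updated.version++
	conditions["resize"] = updated

	// completeWatchConditions only clears an entry whose version is unchanged,
	// so the re-registered condition survives the stale snapshot.
	if current, ok := conditions["resize"]; ok && current.version == snapshot.version {
		delete(conditions, "resize")
	}

	fmt.Println(len(conditions)) // prints 1: the updated condition was kept
}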
pkg/kubelet/pleg/generic_test.go

@@ -736,3 +736,154 @@ kubelet_running_pods 2
})
}
}
func TestWatchConditions(t *testing.T) {
pods := []*containertest.FakePod{{
Pod: &kubecontainer.Pod{
Name: "running-pod",
ID: "running",
Sandboxes: []*kubecontainer.Container{
createTestContainer("s", kubecontainer.ContainerStateRunning),
},
Containers: []*kubecontainer.Container{
createTestContainer("c", kubecontainer.ContainerStateRunning),
},
},
}, {
Pod: &kubecontainer.Pod{
Name: "terminating-pod",
ID: "terminating",
Sandboxes: []*kubecontainer.Container{
createTestContainer("s", kubecontainer.ContainerStateExited),
},
},
}, {
Pod: &kubecontainer.Pod{
Name: "reinspect-pod",
ID: "reinspect",
Sandboxes: []*kubecontainer.Container{
createTestContainer("s", kubecontainer.ContainerStateRunning),
},
},
}}
initialPods := append(pods, &containertest.FakePod{Pod: &kubecontainer.Pod{
Name: "terminated-pod",
ID: "terminated",
Sandboxes: []*kubecontainer.Container{
createTestContainer("s", kubecontainer.ContainerStateExited),
},
}})
alwaysComplete := func(_ *kubecontainer.PodStatus) bool {
return true
}
neverComplete := func(_ *kubecontainer.PodStatus) bool {
return false
}
var pleg *GenericPLEG
var updatingCond WatchCondition
// updatingCond always completes, but updates the condition first.
updatingCond = func(_ *kubecontainer.PodStatus) bool {
pleg.SetPodWatchCondition("running", "updating", updatingCond)
return true
}
testCases := []struct {
name string
podUID types.UID
watchConditions map[string]WatchCondition
expectEvaluated bool // Whether the watch conditions should be evaluated
expectRemoved bool // Whether podUID should be present in the watch conditions map
expectWatchConditions map[string]versionedWatchCondition // The expected watch conditions for the podUID (only key & version checked)
}{{
name: "no watch conditions",
podUID: "running",
}, {
name: "running pod with conditions",
podUID: "running",
watchConditions: map[string]WatchCondition{
"completing": alwaysComplete,
"watching": neverComplete,
"updating": updatingCond,
},
expectEvaluated: true,
expectWatchConditions: map[string]versionedWatchCondition{
"watching": {version: 0},
"updating": {version: 1},
},
}, {
name: "non-existent pod",
podUID: "non-existent",
watchConditions: map[string]WatchCondition{
"watching": neverComplete,
},
expectEvaluated: false,
expectRemoved: true,
}, {
name: "terminated pod",
podUID: "terminated",
watchConditions: map[string]WatchCondition{
"watching": neverComplete,
},
expectEvaluated: false,
expectRemoved: true,
}, {
name: "reinspecting pod",
podUID: "reinspect",
watchConditions: map[string]WatchCondition{
"watching": neverComplete,
},
expectEvaluated: true,
expectWatchConditions: map[string]versionedWatchCondition{
"watching": {version: 0},
},
}}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
testPleg := newTestGenericPLEG()
pleg = testPleg.pleg
runtime := testPleg.runtime
runtime.AllPodList = initialPods
pleg.Relist() // Setup initial pod records.
runtime.AllPodList = pods // Doesn't have "terminated" pod.
pleg.podsToReinspect["reinspect"] = nil
var evaluatedConditions []string
for key, condition := range test.watchConditions {
wrappedCondition := func(status *kubecontainer.PodStatus) bool {
if !test.expectEvaluated {
assert.Fail(t, "conditions should not be evaluated")
} else {
evaluatedConditions = append(evaluatedConditions, key)
}
return condition(status)
}
pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition)
}
pleg.Relist()
if test.expectEvaluated {
assert.Len(t, evaluatedConditions, len(test.watchConditions), "all conditions should be evaluated")
}
if test.expectRemoved {
assert.NotContains(t, pleg.watchConditions, test.podUID, "Pod should be removed from watch conditions")
} else {
actualConditions := pleg.watchConditions[test.podUID]
assert.Len(t, actualConditions, len(test.expectWatchConditions), "expected number of conditions")
for key, expected := range test.expectWatchConditions {
if !assert.Contains(t, actualConditions, key) {
continue
}
actual := actualConditions[key]
assert.Equal(t, key, actual.key)
assert.Equal(t, expected.version, actual.version)
}
}
})
}
}

pkg/kubelet/pleg/pleg.go

@@ -67,6 +67,10 @@ type PodLifecycleEventGenerator interface {
Watch() chan *PodLifecycleEvent
Healthy() (bool, error)
UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
// condition is met. The condition is keyed so it can be updated before the condition
// is met.
SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition)
}
// podLifecycleEventGeneratorHandler contains functions that are useful for different PLEGs
@@ -77,3 +81,19 @@ type podLifecycleEventGeneratorHandler interface {
Update(relistDuration *RelistDuration)
Relist()
}
// WatchCondition takes the latest PodStatus, and returns whether the condition is met.
type WatchCondition func(*kubecontainer.PodStatus) bool
// RunningContainerWatchCondition wraps a condition on the container status to make a pod
// WatchCondition. If the container is no longer running, the condition is implicitly cleared.
func RunningContainerWatchCondition(containerName string, condition func(*kubecontainer.Status) bool) WatchCondition {
return func(podStatus *kubecontainer.PodStatus) bool {
status := podStatus.FindContainerStatusByName(containerName)
if status == nil || status.State != kubecontainer.ContainerStateRunning {
// Container isn't running. Consider the condition "completed" so it is cleared.
return true
}
return condition(status)
}
}
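
For illustration (not part of this commit), a caller might combine these helpers roughly as follows. The waitForRestart name, the condition key, and the restart-count check are hypothetical; only SetPodWatchCondition and RunningContainerWatchCondition come from the diff above:

package example

import (
	"k8s.io/apimachinery/pkg/types"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// waitForRestart (hypothetical) flags the pod for reinspection on every
// relist until the named container has restarted at least once. If the
// container stops running, RunningContainerWatchCondition reports the
// condition complete so it is cleared rather than re-evaluated forever.
func waitForRestart(g pleg.PodLifecycleEventGenerator, podUID types.UID, containerName string) {
	cond := pleg.RunningContainerWatchCondition(containerName,
		func(status *kubecontainer.Status) bool {
			return status.RestartCount >= 1
		})
	g.SetPodWatchCondition(podUID, "restarted/"+containerName, cond)
}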