Add support for Evented PLEG

Signed-off-by: Harshal Patil <harpatil@redhat.com>
Co-authored-by: Swarup Ghosh <swghosh@redhat.com>
Harshal Patil 2022-09-14 16:09:03 +05:30 committed by Swarup Ghosh
parent 7369bd27e0
commit 86284d42f8
GPG Key ID: D6F7B42875641C50
18 changed files with 1488 additions and 510 deletions

View File

@ -284,6 +284,14 @@ const (
// Allows running an ephemeral container in pod namespaces to troubleshoot a running pod.
EphemeralContainers featuregate.Feature = "EphemeralContainers"
// owner: @harche
// kep: http://kep.k8s.io/3386
// alpha: v1.25
//
// Allows using an event-driven PLEG (pod lifecycle event generator) in the kubelet,
// which avoids frequent relisting of containers and thus improves performance.
EventedPLEG featuregate.Feature = "EventedPLEG"
// owner: @andrewsykim @SergeyKanzhelev
// GA: v1.20
//
@ -914,6 +922,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
EphemeralContainers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27
EventedPLEG: {Default: false, PreRelease: featuregate.Alpha},
ExecProbeTimeout: {Default: true, PreRelease: featuregate.GA}, // lock to default and remove after v1.22 based on KEP #1972 update
ExpandCSIVolumes: {Default: true, PreRelease: featuregate.GA}, // remove in 1.26

View File

@ -21,6 +21,8 @@ import (
"time"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// Cache stores the PodStatus for the pods. It represents *all* the visible
@ -36,7 +38,10 @@ import (
// cache entries.
type Cache interface {
Get(types.UID) (*PodStatus, error)
Set(types.UID, *PodStatus, error, time.Time)
// Set updates the cache by setting the PodStatus for the pod only
// if the data is newer than the cached entry, based on the provided
// timestamp. Returns true if the cache was updated, false otherwise.
Set(types.UID, *PodStatus, error, time.Time) (updated bool)
// GetNewerThan is a blocking call that only returns the status
// when it is newer than the given time.
GetNewerThan(types.UID, time.Time) (*PodStatus, error)
@ -93,12 +98,22 @@ func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error
return d.status, d.err
}
// Set sets the PodStatus for the pod.
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
// Set sets the PodStatus for the pod only if the data is newer than the cache
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) {
c.lock.Lock()
defer c.lock.Unlock()
defer c.notify(id, timestamp)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Set the value in the cache only if it's not present already
// or the cached timestamp is not newer than the provided update timestamp.
if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) {
return false
}
}
c.pods[id] = &data{status: status, err: err, modified: timestamp}
c.notify(id, timestamp)
return true
}
// Delete removes the entry of the pod.
@ -142,6 +157,29 @@ func (c *cache) get(id types.UID) *data {
// Otherwise, it returns nil. The caller should acquire the lock.
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
d, ok := c.pods[id]
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Evented PLEG has CREATED, STARTED, STOPPED and DELETED events.
// However, if the container creation fails for some reason, no
// CRI event is received by the kubelet and that pod will get stuck in a
// GetNewerThan call in the pod workers. This is reproducible with
// the node e2e test,
// https://github.com/kubernetes/kubernetes/blob/83415e5c9e6e59a3d60a148160490560af2178a1/test/e2e_node/pod_hostnamefqdn_test.go#L161
// which forces a failure during pod creation. This issue also exists in
// the Generic PLEG, but since it updates the global timestamp periodically
// the GetNewerThan call gets unstuck.
// During node e2e tests, it was observed that this change does not have any
// adverse impact on the behaviour of the Generic PLEG either.
switch {
case !ok:
return makeDefaultData(id)
case ok && (d.modified.After(minTime) || (c.timestamp != nil && c.timestamp.After(minTime))):
return d
default:
return nil
}
}
globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
if !ok && globalTimestampIsNewer {
// Status is not cached, but the global timestamp is newer than
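
A minimal, self-contained sketch of the timestamp-guarded Set semantics introduced above. The type and field names below are simplified stand-ins for the kubelet pod cache, not the actual implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry mirrors the cache's per-pod data: a status payload plus the
// timestamp at which it was recorded.
type entry struct {
	status   string
	modified time.Time
}

// miniCache mimics the "only accept newer data" behaviour of the kubelet
// pod cache when Evented PLEG is enabled.
type miniCache struct {
	mu   sync.Mutex
	pods map[string]entry
}

// Set stores the status only if the cached entry is not newer than the
// provided timestamp. It returns true when the cache was updated.
func (c *miniCache) Set(id, status string, timestamp time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cached, ok := c.pods[id]; ok && cached.modified.After(timestamp) {
		return false // stale update, ignore it
	}
	c.pods[id] = entry{status: status, modified: timestamp}
	return true
}

func main() {
	c := &miniCache{pods: map[string]entry{}}
	now := time.Now()
	fmt.Println(c.Set("pod-1", "running", now))                  // true: first write
	fmt.Println(c.Set("pod-1", "exited", now.Add(-time.Second))) // false: older than cache
	fmt.Println(c.Set("pod-1", "exited", now.Add(time.Second)))  // true: newer than cache
}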

View File

@ -122,6 +122,8 @@ type Runtime interface {
// CheckpointContainer tells the runtime to checkpoint a container
// and store the resulting archive to the checkpoint directory.
CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
// GeneratePodStatus generates a pod status from the CRI event.
GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*PodStatus, error)
}
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
@ -305,6 +307,8 @@ type PodStatus struct {
// Status of the pod sandbox.
// Only for kuberuntime now, other runtime may keep it nil.
SandboxStatuses []*runtimeapi.PodSandboxStatus
// Timestamp at which container and pod statuses were recorded
TimeStamp time.Time
}
// Status represents the status of a container.

View File

@ -40,7 +40,8 @@ func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*container.Po
return c.Get(id)
}
func (c *fakeCache) Set(id types.UID, status *container.PodStatus, err error, timestamp time.Time) {
func (c *fakeCache) Set(id types.UID, status *container.PodStatus, err error, timestamp time.Time) (updated bool) {
return true
}
func (c *fakeCache) Delete(id types.UID) {

View File

@ -276,6 +276,15 @@ func (f *FakeRuntime) KillContainerInPod(container v1.Container, pod *v1.Pod) er
return f.Err
}
func (f *FakeRuntime) GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*kubecontainer.PodStatus, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GeneratePodStatus")
status := f.PodStatus
return &status, f.Err
}
func (f *FakeRuntime) GetPodStatus(_ context.Context, uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
f.Lock()
defer f.Unlock()

View File

@ -168,6 +168,21 @@ func (mr *MockRuntimeMockRecorder) GarbageCollect(ctx, gcPolicy, allSourcesReady
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GarbageCollect", reflect.TypeOf((*MockRuntime)(nil).GarbageCollect), ctx, gcPolicy, allSourcesReady, evictNonDeletedPods)
}
// GeneratePodStatus mocks base method.
func (m *MockRuntime) GeneratePodStatus(event *v10.ContainerEventResponse) (*container.PodStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GeneratePodStatus", event)
ret0, _ := ret[0].(*container.PodStatus)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GeneratePodStatus indicates an expected call of GeneratePodStatus.
func (mr *MockRuntimeMockRecorder) GeneratePodStatus(event interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GeneratePodStatus", reflect.TypeOf((*MockRuntime)(nil).GeneratePodStatus), event)
}
// GetContainerLogs mocks base method.
func (m *MockRuntime) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID container.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
m.ctrl.T.Helper()

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"
@ -792,5 +793,25 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options
}
func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
return nil
containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
if err != nil {
klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
return err
}
for {
resp, err := containerEventsStreamingClient.Recv()
if err == io.EOF {
klog.ErrorS(err, "container events stream is closed")
return err
}
if err != nil {
klog.ErrorS(err, "failed to receive streaming container event")
return err
}
if resp != nil {
containerEventsCh <- resp
klog.V(4).InfoS("container event received", "resp", resp)
}
}
}
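
The streaming loop above only forwards responses onto a caller-supplied channel. Below is a hedged sketch of the producer/consumer pattern a caller might use to drive it; EventResponse and streamEvents here are simplified stand-ins, not the CRI types or the remote runtime client:

package main

import (
	"fmt"
	"time"
)

// EventResponse is a simplified stand-in for runtimeapi.ContainerEventResponse.
type EventResponse struct {
	ContainerID string
	EventType   string
}

// streamEvents stands in for remoteRuntimeService.GetContainerEvents: it blocks,
// pushing events onto the channel until the stream ends, then returns.
func streamEvents(ch chan<- *EventResponse) error {
	for i := 0; i < 3; i++ {
		ch <- &EventResponse{ContainerID: fmt.Sprintf("c%d", i), EventType: "STARTED"}
		time.Sleep(10 * time.Millisecond)
	}
	return nil // stream closed (EOF in the real implementation)
}

func main() {
	events := make(chan *EventResponse, 10)

	// Producer: keep the stream open, mirroring how the Evented PLEG runs
	// GetContainerEvents in its own goroutine.
	go func() {
		defer close(events)
		if err := streamEvents(events); err != nil {
			fmt.Println("stream error:", err)
		}
	}()

	// Consumer: drain events as they arrive, as processCRIEvents does.
	for ev := range events {
		fmt.Printf("event %s for container %s\n", ev.EventType, ev.ContainerID)
	}
}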

View File

@ -162,7 +162,13 @@ const (
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
genericPlegRelistPeriod = time.Second * 1
genericPlegRelistThreshold = time.Minute * 3
// Generic PLEG relist period and threshold when used with Evented PLEG.
eventedPlegRelistPeriod = time.Second * 300
eventedPlegRelistThreshold = time.Minute * 10
eventedPlegMaxStreamRetries = 5
// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
@ -699,9 +705,37 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Adjust the Generic PLEG relisting period and threshold to higher values when Evented PLEG is turned on.
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: eventedPlegRelistPeriod,
RelistThreshold: eventedPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
// Evented PLEG should be able to reset the Generic PLEG relisting duration
// to the default value.
eventedRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.eventedPleg = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
} else {
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
}
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy)
}
if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
}
@ -1062,6 +1096,9 @@ type Kubelet struct {
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator
// Evented PLEG
eventedPleg pleg.PodLifecycleEventGenerator
// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
@ -1485,6 +1522,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start the pod lifecycle event generator.
kl.pleg.Start()
// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
kl.syncLoop(ctx, updates, kl)
}

View File

@ -313,7 +313,7 @@ func newTestKubeletWithImageList(
kubelet.resyncInterval = 10 * time.Second
kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
// Relist period does not affect the tests.
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, kubelet.podCache, clock.RealClock{})
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
kubelet.clock = fakeClock
nodeRef := &v1.ObjectReference{

View File

@ -490,6 +490,29 @@ func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string)
return buf.String()
}
func (m *kubeGenericRuntimeManager) convertToKubeContainerStatus(status *runtimeapi.ContainerStatus) (cStatus *kubecontainer.Status) {
cStatus = toKubeContainerStatus(status, m.runtimeName)
if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
// Populate the termination message if needed.
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
}
// Enrich the termination message if the message written by the application is not empty
if len(tMessage) != 0 {
if len(cStatus.Message) != 0 {
cStatus.Message += ": "
}
cStatus.Message += tMessage
}
}
return cStatus
}
// getPodContainerStatuses gets all containers' statuses for the pod.
func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context, uid kubetypes.UID, name, namespace string) ([]*kubecontainer.Status, error) {
// Select all containers of the given pod.
@ -521,25 +544,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context,
if status == nil {
return nil, remote.ErrContainerStatusNil
}
cStatus := toKubeContainerStatus(status, m.runtimeName)
if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
// Populate the termination message if needed.
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
}
// Enrich the termination message written by the application is not empty
if len(tMessage) != 0 {
if len(cStatus.Message) != 0 {
cStatus.Message += ": "
}
cStatus.Message += tMessage
}
}
cStatus := m.convertToKubeContainerStatus(status)
statuses = append(statuses, cStatus)
}

View File

@ -34,6 +34,7 @@ import (
kubetypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/flowcontrol"
@ -43,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/credentialprovider/plugin"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
@ -967,6 +969,26 @@ func (m *kubeGenericRuntimeManager) killPodWithSyncResult(ctx context.Context, p
return
}
func (m *kubeGenericRuntimeManager) GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*kubecontainer.PodStatus, error) {
podIPs := m.determinePodSandboxIPs(event.PodSandboxStatus.Metadata.Namespace, event.PodSandboxStatus.Metadata.Name, event.PodSandboxStatus)
kubeContainerStatuses := []*kubecontainer.Status{}
for _, status := range event.ContainersStatuses {
kubeContainerStatuses = append(kubeContainerStatuses, m.convertToKubeContainerStatus(status))
}
sort.Sort(containerStatusByCreated(kubeContainerStatuses))
return &kubecontainer.PodStatus{
ID: kubetypes.UID(event.PodSandboxStatus.Metadata.Uid),
Name: event.PodSandboxStatus.Metadata.Name,
Namespace: event.PodSandboxStatus.Metadata.Namespace,
IPs: podIPs,
SandboxStatuses: []*runtimeapi.PodSandboxStatus{event.PodSandboxStatus},
ContainerStatuses: kubeContainerStatuses,
}, nil
}
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visible in Runtime.
func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
@ -1001,6 +1023,9 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
klog.V(4).InfoS("getSandboxIDByPodUID got sandbox IDs for pod", "podSandboxID", podSandboxIDs, "pod", klog.KObj(pod))
sandboxStatuses := []*runtimeapi.PodSandboxStatus{}
containerStatuses := []*kubecontainer.Status{}
var timestamp time.Time
podIPs := []string{}
for idx, podSandboxID := range podSandboxIDs {
resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
@ -1024,16 +1049,45 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
if idx == 0 && resp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
podIPs = m.determinePodSandboxIPs(namespace, name, resp.Status)
}
}
// Get statuses of all containers visible in the pod.
containerStatuses, err := m.getPodContainerStatuses(ctx, uid, name, namespace)
if idx == 0 && utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
if resp.Timestamp == 0 {
// If the Evented PLEG is enabled in the kubelet, but not in the runtime,
// then the pod status we get will not have the timestamp set.
// e.g. the CI job 'pull-kubernetes-e2e-gce-alpha-features' runs with the
// alpha feature gates enabled, which includes Evented PLEG, but uses a
// runtime without Evented PLEG support.
klog.V(4).InfoS("Runtime does not set pod status timestamp", "pod", klog.KObj(pod))
containerStatuses, err = m.getPodContainerStatuses(ctx, uid, name, namespace)
if err != nil {
if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
}
return nil, err
}
} else {
// Get the statuses of all containers visible in the pod and the
// timestamp from the sandbox status.
timestamp = time.Unix(resp.Timestamp, 0)
for _, cs := range resp.ContainersStatuses {
cStatus := m.convertToKubeContainerStatus(cs)
containerStatuses = append(containerStatuses, cStatus)
}
}
}
}
if !utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Get statuses of all containers visible in the pod.
containerStatuses, err = m.getPodContainerStatuses(ctx, uid, name, namespace)
if err != nil {
if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
}
return nil, err
}
}
m.logReduction.ClearID(podFullName)
return &kubecontainer.PodStatus{
@ -1043,6 +1097,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
IPs: podIPs,
SandboxStatuses: sandboxStatuses,
ContainerStatuses: containerStatuses,
TimeStamp: timestamp,
}, nil
}

398 pkg/kubelet/pleg/evented.go (new file)
View File

@ -0,0 +1,398 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pleg
import (
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/utils/clock"
)
// The frequency with which the global timestamp of the cache is updated
// periodically. If pod workers get stuck at a cache.GetNewerThan call,
// they will be unblocked after this period.
const globalCacheUpdatePeriod = 5 * time.Second
var (
eventedPLEGUsage = false
eventedPLEGUsageMu = sync.RWMutex{}
)
// isEventedPLEGInUse indicates whether Evented PLEG is in use. Even after enabling
// the Evented PLEG feature gate, there could be several reasons it may not be in use.
// e.g. Streaming data issues from the runtime or the runtime does not implement the
// container events stream.
func isEventedPLEGInUse() bool {
eventedPLEGUsageMu.RLock() // readers take the read lock
defer eventedPLEGUsageMu.RUnlock()
return eventedPLEGUsage
}
// setEventedPLEGUsage should only be accessed from
// Start/Stop of Evented PLEG.
func setEventedPLEGUsage(enable bool) {
eventedPLEGUsageMu.Lock() // the writer takes the write lock
defer eventedPLEGUsageMu.Unlock()
eventedPLEGUsage = enable
}
type EventedPLEG struct {
// The container runtime.
runtime kubecontainer.Runtime
// The runtime service.
runtimeService internalapi.RuntimeService
// The channel from which the subscriber listens events.
eventChannel chan *PodLifecycleEvent
// Cache for storing the runtime states required for syncing pods.
cache kubecontainer.Cache
// For testability.
clock clock.Clock
// GenericPLEG is used to force relist when required.
genericPleg PodLifecycleEventGenerator
// The maximum number of retries when getting container events from the runtime.
eventedPlegMaxStreamRetries int
// Indicates relisting related parameters
relistDuration *RelistDuration
// Stop the Evented PLEG by closing the channel.
stopCh chan struct{}
// Stops the periodic update of the cache global timestamp.
stopCacheUpdateCh chan struct{}
// Locks the start/stop operation of the Evented PLEG.
runningMu sync.Mutex
}
// NewEventedPLEG instantiates a new EventedPLEG object and returns it.
func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
relistDuration *RelistDuration, clock clock.Clock) PodLifecycleEventGenerator {
return &EventedPLEG{
runtime: runtime,
runtimeService: runtimeService,
eventChannel: eventChannel,
cache: cache,
genericPleg: genericPleg,
eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
relistDuration: relistDuration,
clock: clock,
}
}
// Watch returns a channel from which the subscriber can receive PodLifecycleEvent events.
func (e *EventedPLEG) Watch() chan *PodLifecycleEvent {
return e.eventChannel
}
// Relist relists all containers using GenericPLEG
func (e *EventedPLEG) Relist() {
e.genericPleg.Relist()
}
// Start starts the Evented PLEG
func (e *EventedPLEG) Start() {
e.runningMu.Lock()
defer e.runningMu.Unlock()
if isEventedPLEGInUse() {
return
}
setEventedPLEGUsage(true)
e.stopCh = make(chan struct{})
e.stopCacheUpdateCh = make(chan struct{})
go wait.Until(e.watchEventsChannel, 0, e.stopCh)
go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
}
// Stop stops the Evented PLEG
func (e *EventedPLEG) Stop() {
e.runningMu.Lock()
defer e.runningMu.Unlock()
if !isEventedPLEGInUse() {
return
}
setEventedPLEGUsage(false)
close(e.stopCh)
close(e.stopCacheUpdateCh)
}
// In case the Evented PLEG experiences undetectable issues in the underlying
// gRPC connection, there is a remote chance the pod might get stuck in a
// given state while it has progressed in its life cycle. This function will be
// called periodically to update the global timestamp of the cache so that those
// pods stuck at GetNewerThan in pod workers will get unstuck.
func (e *EventedPLEG) updateGlobalCache() {
e.cache.UpdateTime(time.Now())
}
// Update the relisting period and threshold
func (e *EventedPLEG) Update(relistDuration *RelistDuration) {
e.genericPleg.Update(relistDuration)
}
// Healthy checks if the PLEG works properly.
func (e *EventedPLEG) Healthy() (bool, error) {
// GenericPLEG is declared unhealthy when relisting time is more
// than the relistThreshold. In case EventedPLEG is turned on,
// relistingPeriod and relistingThreshold are adjusted to higher
// values. So the health check of Generic PLEG should check
// the adjusted values of relistingPeriod and relistingThreshold.
// EventedPLEG is declared unhealthy only if eventChannel is out of capacity.
if len(e.eventChannel) == cap(e.eventChannel) {
return false, fmt.Errorf("EventedPLEG: pleg event channel capacity is full with %v events", len(e.eventChannel))
}
timestamp := e.clock.Now()
metrics.PLEGLastSeen.Set(float64(timestamp.Unix()))
return true, nil
}
func (e *EventedPLEG) watchEventsChannel() {
containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, cap(e.eventChannel))
defer close(containerEventsResponseCh)
// Get the container events from the runtime.
go func() {
numAttempts := 0
for {
if numAttempts >= e.eventedPlegMaxStreamRetries {
if isEventedPLEGInUse() {
// Fall back to Generic PLEG relisting since Evented PLEG is not working.
klog.V(4).InfoS("Fall back to Generic PLEG relisting since Evented PLEG is not working")
e.Stop()
e.genericPleg.Stop() // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
e.genericPleg.Start()
break
}
}
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh)
if err != nil {
numAttempts++
e.Relist() // Force a relist to get the latest container and pod running metrics.
klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying", "err", err)
}
}
}()
if isEventedPLEGInUse() {
e.processCRIEvents(containerEventsResponseCh)
}
}
func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeapi.ContainerEventResponse) {
for event := range containerEventsResponseCh {
podID := types.UID(event.PodSandboxStatus.Metadata.Uid)
shouldSendPLEGEvent := false
status, err := e.runtime.GeneratePodStatus(event)
if err != nil {
// nolint:logcheck // Not using the result of klog.V inside the
// if branch is okay, we just use it to determine whether the
// additional "podStatus" key and its value should be added.
if klog.V(6).Enabled() {
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
} else {
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
}
} else {
if klogV := klog.V(6); klogV.Enabled() {
klogV.InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
} else {
klog.V(4).InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID)
}
// Preserve the pod IP across cache updates if the new IP is empty.
// When a pod is torn down, kubelet may race with PLEG and retrieve
// a pod status after network teardown, but the kubernetes API expects
// the completed pod's IP to be available after the pod is dead.
status.IPs = e.getPodIPs(podID, status)
}
e.updateRunningPodMetric(status)
e.updateRunningContainerMetric(status)
if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
for _, sandbox := range status.SandboxStatuses {
if sandbox.Id == event.ContainerId {
// When the CONTAINER_DELETED_EVENT is received by the kubelet,
// the runtime has indicated that the container has been removed,
// and hence it must be removed from the kubelet's cache too.
e.cache.Delete(podID)
}
}
shouldSendPLEGEvent = true
} else {
if e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0)) {
shouldSendPLEGEvent = true
}
}
if shouldSendPLEGEvent {
e.processCRIEvent(event)
}
}
}
func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse) {
switch event.ContainerEventType {
case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
// We only need to update the pod status on container create.
// But we don't have to generate any PodLifeCycleEvent. Container creation related
// PodLifeCycleEvent is ignored by the existing Generic PLEG as well.
// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L88 and
// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L273
klog.V(4).InfoS("Received Container Created Event", "event", event.String())
case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
klog.V(4).InfoS("Received Container Started Event", "event", event.String())
case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
// In case the pod is deleted it is safe to generate both ContainerDied and ContainerRemoved events, just like in the case of
// Generic PLEG. https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L169
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
klog.V(4).InfoS("Received Container Deleted Event", "event", event)
}
}
func (e *EventedPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
if len(status.IPs) != 0 {
return status.IPs
}
oldStatus, err := e.cache.Get(pid)
if err != nil || len(oldStatus.IPs) == 0 {
return nil
}
for _, sandboxStatus := range status.SandboxStatuses {
// If at least one sandbox is ready, then use this status update's pod IP
if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
return status.IPs
}
}
// For pods with no ready containers or sandboxes (like exited pods)
// use the old status' pod IP
return oldStatus.IPs
}
func (e *EventedPLEG) sendPodLifecycleEvent(event *PodLifecycleEvent) {
select {
case e.eventChannel <- event:
default:
// record how many events were discarded due to channel out of capacity
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
}
}
// getPodSandboxState returns the state of the pod's sandbox as derived from the given pod status.
func getPodSandboxState(podStatus *kubecontainer.PodStatus) kubecontainer.State {
var sandboxId string
for _, sandbox := range podStatus.SandboxStatuses {
sandboxId = sandbox.Id
// pod must contain only one sandbox
break
}
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.ID.ID == sandboxId {
if containerStatus.State == kubecontainer.ContainerStateRunning {
return containerStatus.State
}
}
}
return kubecontainer.ContainerStateExited
}
func (e *EventedPLEG) updateRunningPodMetric(podStatus *kubecontainer.PodStatus) {
cachedPodStatus, err := e.cache.Get(podStatus.ID)
if err != nil {
klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
}
// cache miss condition: The pod status object will have empty state if missed in cache
if len(cachedPodStatus.SandboxStatuses) < 1 {
sandboxState := getPodSandboxState(podStatus)
if sandboxState == kubecontainer.ContainerStateRunning {
metrics.RunningPodCount.Inc()
}
} else {
oldSandboxState := getPodSandboxState(cachedPodStatus)
currentSandboxState := getPodSandboxState(podStatus)
if oldSandboxState == kubecontainer.ContainerStateRunning && currentSandboxState != kubecontainer.ContainerStateRunning {
metrics.RunningPodCount.Dec()
} else if oldSandboxState != kubecontainer.ContainerStateRunning && currentSandboxState == kubecontainer.ContainerStateRunning {
metrics.RunningPodCount.Inc()
}
}
}
func getContainerStateCount(podStatus *kubecontainer.PodStatus) map[kubecontainer.State]int {
containerStateCount := make(map[kubecontainer.State]int)
for _, container := range podStatus.ContainerStatuses {
containerStateCount[container.State]++
}
return containerStateCount
}
func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodStatus) {
cachedPodStatus, err := e.cache.Get(podStatus.ID)
if err != nil {
klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
}
// cache miss condition: The pod status object will have empty state if missed in cache
if len(cachedPodStatus.SandboxStatuses) < 1 {
containerStateCount := getContainerStateCount(podStatus)
for state, count := range containerStateCount {
// add currently obtained count
metrics.RunningContainerCount.WithLabelValues(string(state)).Add(float64(count))
}
} else {
oldContainerStateCount := getContainerStateCount(cachedPodStatus)
currentContainerStateCount := getContainerStateCount(podStatus)
// old and new set of container states may vary;
// get a unique set of container states combining both
containerStates := make(map[kubecontainer.State]bool)
for state := range oldContainerStateCount {
containerStates[state] = true
}
for state := range currentContainerStateCount {
containerStates[state] = true
}
// update the metric via difference of old and current counts
for state := range containerStates {
diff := currentContainerStateCount[state] - oldContainerStateCount[state]
metrics.RunningContainerCount.WithLabelValues(string(state)).Add(float64(diff))
}
}
}
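
A small worked example of the count-diff approach used by updateRunningContainerMetric above: compute per-state deltas between the cached and the current container state counts, then apply them to the gauge. This is a self-contained sketch; the State type and counts are illustrative, not the kubelet metrics pipeline:

package main

import "fmt"

type State string

// diffStateCounts returns, per state, how much a gauge labelled by that state
// should change when moving from the old counts to the current counts.
func diffStateCounts(old, current map[State]int) map[State]int {
	// old and new sets of states may differ; take the union of both.
	states := map[State]bool{}
	for s := range old {
		states[s] = true
	}
	for s := range current {
		states[s] = true
	}
	diff := map[State]int{}
	for s := range states {
		diff[s] = current[s] - old[s]
	}
	return diff
}

func main() {
	old := map[State]int{"running": 2, "exited": 1}
	current := map[State]int{"running": 1, "exited": 2}
	// Expected: running -1, exited +1 — the per-state adjustments that would
	// be applied to the running-container gauge.
	fmt.Println(diffStateCounts(old, current))
}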

View File

@ -0,0 +1,133 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pleg
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/metrics/testutil"
v1 "k8s.io/cri-api/pkg/apis/runtime/v1"
critest "k8s.io/cri-api/pkg/apis/testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/metrics"
testingclock "k8s.io/utils/clock/testing"
)
func newTestEventedPLEG() *EventedPLEG {
return &EventedPLEG{
runtime: &containertest.FakeRuntime{},
clock: testingclock.NewFakeClock(time.Time{}),
cache: kubecontainer.NewCache(),
runtimeService: critest.NewFakeRuntimeService(),
eventChannel: make(chan *PodLifecycleEvent, 100),
}
}
func TestHealthyEventedPLEG(t *testing.T) {
metrics.Register()
pleg := newTestEventedPLEG()
_, _, events := createTestPodsStatusesAndEvents(100)
for _, event := range events[:5] {
pleg.eventChannel <- event
}
// test if healthy when event channel has 5 events
isHealthy, err := pleg.Healthy()
require.NoError(t, err)
assert.Equal(t, true, isHealthy)
// send remaining 95 events and make channel out of capacity
for _, event := range events[5:] {
pleg.eventChannel <- event
}
// pleg is unhealthy when channel is out of capacity
isHealthy, err = pleg.Healthy()
require.Error(t, err)
assert.Equal(t, false, isHealthy)
}
func TestUpdateRunningPodMetric(t *testing.T) {
metrics.Register()
pleg := newTestEventedPLEG()
podStatuses := make([]*kubecontainer.PodStatus, 5)
for i := range podStatuses {
id := fmt.Sprintf("test-pod-%d", i)
podStatuses[i] = &kubecontainer.PodStatus{
ID: types.UID(id),
SandboxStatuses: []*v1.PodSandboxStatus{
{Id: id},
},
ContainerStatuses: []*kubecontainer.Status{
{ID: kubecontainer.ContainerID{ID: id}, State: kubecontainer.ContainerStateRunning},
},
}
pleg.updateRunningPodMetric(podStatuses[i])
pleg.cache.Set(podStatuses[i].ID, podStatuses[i], nil, time.Now())
}
pleg.cache.UpdateTime(time.Now())
expectedMetric := `
# HELP kubelet_running_pods [ALPHA] Number of pods that have a running pod sandbox
# TYPE kubelet_running_pods gauge
kubelet_running_pods 5
`
testMetric(t, expectedMetric, metrics.RunningPodCount.FQName())
// stop sandbox containers for first 2 pods
for _, podStatus := range podStatuses[:2] {
podId := string(podStatus.ID)
newPodStatus := kubecontainer.PodStatus{
ID: podStatus.ID,
SandboxStatuses: []*v1.PodSandboxStatus{
{Id: podId},
},
ContainerStatuses: []*kubecontainer.Status{
// update state to container exited
{ID: kubecontainer.ContainerID{ID: podId}, State: kubecontainer.ContainerStateExited},
},
}
pleg.updateRunningPodMetric(&newPodStatus)
pleg.cache.Set(newPodStatus.ID, &newPodStatus, nil, time.Now())
}
pleg.cache.UpdateTime(time.Now())
expectedMetric = `
# HELP kubelet_running_pods [ALPHA] Number of pods that have a running pod sandbox
# TYPE kubelet_running_pods gauge
kubelet_running_pods 3
`
testMetric(t, expectedMetric, metrics.RunningPodCount.FQName())
}
func testMetric(t *testing.T, expectedMetric string, metricName string) {
err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(expectedMetric), metricName)
if err != nil {
t.Fatal(err)
}
}

View File

@ -19,14 +19,17 @@ package pleg
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/utils/clock"
@ -48,8 +51,6 @@ import (
// recommended to set the relist period short and have an auxiliary, longer
// periodic sync in kubelet as the safety net.
type GenericPLEG struct {
// The period for relisting.
relistPeriod time.Duration
// The container runtime.
runtime kubecontainer.Runtime
// The channel from which the subscriber listens events.
@ -65,6 +66,16 @@ type GenericPLEG struct {
// Pods that failed to have their status retrieved during a relist. These pods will be
// retried during the next relisting.
podsToReinspect map[types.UID]*kubecontainer.Pod
// Stop the Generic PLEG by closing the channel.
stopCh chan struct{}
// Locks the relisting of the Generic PLEG
relistLock sync.Mutex
// Indicates if the Generic PLEG is running or not
isRunning bool
// Locks the start/stop operation of Generic PLEG
runningMu sync.Mutex
// Indicates relisting related parameters
relistDuration *RelistDuration
}
// plegContainerState has a one-to-one mapping to the
@ -77,11 +88,6 @@ const (
plegContainerExited plegContainerState = "exited"
plegContainerUnknown plegContainerState = "unknown"
plegContainerNonExistent plegContainerState = "non-existent"
// The threshold needs to be greater than the relisting period + the
// relisting time, which can vary significantly. Set a conservative
// threshold to avoid flipping between healthy and unhealthy.
relistThreshold = 3 * time.Minute
)
func convertState(state kubecontainer.State) plegContainerState {
@ -108,12 +114,13 @@ type podRecord struct {
type podRecords map[types.UID]*podRecord
// NewGenericPLEG instantiates a new GenericPLEG object and return it.
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
relistDuration *RelistDuration, cache kubecontainer.Cache,
clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
relistPeriod: relistPeriod,
relistDuration: relistDuration,
runtime: runtime,
eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
eventChannel: eventChannel,
podRecords: make(podRecords),
cache: cache,
clock: clock,
@ -129,7 +136,26 @@ func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
g.runningMu.Lock()
defer g.runningMu.Unlock()
if !g.isRunning {
g.isRunning = true
g.stopCh = make(chan struct{})
go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}
}
func (g *GenericPLEG) Stop() {
g.runningMu.Lock()
defer g.runningMu.Unlock()
if g.isRunning {
close(g.stopCh)
g.isRunning = false
}
}
func (g *GenericPLEG) Update(relistDuration *RelistDuration) {
g.relistDuration = relistDuration
}
// Healthy check if PLEG work properly.
@ -142,8 +168,8 @@ func (g *GenericPLEG) Healthy() (bool, error) {
// Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
elapsed := g.clock.Since(relistTime)
if elapsed > relistThreshold {
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
if elapsed > g.relistDuration.RelistThreshold {
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, g.relistDuration.RelistThreshold)
}
return true, nil
}
@ -186,9 +212,12 @@ func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
g.relistTime.Store(timestamp)
}
// relist queries the container runtime for list of pods/containers, compare
// Relist queries the container runtime for a list of pods/containers, compares them
// with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
func (g *GenericPLEG) Relist() {
g.relistLock.Lock()
defer g.relistLock.Unlock()
ctx := context.Background()
klog.V(5).InfoS("GenericPLEG: Relisting")
@ -249,7 +278,7 @@ func (g *GenericPLEG) relist() {
// inspecting the pod and getting the PodStatus to update the cache
// serially may take a while. We should be aware of this and
// parallelize if needed.
if err := g.updateCache(ctx, pod, pid); err != nil {
if err, updated := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
@ -262,6 +291,11 @@ func (g *GenericPLEG) relist() {
// from the list (we don't want the reinspection code below to inspect it a second time in
// this relist execution)
delete(g.podsToReinspect, pid)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
if !updated {
continue
}
}
}
}
// Update the internal storage and send out the events.
@ -307,7 +341,7 @@ func (g *GenericPLEG) relist() {
if len(g.podsToReinspect) > 0 {
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err := g.updateCache(ctx, pod, pid); err != nil {
if err, _ := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
needsReinspection[pid] = pod
@ -390,18 +424,20 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
return oldStatus.IPs
}
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) error {
// updateCache tries to update the pod status in the kubelet cache and returns true if the
// pod status was actually updated in the cache. It will return false if the pod status
// was ignored by the cache.
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
if pod == nil {
// The pod is missing in the current relist. This means that
// the pod has no visible (active or inactive) containers.
klog.V(4).InfoS("PLEG: Delete status for pod", "podUID", string(pid))
g.cache.Delete(pid)
return nil
return nil, true
}
timestamp := g.clock.Now()
// TODO: Consider adding a new runtime method
// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
// all containers again.
status, err := g.runtime.GetPodStatus(ctx, pod.ID, pod.Name, pod.Namespace)
if err != nil {
// nolint:logcheck // Not using the result of klog.V inside the
@ -425,8 +461,21 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
status.IPs = g.getPodIPs(pid, status)
}
g.cache.Set(pod.ID, status, err, timestamp)
return err
// When only the Generic PLEG is in use, the PodStatus is saved in the cache without
// any validation of the existing status against the current timestamp.
// This works well when only the Generic PLEG sets the PodStatus in the cache. However,
// when multiple entities, such as the Evented PLEG, try to set the PodStatus in the
// cache, we may run into racy timestamps, since each of them calculates its timestamp
// in its own execution flow. While the Generic PLEG calculates this timestamp before
// getting the PodStatus, the Evented PLEG can only calculate the corresponding timestamp
// after the event has been received by the kubelet.
// For more details refer to:
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/3386-kubelet-evented-pleg#timestamp-of-the-pod-status
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && isEventedPLEGInUse() {
timestamp = status.TimeStamp
}
return err, g.cache.Set(pod.ID, status, err, timestamp)
}
func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {

View File

@ -61,7 +61,7 @@ func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG {
// The channel capacity should be large enough to hold all events in a
// single test.
pleg := &GenericPLEG{
relistPeriod: time.Hour,
relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
runtime: fakeRuntime,
eventChannel: make(chan *PodLifecycleEvent, eventChannelCap),
podRecords: make(podRecords),
@ -126,7 +126,7 @@ func TestRelisting(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
@ -138,7 +138,7 @@ func TestRelisting(t *testing.T) {
// The second relist should not send out any event because no container has
// changed.
pleg.relist()
pleg.Relist()
actual = getEventsFromChannel(ch)
assert.True(t, len(actual) == 0, "no container has changed, event length should be 0")
@ -157,7 +157,7 @@ func TestRelisting(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Only report containers that transitioned to running or exited status.
expected = []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
@ -193,7 +193,7 @@ func TestEventChannelFull(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
@ -218,7 +218,7 @@ func TestEventChannelFull(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
allEvents := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
{ID: "1234", Type: ContainerDied, Data: "c2"},
@ -258,7 +258,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
pleg.relist()
pleg.Relist()
getEventsFromChannel(ch)
}
@ -273,7 +273,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
},
}},
}
pleg.relist()
pleg.Relist()
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},
{ID: "1234", Type: ContainerRemoved, Data: "c2"},
@ -297,14 +297,14 @@ func testReportMissingPods(t *testing.T, numRelists int) {
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
pleg.relist()
pleg.Relist()
getEventsFromChannel(ch)
}
// Container c2 was stopped and removed between relists. We should report
// the event.
runtime.AllPodList = []*containertest.FakePod{}
pleg.relist()
pleg.Relist()
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},
{ID: "1234", Type: ContainerRemoved, Data: "c2"},
@ -315,9 +315,9 @@ func testReportMissingPods(t *testing.T, numRelists int) {
func newTestGenericPLEGWithRuntimeMock(runtimeMock kubecontainer.Runtime) *GenericPLEG {
pleg := &GenericPLEG{
relistPeriod: time.Hour,
relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour},
runtime: runtimeMock,
eventChannel: make(chan *PodLifecycleEvent, 100),
eventChannel: make(chan *PodLifecycleEvent, 1000),
podRecords: make(podRecords),
cache: kubecontainer.NewCache(),
clock: clock.RealClock{},
@ -366,7 +366,7 @@ func TestRelistWithCache(t *testing.T) {
statusErr := fmt.Errorf("unable to get status")
runtimeMock.EXPECT().GetPodStatus(ctx, pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Times(1)
pleg.relist()
pleg.Relist()
actualEvents := getEventsFromChannel(ch)
cases := []struct {
pod *kubecontainer.Pod
@ -387,7 +387,7 @@ func TestRelistWithCache(t *testing.T) {
// Return normal status for pods[1].
runtimeMock.EXPECT().GetPodStatus(ctx, pods[1].ID, "", "").Return(statuses[1], nil).Times(1)
pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
cases = []struct {
pod *kubecontainer.Pod
@ -418,11 +418,11 @@ func TestRemoveCacheEntry(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, pods[0].ID, "", "").Return(statuses[0], nil).Times(1)
// Does a relist to populate the cache.
pleg.relist()
pleg.Relist()
// Delete the pod from runtime. Verify that the cache entry has been
// removed after relisting.
runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{}, nil).Times(1)
pleg.relist()
pleg.Relist()
actualStatus, actualErr := pleg.cache.Get(pods[0].ID)
assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
assert.Equal(t, nil, actualErr)
@ -443,14 +443,14 @@ func TestHealthy(t *testing.T) {
// Relist and then advance the time by 1 minute. pleg should be healthy
// because this is within the allowed limit.
pleg.relist()
pleg.Relist()
clock.Step(time.Minute * 1)
ok, _ = pleg.Healthy()
assert.True(t, ok, "pleg should be healthy")
// Advance by relistThreshold without any relisting. pleg should be unhealthy
// because it has been longer than relistThreshold since a relist occurred.
clock.Step(relistThreshold)
clock.Step(pleg.relistDuration.RelistThreshold)
ok, _ = pleg.Healthy()
assert.False(t, ok, "pleg should be unhealthy")
}
@ -482,7 +482,7 @@ func TestRelistWithReinspection(t *testing.T) {
goodEvent := &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: infraContainer.ID.ID}
// listing 1 - everything ok, infra container set up for pod
pleg.relist()
pleg.Relist()
actualEvents := getEventsFromChannel(ch)
actualStatus, actualErr := pleg.cache.Get(podID)
assert.Equal(t, goodStatus, actualStatus)
@ -504,7 +504,7 @@ func TestRelistWithReinspection(t *testing.T) {
}
runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(badStatus, errors.New("inspection error")).Times(1)
pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
actualStatus, actualErr = pleg.cache.Get(podID)
assert.Equal(t, badStatus, actualStatus)
@ -516,7 +516,7 @@ func TestRelistWithReinspection(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(goodStatus, nil).Times(1)
pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
actualStatus, actualErr = pleg.cache.Get(podID)
assert.Equal(t, goodStatus, actualStatus)
@ -550,7 +550,7 @@ func TestRelistingWithSandboxes(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
@ -562,7 +562,7 @@ func TestRelistingWithSandboxes(t *testing.T) {
// The second relist should not send out any event because no container has
// changed.
pleg.relist()
pleg.Relist()
verifyEvents(t, expected, actual)
runtime.AllPodList = []*containertest.FakePod{
@ -580,7 +580,7 @@ func TestRelistingWithSandboxes(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Only report containers that transitioned to running or exited status.
expected = []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
@ -639,7 +639,7 @@ func TestRelistIPChange(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{pod}, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, pod.ID, "", "").Return(status, nil).Times(1)
pleg.relist()
pleg.Relist()
actualEvents := getEventsFromChannel(ch)
actualStatus, actualErr := pleg.cache.Get(pod.ID)
assert.Equal(t, status, actualStatus, tc.name)
@ -660,7 +660,7 @@ func TestRelistIPChange(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{pod}, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, pod.ID, "", "").Return(status, nil).Times(1)
pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
actualStatus, actualErr = pleg.cache.Get(pod.ID)
// Must copy status to compare since its pointer gets passed through all
@ -704,7 +704,7 @@ func TestRunningPodAndContainerCount(t *testing.T) {
}},
}
pleg.relist()
pleg.Relist()
tests := []struct {
name string

View File

@ -17,12 +17,23 @@ limitations under the License.
package pleg
import (
"time"
"k8s.io/apimachinery/pkg/types"
)
// PodLifeCycleEventType define the event type of pod life cycle events.
type PodLifeCycleEventType string
type RelistDuration struct {
// The period for relisting.
RelistPeriod time.Duration
// The relisting threshold needs to be greater than the relisting period +
// the relisting time, which can vary significantly. Set a conservative
// threshold to avoid flipping between healthy and unhealthy.
RelistThreshold time.Duration
}
const (
// ContainerStarted - event type when the new state of container is running.
ContainerStarted PodLifeCycleEventType = "ContainerStarted"
@ -52,6 +63,9 @@ type PodLifecycleEvent struct {
// PodLifecycleEventGenerator contains functions for generating pod life cycle events.
type PodLifecycleEventGenerator interface {
Start()
Stop()
Update(relistDuration *RelistDuration)
Watch() chan *PodLifecycleEvent
Healthy() (bool, error)
Relist()
}
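
A hedged sketch of why Stop, Update and Relist were added to the interface: a caller that falls back from event-driven mode to periodic relisting can reuse them roughly as below. The generator here is a trivial stand-in, not the kubelet's PLEG implementations:

package main

import (
	"fmt"
	"time"
)

type RelistDuration struct {
	RelistPeriod    time.Duration
	RelistThreshold time.Duration
}

// Generator is a pared-down version of PodLifecycleEventGenerator, keeping only
// the methods needed to illustrate the fallback flow.
type Generator interface {
	Start()
	Stop()
	Update(d *RelistDuration)
	Relist()
}

type fakeGeneric struct{ d *RelistDuration }

func (g *fakeGeneric) Start()                   { fmt.Println("generic PLEG started, period:", g.d.RelistPeriod) }
func (g *fakeGeneric) Stop()                    { fmt.Println("generic PLEG stopped") }
func (g *fakeGeneric) Update(d *RelistDuration) { g.d = d }
func (g *fakeGeneric) Relist()                  { fmt.Println("forced relist") }

// fallBackToGeneric mirrors the recovery path in watchEventsChannel: stop the
// long-period generic PLEG, restore the default relist duration and restart it.
func fallBackToGeneric(generic Generator, defaults *RelistDuration) {
	generic.Stop()
	generic.Update(defaults)
	generic.Start()
}

func main() {
	g := &fakeGeneric{d: &RelistDuration{RelistPeriod: 300 * time.Second, RelistThreshold: 10 * time.Minute}}
	g.Start()
	// The CRI event stream failed repeatedly; fall back to 1s relisting.
	fallBackToGeneric(g, &RelistDuration{RelistPeriod: time.Second, RelistThreshold: 3 * time.Minute})
	g.Relist()
}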

File diff suppressed because it is too large

View File

@ -121,7 +121,6 @@ service RuntimeService {
// GetContainerEvents gets container events from the CRI runtime
rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}
}
// ImageService defines the public APIs for managing images.
@ -540,6 +539,10 @@ message PodSandboxStatusResponse {
// debug, e.g. network namespace for linux container based container runtime.
// It should only be returned non-empty when Verbose is true.
map<string, string> info = 2;
// Container statuses
repeated ContainerStatus containers_statuses = 3;
// Timestamp at which container and pod statuses were recorded
int64 timestamp = 4;
}
// PodSandboxStateValue is the wrapper of PodSandboxState.
@ -1685,8 +1688,11 @@ message ContainerEventResponse {
// Creation timestamp of this event
int64 created_at = 3;
// ID of the sandbox container
PodSandboxMetadata pod_sandbox_metadata = 4;
// Sandbox status
PodSandboxStatus pod_sandbox_status = 4;
// Container statuses
repeated ContainerStatus containers_statuses = 5;
}
enum ContainerEventType {