Merge pull request #111384 from harche/evented_pleg_pr
Add Support for Evented PLEG
Commit 114594e1d2
@ -284,6 +284,14 @@ const (
// Allows running an ephemeral container in pod namespaces to troubleshoot a running pod.
EphemeralContainers featuregate.Feature = "EphemeralContainers"

// owner: @harche
// kep: http://kep.k8s.io/3386
// alpha: v1.25
//
// Allows using event-driven PLEG (pod lifecycle event generator) through kubelet
// which avoids frequent relisting of containers which helps optimize performance.
EventedPLEG featuregate.Feature = "EventedPLEG"

// owner: @andrewsykim @SergeyKanzhelev
// GA: v1.20
//
@ -943,6 +951,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

EphemeralContainers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27

EventedPLEG: {Default: false, PreRelease: featuregate.Alpha},

ExecProbeTimeout: {Default: true, PreRelease: featuregate.GA}, // lock to default and remove after v1.22 based on KEP #1972 update

ExpandCSIVolumes: {Default: true, PreRelease: featuregate.GA}, // remove in 1.26
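A minimal sketch (not part of this PR) of how kubelet code guards the evented code paths behind the gate registered above; the package and function name here are illustrative, while the gate-check call mirrors the pattern used throughout this diff.

// Hypothetical helper: returns true only when the EventedPLEG alpha gate is
// switched on (e.g. via --feature-gates=EventedPLEG=true).
package example

import (
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

func useEventedPLEG() bool {
	return utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG)
}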
@ -21,6 +21,8 @@ import (
"time"

"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)

// Cache stores the PodStatus for the pods. It represents *all* the visible
@ -36,7 +38,10 @@ import (
// cache entries.
type Cache interface {
Get(types.UID) (*PodStatus, error)
Set(types.UID, *PodStatus, error, time.Time)
// Set updates the cache by setting the PodStatus for the pod only
// if the data is newer than the cache based on the provided
// time stamp. Returns if the cache was updated.
Set(types.UID, *PodStatus, error, time.Time) (updated bool)
// GetNewerThan is a blocking call that only returns the status
// when it is newer than the given time.
GetNewerThan(types.UID, time.Time) (*PodStatus, error)
@ -93,12 +98,22 @@ func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error
return d.status, d.err
}

// Set sets the PodStatus for the pod.
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
// Set sets the PodStatus for the pod only if the data is newer than the cache
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) {
c.lock.Lock()
defer c.lock.Unlock()
defer c.notify(id, timestamp)

if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Set the value in the cache only if it's not present already
// or the timestamp in the cache is older than the current update timestamp
if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) {
return false
}
}

c.pods[id] = &data{status: status, err: err, modified: timestamp}
c.notify(id, timestamp)
return true
}

// Delete removes the entry of the pod.
@ -142,6 +157,29 @@ func (c *cache) get(id types.UID) *data {
// Otherwise, it returns nil. The caller should acquire the lock.
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
d, ok := c.pods[id]
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Evented PLEG has CREATED, STARTED, STOPPED and DELETED events.
// However if the container creation fails for some reason there is no
// CRI event received by the kubelet and that pod will get stuck at a
// GetNewerThan call in the pod workers. This is reproducible with
// the node e2e test,
// https://github.com/kubernetes/kubernetes/blob/83415e5c9e6e59a3d60a148160490560af2178a1/test/e2e_node/pod_hostnamefqdn_test.go#L161
// which forces failure during pod creation. This issue also exists in
// Generic PLEG but since it updates global timestamp periodically
// the GetNewerThan call gets unstuck.

// During node e2e tests, it was observed this change does not have any
// adverse impact on the behaviour of the Generic PLEG as well.
switch {
case !ok:
return makeDefaultData(id)
case ok && (d.modified.After(minTime) || (c.timestamp != nil && c.timestamp.After(minTime))):
return d
default:
return nil
}
}

globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
if !ok && globalTimestampIsNewer {
// Status is not cached, but the global timestamp is newer than
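A minimal sketch (not part of this PR, names are illustrative) of the updated Cache.Set contract shown above: with the EventedPLEG gate enabled, a write carrying an older timestamp than the cached entry is ignored and Set reports updated == false; with only the Generic PLEG the cache keeps its previous last-writer-wins behaviour.

package example

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/types"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

func demoSetOrdering() {
	cache := kubecontainer.NewCache()
	uid := types.UID("pod-uid")
	now := time.Now()

	// First write populates the cache.
	fmt.Println(cache.Set(uid, &kubecontainer.PodStatus{ID: uid}, nil, now)) // true

	// A write stamped one second in the past is considered stale and is
	// dropped when Evented PLEG is active.
	fmt.Println(cache.Set(uid, &kubecontainer.PodStatus{ID: uid}, nil, now.Add(-time.Second)))
}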
@ -122,6 +122,8 @@ type Runtime interface {
// CheckpointContainer tells the runtime to checkpoint a container
// and store the resulting archive to the checkpoint directory.
CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
// Generate pod status from the CRI event
GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*PodStatus, error)
}

// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
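An illustrative sketch (not part of the PR; the helper name is hypothetical) of how an event-driven caller uses the new Runtime.GeneratePodStatus method: it converts a CRI container event into a PodStatus and stores it in the kubelet cache, which is what the Evented PLEG added later in this diff does.

package example

import (
	"time"

	"k8s.io/apimachinery/pkg/types"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

func cacheStatusFromEvent(rt kubecontainer.Runtime, cache kubecontainer.Cache, event *runtimeapi.ContainerEventResponse) error {
	status, err := rt.GeneratePodStatus(event)
	if err != nil {
		return err
	}
	// The event's CreatedAt timestamp orders this write against relist-based
	// writes from the Generic PLEG (see the Cache.Set change above).
	cache.Set(types.UID(event.PodSandboxStatus.Metadata.Uid), status, nil, time.Unix(event.GetCreatedAt(), 0))
	return nil
}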
@ -305,6 +307,8 @@ type PodStatus struct {
// Status of the pod sandbox.
// Only for kuberuntime now, other runtime may keep it nil.
SandboxStatuses []*runtimeapi.PodSandboxStatus
// Timestamp at which container and pod statuses were recorded
TimeStamp time.Time
}

// Status represents the status of a container.
@ -40,7 +40,8 @@ func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*container.Po
|
||||
return c.Get(id)
|
||||
}
|
||||
|
||||
func (c *fakeCache) Set(id types.UID, status *container.PodStatus, err error, timestamp time.Time) {
|
||||
func (c *fakeCache) Set(id types.UID, status *container.PodStatus, err error, timestamp time.Time) (updated bool) {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *fakeCache) Delete(id types.UID) {
|
||||
|
@ -276,6 +276,15 @@ func (f *FakeRuntime) KillContainerInPod(container v1.Container, pod *v1.Pod) er
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*kubecontainer.PodStatus, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
f.CalledFunctions = append(f.CalledFunctions, "GeneratePodStatus")
|
||||
status := f.PodStatus
|
||||
return &status, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) GetPodStatus(_ context.Context, uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
@ -168,6 +168,21 @@ func (mr *MockRuntimeMockRecorder) GarbageCollect(ctx, gcPolicy, allSourcesReady
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GarbageCollect", reflect.TypeOf((*MockRuntime)(nil).GarbageCollect), ctx, gcPolicy, allSourcesReady, evictNonDeletedPods)
|
||||
}
|
||||
|
||||
// GeneratePodStatus mocks base method.
|
||||
func (m *MockRuntime) GeneratePodStatus(event *v10.ContainerEventResponse) (*container.PodStatus, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GeneratePodStatus", event)
|
||||
ret0, _ := ret[0].(*container.PodStatus)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GeneratePodStatus indicates an expected call of GeneratePodStatus.
|
||||
func (mr *MockRuntimeMockRecorder) GeneratePodStatus(event interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GeneratePodStatus", reflect.TypeOf((*MockRuntime)(nil).GeneratePodStatus), event)
|
||||
}
|
||||
|
||||
// GetContainerLogs mocks base method.
|
||||
func (m *MockRuntime) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID container.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"

@ -792,5 +793,25 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options
}

func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
return nil
containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
if err != nil {
klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
return err
}

for {
resp, err := containerEventsStreamingClient.Recv()
if err == io.EOF {
klog.ErrorS(err, "container events stream is closed")
return err
}
if err != nil {
klog.ErrorS(err, "failed to receive streaming container event")
return err
}
if resp != nil {
containerEventsCh <- resp
klog.V(4).InfoS("container event received", "resp", resp)
}
}
}
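A brief usage sketch (names are hypothetical, not part of the PR): GetContainerEvents blocks on the CRI event stream and pushes responses into the supplied channel, so a caller runs it in its own goroutine and drains the channel, much like the Evented PLEG's watchEventsChannel further down in this diff.

package example

import (
	internalapi "k8s.io/cri-api/pkg/apis"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
)

func consumeContainerEvents(rs internalapi.RuntimeService) {
	ch := make(chan *runtimeapi.ContainerEventResponse, 100)
	go func() {
		// Returns only on a stream error or EOF; production code retries and
		// eventually falls back to relisting (see pkg/kubelet/pleg/evented.go).
		if err := rs.GetContainerEvents(ch); err != nil {
			klog.ErrorS(err, "container event stream terminated")
		}
	}()
	for event := range ch {
		klog.V(4).InfoS("received CRI container event", "containerId", event.ContainerId)
	}
}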
@ -162,7 +162,13 @@ const (
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
genericPlegRelistPeriod = time.Second * 1
genericPlegRelistThreshold = time.Minute * 3

// Generic PLEG relist period and threshold when used with Evented PLEG.
eventedPlegRelistPeriod = time.Second * 300
eventedPlegRelistThreshold = time.Minute * 10
eventedPlegMaxStreamRetries = 5

// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
@ -699,9 +705,37 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
}

klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)

if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// adjust Generic PLEG relisting period and threshold to higher value when Evented PLEG is turned on
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: eventedPlegRelistPeriod,
RelistThreshold: eventedPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
// Evented PLEG should be able to reset the Generic PLEG relisting duration
// to the default value.
eventedRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.eventedPleg = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
} else {
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
}

klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy)
}
if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
}
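A compact restatement (a hypothetical helper, not in the PR) of how the kubelet picks the Generic PLEG relisting profile from the constants above: the aggressive 1s/3m profile when it is the only PLEG, and the relaxed 300s/10m profile when the Evented PLEG is expected to deliver most updates.

package example

import (
	"time"

	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

func genericRelistProfile(eventedPLEGEnabled bool) *pleg.RelistDuration {
	if eventedPLEGEnabled {
		// Generic PLEG becomes a slow safety net behind the event stream.
		return &pleg.RelistDuration{
			RelistPeriod:    300 * time.Second,
			RelistThreshold: 10 * time.Minute,
		}
	}
	// Default relisting-only behaviour.
	return &pleg.RelistDuration{
		RelistPeriod:    1 * time.Second,
		RelistThreshold: 3 * time.Minute,
	}
}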
@ -1062,6 +1096,9 @@ type Kubelet struct {
|
||||
// Generates pod events.
|
||||
pleg pleg.PodLifecycleEventGenerator
|
||||
|
||||
// Evented PLEG
|
||||
eventedPleg pleg.PodLifecycleEventGenerator
|
||||
|
||||
// Store kubecontainer.PodStatus for all pods.
|
||||
podCache kubecontainer.Cache
|
||||
|
||||
@ -1485,6 +1522,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
|
||||
// Start the pod lifecycle event generator.
|
||||
kl.pleg.Start()
|
||||
|
||||
// Start eventedPLEG only if EventedPLEG feature gate is enabled.
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
|
||||
kl.eventedPleg.Start()
|
||||
}
|
||||
|
||||
kl.syncLoop(ctx, updates, kl)
|
||||
}
|
||||
|
||||
|
@ -313,7 +313,7 @@ func newTestKubeletWithImageList(
|
||||
kubelet.resyncInterval = 10 * time.Second
|
||||
kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
|
||||
// Relist period does not affect the tests.
|
||||
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, kubelet.podCache, clock.RealClock{})
|
||||
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
|
||||
kubelet.clock = fakeClock
|
||||
|
||||
nodeRef := &v1.ObjectReference{
|
||||
|
@ -490,6 +490,29 @@ func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string)
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func (m *kubeGenericRuntimeManager) convertToKubeContainerStatus(status *runtimeapi.ContainerStatus) (cStatus *kubecontainer.Status) {
|
||||
cStatus = toKubeContainerStatus(status, m.runtimeName)
|
||||
if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
|
||||
// Populate the termination message if needed.
|
||||
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
|
||||
// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
|
||||
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
|
||||
cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
|
||||
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
|
||||
if checkLogs {
|
||||
tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
|
||||
}
|
||||
// Enrich the termination message if the message written by the application is not empty
|
||||
if len(tMessage) != 0 {
|
||||
if len(cStatus.Message) != 0 {
|
||||
cStatus.Message += ": "
|
||||
}
|
||||
cStatus.Message += tMessage
|
||||
}
|
||||
}
|
||||
return cStatus
|
||||
}
|
||||
|
||||
// getPodContainerStatuses gets all containers' statuses for the pod.
|
||||
func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context, uid kubetypes.UID, name, namespace string) ([]*kubecontainer.Status, error) {
|
||||
// Select all containers of the given pod.
|
||||
@ -521,25 +544,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context,
|
||||
if status == nil {
|
||||
return nil, remote.ErrContainerStatusNil
|
||||
}
|
||||
cStatus := toKubeContainerStatus(status, m.runtimeName)
|
||||
if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
|
||||
// Populate the termination message if needed.
|
||||
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
|
||||
// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
|
||||
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
|
||||
cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
|
||||
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
|
||||
if checkLogs {
|
||||
tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
|
||||
}
|
||||
// Enrich the termination message if the message written by the application is not empty
|
||||
if len(tMessage) != 0 {
|
||||
if len(cStatus.Message) != 0 {
|
||||
cStatus.Message += ": "
|
||||
}
|
||||
cStatus.Message += tMessage
|
||||
}
|
||||
}
|
||||
cStatus := m.convertToKubeContainerStatus(status)
|
||||
statuses = append(statuses, cStatus)
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
kubetypes "k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
utilversion "k8s.io/apimachinery/pkg/util/version"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
ref "k8s.io/client-go/tools/reference"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
@ -43,6 +44,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/credentialprovider"
|
||||
"k8s.io/kubernetes/pkg/credentialprovider/plugin"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||
@ -967,6 +969,26 @@ func (m *kubeGenericRuntimeManager) killPodWithSyncResult(ctx context.Context, p
|
||||
return
|
||||
}
|
||||
|
||||
func (m *kubeGenericRuntimeManager) GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*kubecontainer.PodStatus, error) {
|
||||
podIPs := m.determinePodSandboxIPs(event.PodSandboxStatus.Metadata.Namespace, event.PodSandboxStatus.Metadata.Name, event.PodSandboxStatus)
|
||||
|
||||
kubeContainerStatuses := []*kubecontainer.Status{}
|
||||
for _, status := range event.ContainersStatuses {
|
||||
kubeContainerStatuses = append(kubeContainerStatuses, m.convertToKubeContainerStatus(status))
|
||||
}
|
||||
|
||||
sort.Sort(containerStatusByCreated(kubeContainerStatuses))
|
||||
|
||||
return &kubecontainer.PodStatus{
|
||||
ID: kubetypes.UID(event.PodSandboxStatus.Metadata.Uid),
|
||||
Name: event.PodSandboxStatus.Metadata.Name,
|
||||
Namespace: event.PodSandboxStatus.Metadata.Namespace,
|
||||
IPs: podIPs,
|
||||
SandboxStatuses: []*runtimeapi.PodSandboxStatus{event.PodSandboxStatus},
|
||||
ContainerStatuses: kubeContainerStatuses,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetPodStatus retrieves the status of the pod, including the
|
||||
// information of all containers in the pod that are visible in Runtime.
|
||||
func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
|
||||
@ -1001,6 +1023,9 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
|
||||
klog.V(4).InfoS("getSandboxIDByPodUID got sandbox IDs for pod", "podSandboxID", podSandboxIDs, "pod", klog.KObj(pod))
|
||||
|
||||
sandboxStatuses := []*runtimeapi.PodSandboxStatus{}
|
||||
containerStatuses := []*kubecontainer.Status{}
|
||||
var timestamp time.Time
|
||||
|
||||
podIPs := []string{}
|
||||
for idx, podSandboxID := range podSandboxIDs {
|
||||
resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
|
||||
@ -1024,16 +1049,45 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
|
||||
if idx == 0 && resp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
|
||||
podIPs = m.determinePodSandboxIPs(namespace, name, resp.Status)
|
||||
}
|
||||
|
||||
if idx == 0 && utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
|
||||
if resp.Timestamp == 0 {
|
||||
// If the Evented PLEG is enabled in the kubelet, but not in the runtime
|
||||
// then the pod status we get will not have the timestamp set.
|
||||
// e.g. CI job 'pull-kubernetes-e2e-gce-alpha-features' will runs with
|
||||
// features gate enabled, which includes Evented PLEG, but uses the
|
||||
// runtime without Evented PLEG support.
|
||||
klog.V(4).InfoS("Runtime does not set pod status timestamp", "pod", klog.KObj(pod))
|
||||
containerStatuses, err = m.getPodContainerStatuses(ctx, uid, name, namespace)
|
||||
if err != nil {
|
||||
if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
|
||||
klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// Get the statuses of all containers visible to the pod and
|
||||
// timestamp from sandboxStatus.
|
||||
timestamp = time.Unix(resp.Timestamp, 0)
|
||||
for _, cs := range resp.ContainersStatuses {
|
||||
cStatus := m.convertToKubeContainerStatus(cs)
|
||||
containerStatuses = append(containerStatuses, cStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get statuses of all containers visible in the pod.
|
||||
containerStatuses, err := m.getPodContainerStatuses(ctx, uid, name, namespace)
|
||||
if err != nil {
|
||||
if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
|
||||
klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
|
||||
// Get statuses of all containers visible in the pod.
|
||||
containerStatuses, err = m.getPodContainerStatuses(ctx, uid, name, namespace)
|
||||
if err != nil {
|
||||
if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
|
||||
klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m.logReduction.ClearID(podFullName)
|
||||
|
||||
return &kubecontainer.PodStatus{
|
||||
@ -1043,6 +1097,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
|
||||
IPs: podIPs,
|
||||
SandboxStatuses: sandboxStatuses,
|
||||
ContainerStatuses: containerStatuses,
|
||||
TimeStamp: timestamp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
398 pkg/kubelet/pleg/evented.go (new file)
@ -0,0 +1,398 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package pleg
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// The frequency with which the global timestamp of the cache is to
// be updated periodically. If pod workers get stuck at the cache.GetNewerThan
|
||||
// call, after this period it will be unblocked.
|
||||
const globalCacheUpdatePeriod = 5 * time.Second
|
||||
|
||||
var (
|
||||
eventedPLEGUsage = false
|
||||
eventedPLEGUsageMu = sync.RWMutex{}
|
||||
)
|
||||
|
||||
// isEventedPLEGInUse indicates whether Evented PLEG is in use. Even after enabling
|
||||
// the Evented PLEG feature gate, there could be several reasons it may not be in use.
|
||||
// e.g. Streaming data issues from the runtime or the runtime does not implement the
|
||||
// container events stream.
|
||||
func isEventedPLEGInUse() bool {
|
||||
eventedPLEGUsageMu.Lock()
|
||||
defer eventedPLEGUsageMu.Unlock()
|
||||
return eventedPLEGUsage
|
||||
}
|
||||
|
||||
// setEventedPLEGUsage should only be accessed from
|
||||
// Start/Stop of Evented PLEG.
|
||||
func setEventedPLEGUsage(enable bool) {
|
||||
eventedPLEGUsageMu.RLock()
|
||||
defer eventedPLEGUsageMu.RUnlock()
|
||||
eventedPLEGUsage = enable
|
||||
}
|
||||
|
||||
type EventedPLEG struct {
|
||||
// The container runtime.
|
||||
runtime kubecontainer.Runtime
|
||||
// The runtime service.
|
||||
runtimeService internalapi.RuntimeService
|
||||
// The channel from which the subscriber listens events.
|
||||
eventChannel chan *PodLifecycleEvent
|
||||
// Cache for storing the runtime states required for syncing pods.
|
||||
cache kubecontainer.Cache
|
||||
// For testability.
|
||||
clock clock.Clock
|
||||
// GenericPLEG is used to force relist when required.
|
||||
genericPleg PodLifecycleEventGenerator
|
||||
// The maximum number of retries when getting container events from the runtime.
|
||||
eventedPlegMaxStreamRetries int
|
||||
// Indicates relisting related parameters
|
||||
relistDuration *RelistDuration
|
||||
// Stop the Evented PLEG by closing the channel.
|
||||
stopCh chan struct{}
|
||||
// Stops the periodic update of the cache global timestamp.
|
||||
stopCacheUpdateCh chan struct{}
|
||||
// Locks the start/stop operation of the Evented PLEG.
|
||||
runningMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewEventedPLEG instantiates a new EventedPLEG object and return it.
|
||||
func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
|
||||
cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
|
||||
relistDuration *RelistDuration, clock clock.Clock) PodLifecycleEventGenerator {
|
||||
return &EventedPLEG{
|
||||
runtime: runtime,
|
||||
runtimeService: runtimeService,
|
||||
eventChannel: eventChannel,
|
||||
cache: cache,
|
||||
genericPleg: genericPleg,
|
||||
eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
|
||||
relistDuration: relistDuration,
|
||||
clock: clock,
|
||||
}
|
||||
}
|
||||
|
||||
// Watch returns a channel from which the subscriber can receive PodLifecycleEvent events.
|
||||
func (e *EventedPLEG) Watch() chan *PodLifecycleEvent {
|
||||
return e.eventChannel
|
||||
}
|
||||
|
||||
// Relist relists all containers using GenericPLEG
|
||||
func (e *EventedPLEG) Relist() {
|
||||
e.genericPleg.Relist()
|
||||
}
|
||||
|
||||
// Start starts the Evented PLEG
|
||||
func (e *EventedPLEG) Start() {
|
||||
e.runningMu.Lock()
|
||||
defer e.runningMu.Unlock()
|
||||
if isEventedPLEGInUse() {
|
||||
return
|
||||
}
|
||||
setEventedPLEGUsage(true)
|
||||
e.stopCh = make(chan struct{})
|
||||
e.stopCacheUpdateCh = make(chan struct{})
|
||||
go wait.Until(e.watchEventsChannel, 0, e.stopCh)
|
||||
go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
|
||||
}
|
||||
|
||||
// Stop stops the Evented PLEG
|
||||
func (e *EventedPLEG) Stop() {
|
||||
e.runningMu.Lock()
|
||||
defer e.runningMu.Unlock()
|
||||
if !isEventedPLEGInUse() {
|
||||
return
|
||||
}
|
||||
setEventedPLEGUsage(false)
|
||||
close(e.stopCh)
|
||||
close(e.stopCacheUpdateCh)
|
||||
}
|
||||
|
||||
// In case the Evented PLEG experiences undetectable issues in the underlying
|
||||
// GRPC connection there is a remote chance the pod might get stuck in a
|
||||
// given state while it has progressed in its life cycle. This function will be
|
||||
// called periodically to update the global timestamp of the cache so that those
|
||||
// pods stuck at GetNewerThan in pod workers will get unstuck.
|
||||
func (e *EventedPLEG) updateGlobalCache() {
|
||||
e.cache.UpdateTime(time.Now())
|
||||
}
|
||||
|
||||
// Update the relisting period and threshold
|
||||
func (e *EventedPLEG) Update(relistDuration *RelistDuration) {
|
||||
e.genericPleg.Update(relistDuration)
|
||||
}
|
||||
|
||||
// Healthy check if PLEG work properly.
|
||||
func (e *EventedPLEG) Healthy() (bool, error) {
|
||||
// GenericPLEG is declared unhealthy when relisting time is more
|
||||
// than the relistThreshold. In case EventedPLEG is turned on,
|
||||
// relistingPeriod and relistingThreshold are adjusted to higher
|
||||
// values. So the health check of Generic PLEG should check
|
||||
// the adjusted values of relistingPeriod and relistingThreshold.
|
||||
|
||||
// EventedPLEG is declared unhealthy only if eventChannel is out of capacity.
|
||||
if len(e.eventChannel) == cap(e.eventChannel) {
|
||||
return false, fmt.Errorf("EventedPLEG: pleg event channel capacity is full with %v events", len(e.eventChannel))
|
||||
}
|
||||
|
||||
timestamp := e.clock.Now()
|
||||
metrics.PLEGLastSeen.Set(float64(timestamp.Unix()))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) watchEventsChannel() {
|
||||
containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, cap(e.eventChannel))
|
||||
defer close(containerEventsResponseCh)
|
||||
|
||||
// Get the container events from the runtime.
|
||||
go func() {
|
||||
numAttempts := 0
|
||||
for {
|
||||
if numAttempts >= e.eventedPlegMaxStreamRetries {
|
||||
if isEventedPLEGInUse() {
|
||||
// Fall back to Generic PLEG relisting since Evented PLEG is not working.
|
||||
klog.V(4).InfoS("Fall back to Generic PLEG relisting since Evented PLEG is not working")
|
||||
e.Stop()
|
||||
e.genericPleg.Stop() // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
|
||||
e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
|
||||
e.genericPleg.Start()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh)
|
||||
if err != nil {
|
||||
numAttempts++
|
||||
e.Relist() // Force a relist to get the latest container and pods running metric.
|
||||
klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying: ", "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if isEventedPLEGInUse() {
|
||||
e.processCRIEvents(containerEventsResponseCh)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeapi.ContainerEventResponse) {
|
||||
for event := range containerEventsResponseCh {
|
||||
podID := types.UID(event.PodSandboxStatus.Metadata.Uid)
|
||||
shouldSendPLEGEvent := false
|
||||
|
||||
status, err := e.runtime.GeneratePodStatus(event)
|
||||
if err != nil {
|
||||
// nolint:logcheck // Not using the result of klog.V inside the
|
||||
// if branch is okay, we just use it to determine whether the
|
||||
// additional "podStatus" key and its value should be added.
|
||||
if klog.V(6).Enabled() {
|
||||
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
|
||||
} else {
|
||||
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
|
||||
}
|
||||
} else {
|
||||
if klogV := klog.V(6); klogV.Enabled() {
|
||||
klogV.InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
|
||||
} else {
|
||||
klog.V(4).InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID)
|
||||
}
|
||||
// Preserve the pod IP across cache updates if the new IP is empty.
|
||||
// When a pod is torn down, kubelet may race with PLEG and retrieve
|
||||
// a pod status after network teardown, but the kubernetes API expects
|
||||
// the completed pod's IP to be available after the pod is dead.
|
||||
status.IPs = e.getPodIPs(podID, status)
|
||||
}
|
||||
|
||||
e.updateRunningPodMetric(status)
|
||||
e.updateRunningContainerMetric(status)
|
||||
|
||||
if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
|
||||
for _, sandbox := range status.SandboxStatuses {
|
||||
if sandbox.Id == event.ContainerId {
|
||||
// When the CONTAINER_DELETED_EVENT is received by the kubelet,
|
||||
// the runtime has indicated that the container has been removed
|
||||
// by the runtime and hence, it must be removed from the cache
|
||||
// of kubelet too.
|
||||
e.cache.Delete(podID)
|
||||
}
|
||||
}
|
||||
shouldSendPLEGEvent = true
|
||||
} else {
|
||||
if e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0)) {
|
||||
shouldSendPLEGEvent = true
|
||||
}
|
||||
}
|
||||
|
||||
if shouldSendPLEGEvent {
|
||||
e.processCRIEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse) {
|
||||
switch event.ContainerEventType {
|
||||
case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
|
||||
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
|
||||
klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
|
||||
case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
|
||||
// We only need to update the pod status on container create.
|
||||
// But we don't have to generate any PodLifeCycleEvent. Container creation related
|
||||
// PodLifeCycleEvent is ignored by the existing Generic PLEG as well.
|
||||
// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L88 and
|
||||
// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L273
|
||||
klog.V(4).InfoS("Received Container Created Event", "event", event.String())
|
||||
case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
|
||||
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
|
||||
klog.V(4).InfoS("Received Container Started Event", "event", event.String())
|
||||
case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
|
||||
// In case the pod is deleted it is safe to generate both ContainerDied and ContainerRemoved events, just like in the case of
|
||||
// Generic PLEG. https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L169
|
||||
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
|
||||
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
|
||||
klog.V(4).InfoS("Received Container Deleted Event", "event", event)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
|
||||
if len(status.IPs) != 0 {
|
||||
return status.IPs
|
||||
}
|
||||
|
||||
oldStatus, err := e.cache.Get(pid)
|
||||
if err != nil || len(oldStatus.IPs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, sandboxStatus := range status.SandboxStatuses {
|
||||
// If at least one sandbox is ready, then use this status update's pod IP
|
||||
if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
|
||||
return status.IPs
|
||||
}
|
||||
}
|
||||
|
||||
// For pods with no ready containers or sandboxes (like exited pods)
|
||||
// use the old status' pod IP
|
||||
return oldStatus.IPs
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) sendPodLifecycleEvent(event *PodLifecycleEvent) {
|
||||
select {
|
||||
case e.eventChannel <- event:
|
||||
default:
|
||||
// record how many events were discarded due to channel out of capacity
|
||||
metrics.PLEGDiscardEvents.Inc()
|
||||
klog.ErrorS(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
|
||||
}
|
||||
}
|
||||
|
||||
func getPodSandboxState(podStatus *kubecontainer.PodStatus) kubecontainer.State {
|
||||
// increase running pod count when cache doesn't contain podID
|
||||
var sandboxId string
|
||||
for _, sandbox := range podStatus.SandboxStatuses {
|
||||
sandboxId = sandbox.Id
|
||||
// pod must contain only one sandbox
|
||||
break
|
||||
}
|
||||
|
||||
for _, containerStatus := range podStatus.ContainerStatuses {
|
||||
if containerStatus.ID.ID == sandboxId {
|
||||
if containerStatus.State == kubecontainer.ContainerStateRunning {
|
||||
return containerStatus.State
|
||||
}
|
||||
}
|
||||
}
|
||||
return kubecontainer.ContainerStateExited
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) updateRunningPodMetric(podStatus *kubecontainer.PodStatus) {
|
||||
cachedPodStatus, err := e.cache.Get(podStatus.ID)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
|
||||
}
|
||||
// cache miss condition: The pod status object will have empty state if missed in cache
|
||||
if len(cachedPodStatus.SandboxStatuses) < 1 {
|
||||
sandboxState := getPodSandboxState(podStatus)
|
||||
if sandboxState == kubecontainer.ContainerStateRunning {
|
||||
metrics.RunningPodCount.Inc()
|
||||
}
|
||||
} else {
|
||||
oldSandboxState := getPodSandboxState(cachedPodStatus)
|
||||
currentSandboxState := getPodSandboxState(podStatus)
|
||||
|
||||
if oldSandboxState == kubecontainer.ContainerStateRunning && currentSandboxState != kubecontainer.ContainerStateRunning {
|
||||
metrics.RunningPodCount.Dec()
|
||||
} else if oldSandboxState != kubecontainer.ContainerStateRunning && currentSandboxState == kubecontainer.ContainerStateRunning {
|
||||
metrics.RunningPodCount.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getContainerStateCount(podStatus *kubecontainer.PodStatus) map[kubecontainer.State]int {
|
||||
containerStateCount := make(map[kubecontainer.State]int)
|
||||
for _, container := range podStatus.ContainerStatuses {
|
||||
containerStateCount[container.State]++
|
||||
}
|
||||
return containerStateCount
|
||||
}
|
||||
|
||||
func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodStatus) {
|
||||
cachedPodStatus, err := e.cache.Get(podStatus.ID)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
|
||||
}
|
||||
|
||||
// cache miss condition: The pod status object will have empty state if missed in cache
|
||||
if len(cachedPodStatus.SandboxStatuses) < 1 {
|
||||
containerStateCount := getContainerStateCount(podStatus)
|
||||
for state, count := range containerStateCount {
|
||||
// add currently obtained count
|
||||
metrics.RunningContainerCount.WithLabelValues(string(state)).Add(float64(count))
|
||||
}
|
||||
} else {
|
||||
oldContainerStateCount := getContainerStateCount(cachedPodStatus)
|
||||
currentContainerStateCount := getContainerStateCount(podStatus)
|
||||
|
||||
// old and new set of container states may vary;
|
||||
// get a unique set of container states combining both
|
||||
containerStates := make(map[kubecontainer.State]bool)
|
||||
for state := range oldContainerStateCount {
|
||||
containerStates[state] = true
|
||||
}
|
||||
for state := range currentContainerStateCount {
|
||||
containerStates[state] = true
|
||||
}
|
||||
|
||||
// update the metric via difference of old and current counts
|
||||
for state := range containerStates {
|
||||
diff := currentContainerStateCount[state] - oldContainerStateCount[state]
|
||||
metrics.RunningContainerCount.WithLabelValues(string(state)).Add(float64(diff))
|
||||
}
|
||||
}
|
||||
}
|
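A short worked example (made-up counts, helper not in the PR) of the arithmetic in updateRunningContainerMetric above: each per-state gauge is adjusted by (current - old), so states that shrank are decremented and states that grew are incremented.

package example

func containerCountDiffs(old, current map[string]int) map[string]int {
	diffs := map[string]int{}
	// Visit every state present in either snapshot.
	for state := range old {
		diffs[state] = current[state] - old[state]
	}
	for state := range current {
		diffs[state] = current[state] - old[state]
	}
	return diffs
}

// containerCountDiffs(
//   map[string]int{"running": 2, "exited": 1},
//   map[string]int{"running": 1, "exited": 2},
// ) == map[string]int{"running": -1, "exited": 1}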
133 pkg/kubelet/pleg/evented_test.go (new file)
@ -0,0 +1,133 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package pleg
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
v1 "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
critest "k8s.io/cri-api/pkg/apis/testing"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
func newTestEventedPLEG() *EventedPLEG {
|
||||
return &EventedPLEG{
|
||||
runtime: &containertest.FakeRuntime{},
|
||||
clock: testingclock.NewFakeClock(time.Time{}),
|
||||
cache: kubecontainer.NewCache(),
|
||||
runtimeService: critest.NewFakeRuntimeService(),
|
||||
eventChannel: make(chan *PodLifecycleEvent, 100),
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthyEventedPLEG(t *testing.T) {
|
||||
metrics.Register()
|
||||
pleg := newTestEventedPLEG()
|
||||
|
||||
_, _, events := createTestPodsStatusesAndEvents(100)
|
||||
for _, event := range events[:5] {
|
||||
pleg.eventChannel <- event
|
||||
}
|
||||
|
||||
// test if healthy when event channel has 5 events
|
||||
isHealthy, err := pleg.Healthy()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, true, isHealthy)
|
||||
|
||||
// send remaining 95 events and make channel out of capacity
|
||||
for _, event := range events[5:] {
|
||||
pleg.eventChannel <- event
|
||||
}
|
||||
// pleg is unhealthy when channel is out of capacity
|
||||
isHealthy, err = pleg.Healthy()
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, false, isHealthy)
|
||||
}
|
||||
|
||||
func TestUpdateRunningPodMetric(t *testing.T) {
|
||||
metrics.Register()
|
||||
pleg := newTestEventedPLEG()
|
||||
|
||||
podStatuses := make([]*kubecontainer.PodStatus, 5)
|
||||
for i := range podStatuses {
|
||||
id := fmt.Sprintf("test-pod-%d", i)
|
||||
podStatuses[i] = &kubecontainer.PodStatus{
|
||||
ID: types.UID(id),
|
||||
SandboxStatuses: []*v1.PodSandboxStatus{
|
||||
{Id: id},
|
||||
},
|
||||
ContainerStatuses: []*kubecontainer.Status{
|
||||
{ID: kubecontainer.ContainerID{ID: id}, State: kubecontainer.ContainerStateRunning},
|
||||
},
|
||||
}
|
||||
|
||||
pleg.updateRunningPodMetric(podStatuses[i])
|
||||
pleg.cache.Set(podStatuses[i].ID, podStatuses[i], nil, time.Now())
|
||||
|
||||
}
|
||||
pleg.cache.UpdateTime(time.Now())
|
||||
|
||||
expectedMetric := `
|
||||
# HELP kubelet_running_pods [ALPHA] Number of pods that have a running pod sandbox
|
||||
# TYPE kubelet_running_pods gauge
|
||||
kubelet_running_pods 5
|
||||
`
|
||||
testMetric(t, expectedMetric, metrics.RunningPodCount.FQName())
|
||||
|
||||
// stop sandbox containers for first 2 pods
|
||||
for _, podStatus := range podStatuses[:2] {
|
||||
podId := string(podStatus.ID)
|
||||
newPodStatus := kubecontainer.PodStatus{
|
||||
ID: podStatus.ID,
|
||||
SandboxStatuses: []*v1.PodSandboxStatus{
|
||||
{Id: podId},
|
||||
},
|
||||
ContainerStatuses: []*kubecontainer.Status{
|
||||
// update state to container exited
|
||||
{ID: kubecontainer.ContainerID{ID: podId}, State: kubecontainer.ContainerStateExited},
|
||||
},
|
||||
}
|
||||
|
||||
pleg.updateRunningPodMetric(&newPodStatus)
|
||||
pleg.cache.Set(newPodStatus.ID, &newPodStatus, nil, time.Now())
|
||||
}
|
||||
pleg.cache.UpdateTime(time.Now())
|
||||
|
||||
expectedMetric = `
|
||||
# HELP kubelet_running_pods [ALPHA] Number of pods that have a running pod sandbox
|
||||
# TYPE kubelet_running_pods gauge
|
||||
kubelet_running_pods 3
|
||||
`
|
||||
testMetric(t, expectedMetric, metrics.RunningPodCount.FQName())
|
||||
}
|
||||
|
||||
func testMetric(t *testing.T, expectedMetric string, metricName string) {
|
||||
err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(expectedMetric), metricName)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
@ -19,14 +19,17 @@ package pleg
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
"k8s.io/utils/clock"
|
||||
@ -48,8 +51,6 @@ import (
|
||||
// recommended to set the relist period short and have an auxiliary, longer
|
||||
// periodic sync in kubelet as the safety net.
|
||||
type GenericPLEG struct {
|
||||
// The period for relisting.
|
||||
relistPeriod time.Duration
|
||||
// The container runtime.
|
||||
runtime kubecontainer.Runtime
|
||||
// The channel from which the subscriber listens events.
|
||||
@ -65,6 +66,16 @@ type GenericPLEG struct {
|
||||
// Pods that failed to have their status retrieved during a relist. These pods will be
|
||||
// retried during the next relisting.
|
||||
podsToReinspect map[types.UID]*kubecontainer.Pod
|
||||
// Stop the Generic PLEG by closing the channel.
|
||||
stopCh chan struct{}
|
||||
// Locks the relisting of the Generic PLEG
|
||||
relistLock sync.Mutex
|
||||
// Indicates if the Generic PLEG is running or not
|
||||
isRunning bool
|
||||
// Locks the start/stop operation of Generic PLEG
|
||||
runningMu sync.Mutex
|
||||
// Indicates relisting related parameters
|
||||
relistDuration *RelistDuration
|
||||
}
|
||||
|
||||
// plegContainerState has a one-to-one mapping to the
|
||||
@ -77,11 +88,6 @@ const (
|
||||
plegContainerExited plegContainerState = "exited"
|
||||
plegContainerUnknown plegContainerState = "unknown"
|
||||
plegContainerNonExistent plegContainerState = "non-existent"
|
||||
|
||||
// The threshold needs to be greater than the relisting period + the
|
||||
// relisting time, which can vary significantly. Set a conservative
|
||||
// threshold to avoid flipping between healthy and unhealthy.
|
||||
relistThreshold = 3 * time.Minute
|
||||
)
|
||||
|
||||
func convertState(state kubecontainer.State) plegContainerState {
|
||||
@ -108,15 +114,16 @@ type podRecord struct {
|
||||
type podRecords map[types.UID]*podRecord
|
||||
|
||||
// NewGenericPLEG instantiates a new GenericPLEG object and return it.
|
||||
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
|
||||
relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
|
||||
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
|
||||
relistDuration *RelistDuration, cache kubecontainer.Cache,
|
||||
clock clock.Clock) PodLifecycleEventGenerator {
|
||||
return &GenericPLEG{
|
||||
relistPeriod: relistPeriod,
|
||||
runtime: runtime,
|
||||
eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
|
||||
podRecords: make(podRecords),
|
||||
cache: cache,
|
||||
clock: clock,
|
||||
relistDuration: relistDuration,
|
||||
runtime: runtime,
|
||||
eventChannel: eventChannel,
|
||||
podRecords: make(podRecords),
|
||||
cache: cache,
|
||||
clock: clock,
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,7 +136,26 @@ func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
|
||||
|
||||
// Start spawns a goroutine to relist periodically.
|
||||
func (g *GenericPLEG) Start() {
|
||||
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
|
||||
g.runningMu.Lock()
|
||||
defer g.runningMu.Unlock()
|
||||
if !g.isRunning {
|
||||
g.isRunning = true
|
||||
g.stopCh = make(chan struct{})
|
||||
go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GenericPLEG) Stop() {
|
||||
g.runningMu.Lock()
|
||||
defer g.runningMu.Unlock()
|
||||
if g.isRunning {
|
||||
close(g.stopCh)
|
||||
g.isRunning = false
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GenericPLEG) Update(relistDuration *RelistDuration) {
|
||||
g.relistDuration = relistDuration
|
||||
}
|
||||
|
||||
// Healthy check if PLEG work properly.
|
||||
@ -142,8 +168,8 @@ func (g *GenericPLEG) Healthy() (bool, error) {
|
||||
// Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
|
||||
metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
|
||||
elapsed := g.clock.Since(relistTime)
|
||||
if elapsed > relistThreshold {
|
||||
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
|
||||
if elapsed > g.relistDuration.RelistThreshold {
|
||||
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, g.relistDuration.RelistThreshold)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
@ -186,9 +212,12 @@ func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
|
||||
g.relistTime.Store(timestamp)
|
||||
}
|
||||
|
||||
// relist queries the container runtime for list of pods/containers, compare
|
||||
// Relist queries the container runtime for list of pods/containers, compare
|
||||
// with the internal pods/containers, and generates events accordingly.
|
||||
func (g *GenericPLEG) relist() {
|
||||
func (g *GenericPLEG) Relist() {
|
||||
g.relistLock.Lock()
|
||||
defer g.relistLock.Unlock()
|
||||
|
||||
ctx := context.Background()
|
||||
klog.V(5).InfoS("GenericPLEG: Relisting")
|
||||
|
||||
@ -249,7 +278,7 @@ func (g *GenericPLEG) relist() {
|
||||
// inspecting the pod and getting the PodStatus to update the cache
|
||||
// serially may take a while. We should be aware of this and
|
||||
// parallelize if needed.
|
||||
if err := g.updateCache(ctx, pod, pid); err != nil {
|
||||
if err, updated := g.updateCache(ctx, pod, pid); err != nil {
|
||||
// Rely on updateCache calling GetPodStatus to log the actual error.
|
||||
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
|
||||
|
||||
@ -262,6 +291,11 @@ func (g *GenericPLEG) relist() {
|
||||
// from the list (we don't want the reinspection code below to inspect it a second time in
|
||||
// this relist execution)
|
||||
delete(g.podsToReinspect, pid)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
|
||||
if !updated {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Update the internal storage and send out the events.
|
||||
@ -307,7 +341,7 @@ func (g *GenericPLEG) relist() {
|
||||
if len(g.podsToReinspect) > 0 {
|
||||
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
|
||||
for pid, pod := range g.podsToReinspect {
|
||||
if err := g.updateCache(ctx, pod, pid); err != nil {
|
||||
if err, _ := g.updateCache(ctx, pod, pid); err != nil {
|
||||
// Rely on updateCache calling GetPodStatus to log the actual error.
|
||||
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
|
||||
needsReinspection[pid] = pod
|
||||
@ -390,18 +424,20 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
|
||||
return oldStatus.IPs
|
||||
}
|
||||
|
||||
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) error {
|
||||
// updateCache tries to update the pod status in the kubelet cache and returns true if the
|
||||
// pod status was actually updated in the cache. It will return false if the pod status
|
||||
// was ignored by the cache.
|
||||
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
|
||||
if pod == nil {
|
||||
// The pod is missing in the current relist. This means that
|
||||
// the pod has no visible (active or inactive) containers.
|
||||
klog.V(4).InfoS("PLEG: Delete status for pod", "podUID", string(pid))
|
||||
g.cache.Delete(pid)
|
||||
return nil
|
||||
return nil, true
|
||||
}
|
||||
|
||||
timestamp := g.clock.Now()
|
||||
// TODO: Consider adding a new runtime method
|
||||
// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
|
||||
// all containers again.
|
||||
|
||||
status, err := g.runtime.GetPodStatus(ctx, pod.ID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
// nolint:logcheck // Not using the result of klog.V inside the
|
||||
@ -425,8 +461,21 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
|
||||
status.IPs = g.getPodIPs(pid, status)
|
||||
}
|
||||
|
||||
g.cache.Set(pod.ID, status, err, timestamp)
|
||||
return err
|
||||
// When we use Generic PLEG only, the PodStatus is saved in the cache without
|
||||
// any validation of the existing status against the current timestamp.
|
||||
// This works well when there is only Generic PLEG setting the PodStatus in the cache however,
|
||||
// if we have multiple entities, such as Evented PLEG, while trying to set the PodStatus in the
|
||||
// cache we may run into the racy timestamps given each of them were to calculate the timestamps
|
||||
// in their respective execution flow. While Generic PLEG calculates this timestamp and gets
|
||||
// the PodStatus, we can only calculate the corresponding timestamp in
|
||||
// Evented PLEG after the event has been received by the Kubelet.
|
||||
// For more details refer to:
|
||||
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/3386-kubelet-evented-pleg#timestamp-of-the-pod-status
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && isEventedPLEGInUse() {
|
||||
timestamp = status.TimeStamp
|
||||
}
|
||||
|
||||
return err, g.cache.Set(pod.ID, status, err, timestamp)
|
||||
}
|
||||
|
||||
func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
|
||||
|
@ -61,11 +61,11 @@ func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG {
// The channel capacity should be large enough to hold all events in a
// single test.
pleg := &GenericPLEG{
relistPeriod: time.Hour,
runtime: fakeRuntime,
eventChannel: make(chan *PodLifecycleEvent, eventChannelCap),
podRecords: make(podRecords),
clock: clock,
relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
runtime: fakeRuntime,
eventChannel: make(chan *PodLifecycleEvent, eventChannelCap),
podRecords: make(podRecords),
clock: clock,
}
return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
}
@ -126,7 +126,7 @@ func TestRelisting(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
@ -138,7 +138,7 @@ func TestRelisting(t *testing.T) {

// The second relist should not send out any event because no container has
// changed.
pleg.relist()
pleg.Relist()
actual = getEventsFromChannel(ch)
assert.True(t, len(actual) == 0, "no container has changed, event length should be 0")

@ -157,7 +157,7 @@ func TestRelisting(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Only report containers that transitioned to running or exited status.
expected = []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
@ -193,7 +193,7 @@ func TestEventChannelFull(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
@ -218,7 +218,7 @@ func TestEventChannelFull(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
allEvents := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
{ID: "1234", Type: ContainerDied, Data: "c2"},
@ -258,7 +258,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
pleg.relist()
pleg.Relist()
getEventsFromChannel(ch)
}

@ -273,7 +273,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
},
}},
}
pleg.relist()
pleg.Relist()
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},
{ID: "1234", Type: ContainerRemoved, Data: "c2"},
@ -297,14 +297,14 @@ func testReportMissingPods(t *testing.T, numRelists int) {
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
pleg.relist()
pleg.Relist()
getEventsFromChannel(ch)
}

// Container c2 was stopped and removed between relists. We should report
// the event.
runtime.AllPodList = []*containertest.FakePod{}
pleg.relist()
pleg.Relist()
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},
{ID: "1234", Type: ContainerRemoved, Data: "c2"},
@ -315,12 +315,12 @@ func testReportMissingPods(t *testing.T, numRelists int) {

func newTestGenericPLEGWithRuntimeMock(runtimeMock kubecontainer.Runtime) *GenericPLEG {
pleg := &GenericPLEG{
relistPeriod: time.Hour,
runtime: runtimeMock,
eventChannel: make(chan *PodLifecycleEvent, 100),
podRecords: make(podRecords),
cache: kubecontainer.NewCache(),
clock: clock.RealClock{},
relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour},
runtime: runtimeMock,
eventChannel: make(chan *PodLifecycleEvent, 1000),
podRecords: make(podRecords),
cache: kubecontainer.NewCache(),
clock: clock.RealClock{},
}
return pleg
}
@ -366,7 +366,7 @@ func TestRelistWithCache(t *testing.T) {
statusErr := fmt.Errorf("unable to get status")
runtimeMock.EXPECT().GetPodStatus(ctx, pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Times(1)

pleg.relist()
pleg.Relist()
actualEvents := getEventsFromChannel(ch)
cases := []struct {
pod *kubecontainer.Pod
@ -387,7 +387,7 @@ func TestRelistWithCache(t *testing.T) {

// Return normal status for pods[1].
runtimeMock.EXPECT().GetPodStatus(ctx, pods[1].ID, "", "").Return(statuses[1], nil).Times(1)
pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
cases = []struct {
pod *kubecontainer.Pod
@ -418,11 +418,11 @@ func TestRemoveCacheEntry(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, pods[0].ID, "", "").Return(statuses[0], nil).Times(1)
// Does a relist to populate the cache.
pleg.relist()
pleg.Relist()
// Delete the pod from runtime. Verify that the cache entry has been
// removed after relisting.
runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{}, nil).Times(1)
pleg.relist()
pleg.Relist()
actualStatus, actualErr := pleg.cache.Get(pods[0].ID)
assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
assert.Equal(t, nil, actualErr)
@ -443,14 +443,14 @@ func TestHealthy(t *testing.T) {

// Relist and then advance the time by 1 minute. pleg should be healthy
// because this is within the allowed limit.
pleg.relist()
pleg.Relist()
clock.Step(time.Minute * 1)
ok, _ = pleg.Healthy()
assert.True(t, ok, "pleg should be healthy")

// Advance by relistThreshold without any relisting. pleg should be unhealthy
// because it has been longer than relistThreshold since a relist occurred.
clock.Step(relistThreshold)
clock.Step(pleg.relistDuration.RelistThreshold)
ok, _ = pleg.Healthy()
assert.False(t, ok, "pleg should be unhealthy")
}
@ -482,7 +482,7 @@ func TestRelistWithReinspection(t *testing.T) {
goodEvent := &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: infraContainer.ID.ID}

// listing 1 - everything ok, infra container set up for pod
pleg.relist()
pleg.Relist()
actualEvents := getEventsFromChannel(ch)
actualStatus, actualErr := pleg.cache.Get(podID)
assert.Equal(t, goodStatus, actualStatus)
@ -504,7 +504,7 @@ func TestRelistWithReinspection(t *testing.T) {
}
runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(badStatus, errors.New("inspection error")).Times(1)

pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
actualStatus, actualErr = pleg.cache.Get(podID)
assert.Equal(t, badStatus, actualStatus)
@ -516,7 +516,7 @@ func TestRelistWithReinspection(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(goodStatus, nil).Times(1)

pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
actualStatus, actualErr = pleg.cache.Get(podID)
assert.Equal(t, goodStatus, actualStatus)
@ -550,7 +550,7 @@ func TestRelistingWithSandboxes(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
@ -562,7 +562,7 @@ func TestRelistingWithSandboxes(t *testing.T) {

// The second relist should not send out any event because no container has
// changed.
pleg.relist()
pleg.Relist()
verifyEvents(t, expected, actual)

runtime.AllPodList = []*containertest.FakePod{
@ -580,7 +580,7 @@ func TestRelistingWithSandboxes(t *testing.T) {
},
}},
}
pleg.relist()
pleg.Relist()
// Only report containers that transitioned to running or exited status.
expected = []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
@ -639,7 +639,7 @@ func TestRelistIPChange(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{pod}, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, pod.ID, "", "").Return(status, nil).Times(1)

pleg.relist()
pleg.Relist()
actualEvents := getEventsFromChannel(ch)
actualStatus, actualErr := pleg.cache.Get(pod.ID)
assert.Equal(t, status, actualStatus, tc.name)
@ -660,7 +660,7 @@ func TestRelistIPChange(t *testing.T) {
runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{pod}, nil).Times(1)
runtimeMock.EXPECT().GetPodStatus(ctx, pod.ID, "", "").Return(status, nil).Times(1)

pleg.relist()
pleg.Relist()
actualEvents = getEventsFromChannel(ch)
actualStatus, actualErr = pleg.cache.Get(pod.ID)
// Must copy status to compare since its pointer gets passed through all
@ -704,7 +704,7 @@ func TestRunningPodAndContainerCount(t *testing.T) {
}},
}

pleg.relist()
pleg.Relist()

tests := []struct {
name string

@ -17,12 +17,23 @@ limitations under the License.
package pleg

import (
"time"

"k8s.io/apimachinery/pkg/types"
)

// PodLifeCycleEventType defines the event type of pod life cycle events.
type PodLifeCycleEventType string

type RelistDuration struct {
// The period for relisting.
RelistPeriod time.Duration
// The relisting threshold needs to be greater than the relisting period +
// the relisting time, which can vary significantly. Set a conservative
// threshold to avoid flipping between healthy and unhealthy.
RelistThreshold time.Duration
}

const (
// ContainerStarted - event type when the new state of container is running.
ContainerStarted PodLifeCycleEventType = "ContainerStarted"
@ -52,6 +63,9 @@ type PodLifecycleEvent struct {
// PodLifecycleEventGenerator contains functions for generating pod life cycle events.
type PodLifecycleEventGenerator interface {
Start()
Stop()
Update(relistDuration *RelistDuration)
Watch() chan *PodLifecycleEvent
Healthy() (bool, error)
Relist()
}

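As a rough usage sketch of the new Update and Healthy methods, a caller could relax relisting while evented updates are flowing and fall back to frequent relisting otherwise. The durations, the eventedIsHealthy flag, and the helper name below are assumptions for illustration, not values or code from this pull request.

package sketch

import (
	"time"

	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// adjustRelisting is a hypothetical helper showing how the interface added
// above could be driven.
func adjustRelisting(g pleg.PodLifecycleEventGenerator, eventedIsHealthy bool) (bool, error) {
	if eventedIsHealthy {
		// Evented updates carry most of the work, so relist rarely but keep
		// the threshold comfortably above the period plus the relist time.
		g.Update(&pleg.RelistDuration{
			RelistPeriod:    5 * time.Minute,
			RelistThreshold: 10 * time.Minute,
		})
	} else {
		// Fall back to frequent relisting when events cannot be trusted.
		g.Update(&pleg.RelistDuration{
			RelistPeriod:    time.Second,
			RelistThreshold: 3 * time.Minute,
		})
	}
	// Healthy reports false once RelistThreshold elapses without a relist.
	return g.Healthy()
}
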
File diff suppressed because it is too large
@ -121,7 +121,6 @@ service RuntimeService {

// GetContainerEvents gets container events from the CRI runtime
rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}

}

// ImageService defines the public APIs for managing images.
@ -544,6 +543,10 @@ message PodSandboxStatusResponse {
// debug, e.g. network namespace for linux container based container runtime.
// It should only be returned non-empty when Verbose is true.
map<string, string> info = 2;
// Container statuses
repeated ContainerStatus containers_statuses = 3;
// Timestamp at which container and pod statuses were recorded
int64 timestamp = 4;
}

// PodSandboxStateValue is the wrapper of PodSandboxState.
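The new timestamp field is an int64; assuming it follows the CRI convention of nanoseconds since the epoch (as created_at does elsewhere in this file), converting it for use on the kubelet side could look like the sketch below. The nanosecond unit and the helper name are assumptions, not stated in this hunk.

package sketch

import (
	"time"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// statusRecordedAt converts the sandbox status response timestamp to a
// time.Time, assuming nanosecond resolution as with other CRI timestamps.
func statusRecordedAt(resp *runtimeapi.PodSandboxStatusResponse) time.Time {
	return time.Unix(0, resp.GetTimestamp())
}
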
@ -1693,8 +1696,11 @@ message ContainerEventResponse {
// Creation timestamp of this event
int64 created_at = 3;

// ID of the sandbox container
PodSandboxMetadata pod_sandbox_metadata = 4;
// Sandbox status
PodSandboxStatus pod_sandbox_status = 4;

// Container statuses
repeated ContainerStatus containers_statuses = 5;
}

enum ContainerEventType {
@ -1709,4 +1715,4 @@ enum ContainerEventType {

// Container deleted
CONTAINER_DELETED_EVENT = 3;
}
}
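
A minimal consumer sketch for the streaming RPC declared above, assuming the standard gRPC-generated client in k8s.io/cri-api; the output channel and the absence of reconnection logic are simplifications, not part of this change.

package sketch

import (
	"context"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// watchContainerEvents forwards CRI container events until the stream ends.
// It is illustrative only; the real kubelet wiring is more involved.
func watchContainerEvents(ctx context.Context, client runtimeapi.RuntimeServiceClient, out chan<- *runtimeapi.ContainerEventResponse) error {
	stream, err := client.GetContainerEvents(ctx, &runtimeapi.GetEventsRequest{})
	if err != nil {
		return err
	}
	for {
		ev, err := stream.Recv()
		if err != nil {
			// io.EOF or a transport error ends the watch; the caller decides
			// whether to reconnect or fall back to relisting.
			return err
		}
		// Each event carries the pod sandbox status and container statuses,
		// so the receiver can update its cache without extra CRI calls.
		out <- ev
	}
}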