Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-03 17:30:00 +00:00
kubelet: migrate pleg to contextual logging
Signed-off-by: Oksana Baranova <oksana.baranova@intel.com>
parent 19e8e59d06
commit 2474369227
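The diff below applies the usual contextual-logging migration pattern to the PLEG: instead of calling the global klog functions (klog.InfoS, klog.ErrorS), GenericPLEG and EventedPLEG store an injected klog.Logger and log through it. A minimal, self-contained sketch of that pattern, using hypothetical names rather than the real PLEG types:

package main

import (
	"k8s.io/klog/v2"
)

// relister is a stand-in for GenericPLEG/EventedPLEG: it keeps the injected
// logger in a field instead of relying on klog's global state.
type relister struct {
	logger klog.Logger
}

func newRelister(logger klog.Logger) *relister {
	return &relister{logger: logger}
}

func (r *relister) relist() {
	// Before the migration this would have been klog.V(5).InfoS(...).
	r.logger.V(5).Info("GenericPLEG: Relisting")
}

func main() {
	// klog.Background() yields the global logger; callers that carry a
	// context.Context would use klog.FromContext(ctx) instead.
	newRelister(klog.Background()).relist()
}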
@@ -152,6 +152,7 @@ linters-settings: # please keep this alphabetized
 contextual k8s.io/kubernetes/pkg/scheduler/.*
 contextual k8s.io/kubernetes/test/e2e/dra/.*
 contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift

@@ -198,6 +198,7 @@ linters-settings: # please keep this alphabetized
 contextual k8s.io/kubernetes/pkg/scheduler/.*
 contextual k8s.io/kubernetes/test/e2e/dra/.*
 contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift

@@ -201,6 +201,7 @@ linters-settings: # please keep this alphabetized
 contextual k8s.io/kubernetes/pkg/scheduler/.*
 contextual k8s.io/kubernetes/test/e2e/dra/.*
 contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift

@@ -48,6 +48,7 @@ contextual k8s.io/kubernetes/pkg/controller/.*
 contextual k8s.io/kubernetes/pkg/scheduler/.*
 contextual k8s.io/kubernetes/test/e2e/dra/.*
 contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift
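The comment carried through these configuration hunks refers to the klog wrapper helpers that contextual-logging code must use while the feature is still alpha or beta, so the derived loggers can be switched off together with the feature gate. A short illustrative sketch of that constraint (assumed usage, not taken from this commit):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

func main() {
	logger := klog.Background()

	// While contextual logging is alpha/beta, derive loggers through klog
	// helpers rather than calling logger.WithName/WithValues directly.
	plegLogger := klog.LoggerWithName(logger, "pleg")
	plegLogger = klog.LoggerWithValues(plegLogger, "component", "kubelet")

	// Likewise, attach and retrieve loggers from a context via klog.
	ctx := klog.NewContext(context.Background(), plegLogger)
	klog.FromContext(ctx).V(4).Info("relist triggered")
}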
@@ -738,7 +738,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
             RelistPeriod:    eventedPlegRelistPeriod,
             RelistThreshold: eventedPlegRelistThreshold,
         }
-        klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+        klet.pleg = pleg.NewGenericPLEG(logger, klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
         // In case Evented PLEG has to fall back on Generic PLEG due to an error,
         // Evented PLEG should be able to reset the Generic PLEG relisting duration
         // to the default value.

@@ -746,7 +746,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
             RelistPeriod:    genericPlegRelistPeriod,
             RelistThreshold: genericPlegRelistThreshold,
         }
-        klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
+        klet.eventedPleg, err = pleg.NewEventedPLEG(logger, klet.containerRuntime, klet.runtimeService, eventChannel,
             klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
         if err != nil {
             return nil, err

@@ -756,7 +756,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
             RelistPeriod:    genericPlegRelistPeriod,
             RelistThreshold: genericPlegRelistThreshold,
         }
-        klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+        klet.pleg = pleg.NewGenericPLEG(logger, klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
     }

     klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
@@ -334,7 +334,7 @@ func newTestKubeletWithImageList(
     kubelet.resyncInterval = 10 * time.Second
     kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
     // Relist period does not affect the tests.
-    kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
+    kubelet.pleg = pleg.NewGenericPLEG(logger, fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
    kubelet.clock = fakeClock

    nodeRef := &v1.ObjectReference{
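For test wiring like the hunk above, the injected logger typically comes from klog's ktesting package so that output is attached to the running test. A small hypothetical test (not part of this diff) showing that setup:

package pleg_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

// TestContextualLoggerSketch shows one way a test can obtain a per-test
// contextual logger to pass into constructors such as NewGenericPLEG.
func TestContextualLoggerSketch(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	logger.V(4).Info("GenericPLEG", "podUID", "uid-1234", "containerID", "abcd")
}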
@@ -83,10 +83,12 @@ type EventedPLEG struct {
     stopCacheUpdateCh chan struct{}
     // Locks the start/stop operation of the Evented PLEG.
     runningMu sync.Mutex
+    // logger is used for contextual logging
+    logger klog.Logger
 }

 // NewEventedPLEG instantiates a new EventedPLEG object and return it.
-func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
+func NewEventedPLEG(logger klog.Logger, runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
     cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
     relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) {
     handler, ok := genericPleg.(podLifecycleEventGeneratorHandler)

@@ -102,6 +104,7 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru
         eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
         relistDuration:              relistDuration,
         clock:                       clock,
+        logger:                      logger,
     }, nil
 }

@@ -184,7 +187,7 @@ func (e *EventedPLEG) watchEventsChannel() {
     if numAttempts >= e.eventedPlegMaxStreamRetries {
         if isEventedPLEGInUse() {
             // Fall back to Generic PLEG relisting since Evented PLEG is not working.
-            klog.V(4).InfoS("Fall back to Generic PLEG relisting since Evented PLEG is not working")
+            e.logger.V(4).Info("Fall back to Generic PLEG relisting since Evented PLEG is not working")
             e.Stop()
             e.genericPleg.Stop()       // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
             e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.

@@ -200,7 +203,7 @@ func (e *EventedPLEG) watchEventsChannel() {
             metrics.EventedPLEGConnErr.Inc()
             numAttempts++
             e.Relist() // Force a relist to get the latest container and pods running metric.
-            klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying: ", "err", err)
+            e.logger.V(4).Info("Evented PLEG: Failed to get container events, retrying: ", "err", err)
         }
     }
 }()

@@ -221,7 +224,7 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
     // b) in worst case, a relist will eventually sync the pod status.
     // TODO(#114371): Figure out a way to handle this case instead of ignoring.
     if event.PodSandboxStatus == nil || event.PodSandboxStatus.Metadata == nil {
-        klog.ErrorS(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
+        e.logger.Error(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
         continue
     }

@@ -234,15 +237,15 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
         // if branch is okay, we just use it to determine whether the
         // additional "podStatus" key and its value should be added.
         if klog.V(6).Enabled() {
-            klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
+            e.logger.Error(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
         } else {
-            klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
+            e.logger.Error(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
         }
     } else {
-        if klogV := klog.V(6); klogV.Enabled() {
-            klogV.InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
+        if klogV := e.logger.V(6); klogV.Enabled() {
+            e.logger.Info("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
         } else {
-            klog.V(4).InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID)
+            e.logger.V(4).Info("Evented PLEG: Generated pod status from the received event", "podUID", podID)
         }
         // Preserve the pod IP across cache updates if the new IP is empty.
         // When a pod is torn down, kubelet may race with PLEG and retrieve
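The Enabled() gate kept in this hunk avoids formatting the potentially large podStatus value unless verbosity 6 is active; with a stored logger the same gate is expressed as logger.V(6).Enabled(). A self-contained sketch of the idiom (illustrative names only):

package main

import (
	"fmt"

	"k8s.io/klog/v2"
)

func expensiveDump() string {
	// Stand-in for formatting a large pod status object.
	return fmt.Sprintf("%+v", struct{ Containers int }{Containers: 3})
}

func logStatus(logger klog.Logger, podUID string) {
	if loggerV := logger.V(6); loggerV.Enabled() {
		// Only pay for the expensive value when -v=6 or higher is set.
		loggerV.Info("Generated pod status", "podUID", podUID, "podStatus", expensiveDump())
	} else {
		logger.V(4).Info("Generated pod status", "podUID", podUID)
	}
}

func main() {
	logStatus(klog.Background(), "uid-1234")
}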
@@ -282,23 +285,23 @@ func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse)
     switch event.ContainerEventType {
     case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
         e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
-        klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
+        e.logger.V(4).Info("Received Container Stopped Event", "event", event.String())
     case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
         // We only need to update the pod status on container create.
         // But we don't have to generate any PodLifeCycleEvent. Container creation related
         // PodLifeCycleEvent is ignored by the existing Generic PLEG as well.
         // https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L88 and
         // https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L273
-        klog.V(4).InfoS("Received Container Created Event", "event", event.String())
+        e.logger.V(4).Info("Received Container Created Event", "event", event.String())
     case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
         e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
-        klog.V(4).InfoS("Received Container Started Event", "event", event.String())
+        e.logger.V(4).Info("Received Container Started Event", "event", event.String())
     case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
         // In case the pod is deleted it is safe to generate both ContainerDied and ContainerRemoved events, just like in the case of
         // Generic PLEG. https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L169
         e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
         e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
-        klog.V(4).InfoS("Received Container Deleted Event", "event", event)
+        e.logger.V(4).Info("Received Container Deleted Event", "event", event)
     }
 }

@@ -330,7 +333,7 @@ func (e *EventedPLEG) sendPodLifecycleEvent(event *PodLifecycleEvent) {
     default:
         // record how many events were discarded due to channel out of capacity
         metrics.PLEGDiscardEvents.Inc()
-        klog.ErrorS(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
+        e.logger.Error(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
     }
 }

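sendPodLifecycleEvent, like the relist path in the Generic PLEG, uses a select with a default branch so a full event channel never blocks: the event is dropped and a discard metric is incremented. A minimal sketch of that non-blocking send, with a plain counter standing in for metrics.PLEGDiscardEvents:

package main

import "fmt"

type event struct{ id string }

// send attempts a non-blocking delivery; when the channel is full the event
// is discarded and the discard counter is bumped, mirroring the PLEG behaviour.
func send(ch chan *event, ev *event, discarded *int) {
	select {
	case ch <- ev:
	default:
		*discarded++
		fmt.Println("event channel is full, discarded event", ev.id)
	}
}

func main() {
	ch := make(chan *event, 1)
	var discarded int
	send(ch, &event{id: "a"}, &discarded) // fits in the buffer
	send(ch, &event{id: "b"}, &discarded) // buffer full, discarded
	fmt.Println("discarded:", discarded)
}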
@@ -356,7 +359,7 @@ func getPodSandboxState(podStatus *kubecontainer.PodStatus) kubecontainer.State
 func (e *EventedPLEG) updateRunningPodMetric(podStatus *kubecontainer.PodStatus) {
     cachedPodStatus, err := e.cache.Get(podStatus.ID)
     if err != nil {
-        klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
+        e.logger.Error(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
     }
     // cache miss condition: The pod status object will have empty state if missed in cache
     if len(cachedPodStatus.SandboxStatuses) < 1 {

@@ -387,7 +390,7 @@ func getContainerStateCount(podStatus *kubecontainer.PodStatus) map[kubecontaine
 func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodStatus) {
     cachedPodStatus, err := e.cache.Get(podStatus.ID)
     if err != nil {
-        klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
+        e.logger.Error(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
     }

     // cache miss condition: The pod status object will have empty state if missed in cache
@@ -78,6 +78,8 @@ type GenericPLEG struct {
     relistDuration *RelistDuration
     // Mutex to serialize updateCache called by relist vs UpdateCache interface
     podCacheMutex sync.Mutex
+    // logger is used for contextual logging
+    logger klog.Logger
 }

 // plegContainerState has a one-to-one mapping to the

@@ -116,10 +118,11 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord

 // NewGenericPLEG instantiates a new GenericPLEG object and return it.
-func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
+func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
     relistDuration *RelistDuration, cache kubecontainer.Cache,
     clock clock.Clock) PodLifecycleEventGenerator {
     return &GenericPLEG{
+        logger:         logger,
         relistDuration: relistDuration,
         runtime:        runtime,
         eventChannel:   eventChannel,

@@ -176,12 +179,12 @@ func (g *GenericPLEG) Healthy() (bool, error) {
     return true, nil
 }

-func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
+func generateEvents(logger klog.Logger, podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
     if newState == oldState {
         return nil
     }

-    klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
+    logger.V(4).Info("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
     switch newState {
     case plegContainerRunning:
         return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
@@ -221,7 +224,8 @@ func (g *GenericPLEG) Relist() {
     defer g.relistLock.Unlock()

     ctx := context.Background()
-    klog.V(5).InfoS("GenericPLEG: Relisting")
+
+    g.logger.V(5).Info("GenericPLEG: Relisting")

     if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
         metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))

@@ -235,7 +239,7 @@ func (g *GenericPLEG) Relist() {
     // Get all the pods.
     podList, err := g.runtime.GetPods(ctx, true)
     if err != nil {
-        klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
+        g.logger.Error(err, "GenericPLEG: Unable to retrieve pods")
         return
     }

@@ -254,7 +258,7 @@ func (g *GenericPLEG) Relist() {
         // Get all containers in the old and the new pod.
         allContainers := getContainersFromPods(oldPod, pod)
         for _, container := range allContainers {
-            events := computeEvents(oldPod, pod, &container.ID)
+            events := computeEvents(g.logger, oldPod, pod, &container.ID)
             for _, e := range events {
                 updateEvents(eventsByPodID, e)
             }

@@ -282,7 +286,7 @@ func (g *GenericPLEG) Relist() {
            // parallelize if needed.
            if err, updated := g.updateCache(ctx, pod, pid); err != nil {
                // Rely on updateCache calling GetPodStatus to log the actual error.
-               klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
+               g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

                // make sure we try to reinspect the pod during the next relisting
                needsReinspection[pid] = pod

@@ -315,7 +319,7 @@ func (g *GenericPLEG) Relist() {
            case g.eventChannel <- events[i]:
            default:
                metrics.PLEGDiscardEvents.Inc()
-               klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
+               g.logger.Error(nil, "Event channel is full, discard this relist() cycle event")
            }
            // Log exit code of containers when they finished in a particular event
            if events[i].Type == ContainerDied {

@@ -331,7 +335,7 @@ func (g *GenericPLEG) Relist() {
                }
                if containerID, ok := events[i].Data.(string); ok {
                    if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
-                       klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
+                       g.logger.V(2).Info("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
                    }
                }
            }

@@ -341,11 +345,11 @@ func (g *GenericPLEG) Relist() {
     if g.cacheEnabled() {
         // reinspect any pods that failed inspection during the previous relist
         if len(g.podsToReinspect) > 0 {
-            klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
+            g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
             for pid, pod := range g.podsToReinspect {
                 if err, _ := g.updateCache(ctx, pod, pid); err != nil {
                     // Rely on updateCache calling GetPodStatus to log the actual error.
-                    klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
+                    g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
                     needsReinspection[pid] = pod
                 }
             }
@@ -386,7 +390,7 @@ func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Containe
     return containers
 }

-func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
+func computeEvents(logger klog.Logger, oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
     var pid types.UID
     if oldPod != nil {
         pid = oldPod.ID

@@ -395,7 +399,7 @@ func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.Contain
     }
     oldState := getContainerState(oldPod, cid)
     newState := getContainerState(newPod, cid)
-    return generateEvents(pid, cid.ID, oldState, newState)
+    return generateEvents(logger, pid, cid.ID, oldState, newState)
 }

 func (g *GenericPLEG) cacheEnabled() bool {

@@ -433,7 +437,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
     if pod == nil {
         // The pod is missing in the current relist. This means that
         // the pod has no visible (active or inactive) containers.
-        klog.V(4).InfoS("PLEG: Delete status for pod", "podUID", string(pid))
+        g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
         g.cache.Delete(pid)
         return nil, true
     }
@@ -448,15 +452,15 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
         // if branch is okay, we just use it to determine whether the
         // additional "podStatus" key and its value should be added.
         if klog.V(6).Enabled() {
-            klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
+            g.logger.Error(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
         } else {
-            klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
+            g.logger.Error(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
         }
     } else {
-        if klogV := klog.V(6); klogV.Enabled() {
-            klogV.InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
+        if klogV := g.logger.V(6); klogV.Enabled() {
+            g.logger.Info("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
         } else {
-            klog.V(4).InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
+            g.logger.V(4).Info("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
         }
         // Preserve the pod IP across cache updates if the new IP is empty.
         // When a pod is torn down, kubelet may race with PLEG and retrieve