Merge pull request #130213 from zhifei92/migrate-kubelet-container-to-contextual-logging

chore(kubelet): migrate container to contextual logging
Authored by Kubernetes Prow Robot on 2025-09-05 15:41:25 -07:00, committed by GitHub
11 changed files with 39 additions and 24 deletions

View File

@@ -83,6 +83,6 @@ func (cgc *realContainerGC) GarbageCollect(ctx context.Context) error {
}
func (cgc *realContainerGC) DeleteAllUnusedContainers(ctx context.Context) error {
klog.InfoS("Attempting to delete unused containers")
klog.FromContext(ctx).Info("Attempting to delete unused containers")
return cgc.runtime.GarbageCollect(ctx, cgc.policy, cgc.sourcesReadyProvider.AllReady(), true)
}
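
Aside: the core pattern in this PR is pulling the logger out of the request context instead of calling the global klog helpers. A minimal, self-contained sketch of that pattern (the "gc" name and doWork helper are illustrative, not part of the PR):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// doWork retrieves the logger from the context; if the caller attached
// none, klog.FromContext falls back to the global klog logger.
func doWork(ctx context.Context) {
	klog.FromContext(ctx).Info("Attempting to delete unused containers")
}

func main() {
	// Attach a named logger to the context so callees can retrieve it.
	ctx := klog.NewContext(context.Background(), klog.Background().WithName("gc"))
	doWork(ctx)
}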

View File

@@ -79,7 +79,7 @@ type RuntimeHelper interface {
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this.
- func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
+ func ShouldContainerBeRestarted(logger klog.Logger, container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
// Once a pod has been marked deleted, it should not be restarted
if pod.DeletionTimestamp != nil {
return false
@@ -104,13 +104,13 @@ func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus
return podutil.ContainerShouldRestart(*container, pod.Spec, int32(status.ExitCode))
}
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
klog.V(4).InfoS("Already ran container, do nothing", "pod", klog.KObj(pod), "containerName", container.Name)
logger.V(4).Info("Already ran container, do nothing", "pod", klog.KObj(pod), "containerName", container.Name)
return false
}
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
// Check the exit code.
if status.ExitCode == 0 {
klog.V(4).InfoS("Already successfully ran container, do nothing", "pod", klog.KObj(pod), "containerName", container.Name)
logger.V(4).Info("Already successfully ran container, do nothing", "pod", klog.KObj(pod), "containerName", container.Name)
return false
}
}
@@ -366,7 +366,7 @@ func AllContainersAreWindowsHostProcess(pod *v1.Pod) bool {
}
// MakePortMappings creates internal port mapping from api port mapping.
- func MakePortMappings(container *v1.Container) (ports []PortMapping) {
+ func MakePortMappings(logger klog.Logger, container *v1.Container) (ports []PortMapping) {
names := make(map[string]struct{})
for _, p := range container.Ports {
pm := PortMapping{
@@ -395,7 +395,7 @@ func MakePortMappings(container *v1.Container) (ports []PortMapping) {
// Protect against a port name being used more than once in a container.
if _, ok := names[name]; ok {
klog.InfoS("Port name conflicted, it is defined more than once", "portName", name)
logger.Info("Port name conflicted, it is defined more than once", "portName", name)
continue
}
ports = append(ports, pm)
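
Where no context.Context is available, the PR instead threads a klog.Logger through as an explicit first parameter, as in the two signature changes above. A hedged sketch of that calling convention (checkRestart is a made-up stand-in for the real helpers):

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

// checkRestart mirrors the new signatures: the caller's logger comes in as
// the first argument, and structured key/value pairs replace format strings.
func checkRestart(logger klog.Logger, pod *v1.Pod, containerName string) {
	logger.V(4).Info("Already ran container, do nothing",
		"pod", klog.KObj(pod), "containerName", containerName)
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	checkRestart(klog.Background(), pod, "app")
}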

View File

@@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -394,6 +395,7 @@ func TestGetContainerSpec(t *testing.T) {
}
func TestShouldContainerBeRestarted(t *testing.T) {
+ logger, _ := ktesting.NewTestContext(t)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345678",
@@ -463,7 +465,7 @@ func TestShouldContainerBeRestarted(t *testing.T) {
for i, policy := range policies {
pod.Spec.RestartPolicy = policy
e := expected[c.Name][i]
r := ShouldContainerBeRestarted(&c, pod, podStatus)
r := ShouldContainerBeRestarted(logger, &c, pod, podStatus)
if r != e {
t.Errorf("Restart for container %q with restart policy %q expected %t, got %t",
c.Name, policy, e, r)
@@ -484,7 +486,7 @@ func TestShouldContainerBeRestarted(t *testing.T) {
for i, policy := range policies {
pod.Spec.RestartPolicy = policy
e := expected[c.Name][i]
r := ShouldContainerBeRestarted(&c, pod, podStatus)
r := ShouldContainerBeRestarted(logger, &c, pod, podStatus)
if r != e {
t.Errorf("Restart for container %q with restart policy %q expected %t, got %t",
c.Name, policy, e, r)
@@ -546,6 +548,7 @@ func TestHasPrivilegedContainer(t *testing.T) {
}
func TestMakePortMappings(t *testing.T) {
+ logger, _ := ktesting.NewTestContext(t)
port := func(name string, protocol v1.Protocol, containerPort, hostPort int32, ip string) v1.ContainerPort {
return v1.ContainerPort{
Name: name,
@@ -635,7 +638,7 @@ func TestMakePortMappings(t *testing.T) {
}
for i, tt := range tests {
- actual := MakePortMappings(tt.container)
+ actual := MakePortMappings(logger, tt.container)
assert.Equal(t, tt.expectedPortMappings, actual, "[%d]", i)
}
}
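
The tests obtain their logger from ktesting.NewTestContext, which routes output through t.Log so it only surfaces for failed runs (or under go test -v). A rough sketch of the pattern used in the updated tests above (TestWithTestLogger is hypothetical):

package container

import (
	"testing"

	"k8s.io/kubernetes/test/utils/ktesting"
)

func TestWithTestLogger(t *testing.T) {
	// logger writes via t.Log; ctx carries the same logger for callees
	// that use klog.FromContext.
	logger, ctx := ktesting.NewTestContext(t)
	logger.Info("setup complete")
	_ = ctx
}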

View File

@@ -235,7 +235,10 @@ func BuildContainerID(typ, ID string) ContainerID {
func ParseContainerID(containerID string) ContainerID {
var id ContainerID
if err := id.ParseString(containerID); err != nil {
- klog.ErrorS(err, "Parsing containerID failed")
+ // Use klog.TODO() because we currently do not have a proper logger to pass in.
+ // This should be replaced with an appropriate logger when refactoring this function to accept a logger parameter.
+ logger := klog.TODO()
+ logger.Error(err, "Parsing containerID failed")
}
return id
}
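
klog.TODO() is the logging analogue of context.TODO(): at runtime it behaves like klog.Background(), but it marks call sites where a logger still needs to be plumbed through, keeping the remaining migration work greppable. A minimal sketch:

package main

import (
	"errors"

	"k8s.io/klog/v2"
)

func main() {
	// Placeholder until this call site gains a logger or context
	// parameter; grep for klog.TODO to find leftover migration work.
	logger := klog.TODO()
	logger.Error(errors.New("invalid format"), "Parsing containerID failed")
}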

View File

@@ -27,10 +27,10 @@ type TestRuntimeCache struct {
}
// UpdateCacheWithLock updates the cache with the lock.
- func (r *TestRuntimeCache) UpdateCacheWithLock() error {
+ func (r *TestRuntimeCache) UpdateCacheWithLock(ctx context.Context) error {
r.Lock()
defer r.Unlock()
- return r.updateCache(context.Background())
+ return r.updateCache(ctx)
}
// GetCachedPods returns the cached pods.

View File

@@ -17,13 +17,13 @@ limitations under the License.
package container_test
import (
"context"
"reflect"
"testing"
"time"
. "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
func comparePods(t *testing.T, expected []*ctest.FakePod, actual []*Pod) {
@@ -38,12 +38,12 @@ func comparePods(t *testing.T, expected []*ctest.FakePod, actual []*Pod) {
}
func TestGetPods(t *testing.T) {
- ctx := context.Background()
+ tCtx := ktesting.Init(t)
runtime := &ctest.FakeRuntime{}
expected := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}}
runtime.PodList = expected
cache := NewTestRuntimeCache(runtime)
- actual, err := cache.GetPods(ctx)
+ actual, err := cache.GetPods(tCtx)
if err != nil {
t.Errorf("unexpected error %v", err)
}
@@ -52,26 +52,32 @@ func TestGetPods(t *testing.T) {
}
func TestForceUpdateIfOlder(t *testing.T) {
- ctx := context.Background()
+ tCtx := ktesting.Init(t)
runtime := &ctest.FakeRuntime{}
cache := NewTestRuntimeCache(runtime)
// Cache old pods.
oldpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}}
runtime.PodList = oldpods
- cache.UpdateCacheWithLock()
+ if err := cache.UpdateCacheWithLock(tCtx); err != nil {
+ t.Errorf("unexpected error %v", err)
+ }
// Update the runtime to new pods.
newpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}}
runtime.PodList = newpods
// An older timestamp should not force an update.
- cache.ForceUpdateIfOlder(ctx, time.Now().Add(-20*time.Minute))
+ if err := cache.ForceUpdateIfOlder(tCtx, time.Now().Add(-20*time.Minute)); err != nil {
+ t.Errorf("unexpected error %v", err)
+ }
actual := cache.GetCachedPods()
comparePods(t, oldpods, actual)
// A newer timestamp should force an update.
- cache.ForceUpdateIfOlder(ctx, time.Now().Add(20*time.Second))
+ if err := cache.ForceUpdateIfOlder(tCtx, time.Now().Add(20*time.Second)); err != nil {
+ t.Errorf("unexpected error %v", err)
+ }
actual = cache.GetCachedPods()
comparePods(t, newpods, actual)
}
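
ktesting.Init returns a TContext, which satisfies context.Context (it is canceled automatically when the test ends) and also embeds the testing helpers, so a single tCtx value replaces the old ctx plus per-test logging setup. A rough usage sketch (TestWithTContext is hypothetical):

package container_test

import (
	"testing"

	"k8s.io/kubernetes/test/utils/ktesting"
)

func TestWithTContext(t *testing.T) {
	// tCtx can be passed anywhere a context.Context is expected; it is
	// canceled during test cleanup, so callees observe the test's end.
	tCtx := ktesting.Init(t)
	if err := tCtx.Err(); err != nil {
		t.Fatalf("context unexpectedly done: %v", err)
	}
}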

View File

@@ -33,7 +33,7 @@ func NewFakeCache(runtime container.Runtime) container.Cache {
}
func (c *fakeCache) Get(id types.UID) (*container.PodStatus, error) {
- return c.runtime.GetPodStatus(context.Background(), id, "", "")
+ return c.runtime.GetPodStatus(context.TODO(), id, "", "")
}
func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*container.PodStatus, error) {
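
Same idea as klog.TODO() above: context.TODO() is functionally identical to context.Background() (empty, never canceled, carrying no values), but it flags the call site as still awaiting a real caller-supplied context. A tiny illustration:

package main

import (
	"context"
	"fmt"
)

func main() {
	// TODO only documents that context plumbing is pending here; its
	// runtime behavior matches Background.
	ctx := context.TODO()
	fmt.Println(ctx.Err(), ctx.Done() == nil) // <nil> true
}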

View File

@@ -2108,6 +2108,9 @@ func (kl *Kubelet) convertStatusToAPIStatus(pod *v1.Pod, podStatus *kubecontaine
// convertToAPIContainerStatuses converts the given internal container
// statuses into API container statuses.
func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecontainer.PodStatus, previousStatus []v1.ContainerStatus, containers []v1.Container, hasInitContainers, isInitContainer bool) []v1.ContainerStatus {
+ // Use klog.TODO() because we currently do not have a proper logger to pass in.
+ // This should be replaced with an appropriate logger when refactoring this function to accept a logger parameter.
+ logger := klog.TODO()
convertContainerStatus := func(cs *kubecontainer.Status, oldStatus *v1.ContainerStatus) *v1.ContainerStatus {
cid := cs.ID.String()
status := &v1.ContainerStatus{
@@ -2436,7 +2439,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
}
}
// If a container should be restarted in next syncpod, it is *Waiting*.
if !kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
if !kubecontainer.ShouldContainerBeRestarted(logger, &container, pod, podStatus) {
continue
}
status := statuses[container.Name]

View File

@@ -1184,7 +1184,7 @@ func (m *kubeGenericRuntimeManager) computeInitContainerActions(ctx context.Cont
if isInitContainerFailed(status) {
restartOnFailure := restartOnFailure
if utilfeature.DefaultFeatureGate.Enabled(features.ContainerRestartRules) {
- restartOnFailure = kubecontainer.ShouldContainerBeRestarted(container, pod, podStatus)
+ restartOnFailure = kubecontainer.ShouldContainerBeRestarted(logger, container, pod, podStatus)
}
if !restartOnFailure {
changes.KillPod = true

View File

@@ -1114,7 +1114,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
// If container does not exist, or is not running, check whether we
// need to restart it.
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
if kubecontainer.ShouldContainerBeRestarted(logger, &container, pod, podStatus) {
logger.V(3).Info("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod))
changes.ContainersToStart = append(changes.ContainersToStart, idx)
if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {

View File

@@ -116,7 +116,7 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(ctx context.Context
portMappings := []*runtimeapi.PortMapping{}
for _, c := range pod.Spec.Containers {
containerPortMappings := kubecontainer.MakePortMappings(&c)
containerPortMappings := kubecontainer.MakePortMappings(logger, &c)
for idx := range containerPortMappings {
port := containerPortMappings[idx]