Merge pull request #47806 from dcbw/fix-pod-ip-race
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions at https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

kubelet: fix inconsistent display of terminated pod IPs

PLEG and kubelet race when reading and sending pod status to the apiserver: PLEG inserts status into a cache and then signals kubelet, but by the time kubelet reads the status back out of that cache, PLEG may already have changed it.

When a pod exits, its status no longer includes the pod's IP address, because the network plugin/runtime reports "" for terminated pods. If that empty status gets inserted into the PLEG cache before kubelet reads the previous status out of the cache, kubelet sees a blank pod IP address. This happens in about 1 in 5 cases for short-lived pods, and somewhat less frequently for longer-running pods.

To keep the properties of dead pods consistent, copy the old status update's IP address over to the new status update if (a) the new status update's IP is missing and (b) all sandboxes of the pod are dead/not-ready (i.e., there is no possibility of a valid IP from a sandbox).

Fixes: https://github.com/kubernetes/kubernetes/issues/47265
Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=1449373

@eparis @freehan @kubernetes/rh-networking @kubernetes/sig-network-misc
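As a reader's aid, here is a minimal, self-contained Go sketch of the (a)/(b) preservation rule described above. The types and names (`sandboxState`, `podStatus`, `preservedIP`) are simplified stand-ins invented for this sketch; the real implementation, `GenericPLEG.getPodIP` in the diff below, additionally falls back to container states for runtimes that report no sandboxes.

```go
package main

import "fmt"

// Simplified stand-ins for kubelet's status types; these names are
// invented for illustration only.
type sandboxState int

const (
	sandboxReady sandboxState = iota
	sandboxNotReady
)

type podStatus struct {
	IP        string
	Sandboxes []sandboxState
}

// preservedIP applies the rule from the commit message: keep the new IP if
// it is set (condition (a) fails) or if any sandbox is still ready
// (condition (b) fails); otherwise carry the previously cached IP forward
// for the dead pod.
func preservedIP(oldIP string, newStatus podStatus) string {
	if newStatus.IP != "" {
		return newStatus.IP
	}
	for _, s := range newStatus.Sandboxes {
		if s == sandboxReady {
			// A ready sandbox may legitimately have no IP yet; trust the
			// new status rather than resurrecting a stale address.
			return newStatus.IP
		}
	}
	return oldIP
}

func main() {
	dead := podStatus{IP: "", Sandboxes: []sandboxState{sandboxNotReady}}
	fmt.Println(preservedIP("10.0.0.5", dead)) // prints 10.0.0.5
}
```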
commit 28df7a1cae
pkg/kubelet/pleg/BUILD

```diff
@@ -14,6 +14,7 @@ go_library(
         "pleg.go",
     ],
     deps = [
+        "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/metrics:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
```
pkg/kubelet/pleg/generic.go

```diff
@@ -26,6 +26,7 @@ import (
     "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
+    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
 )
@@ -329,6 +330,41 @@ func (g *GenericPLEG) cacheEnabled() bool {
     return g.cache != nil
 }
 
+// Preserve an older cached status' pod IP if the new status has no pod IP
+// and its sandboxes have exited
+func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string {
+    if status.IP != "" {
+        return status.IP
+    }
+
+    oldStatus, err := g.cache.Get(pid)
+    if err != nil || oldStatus.IP == "" {
+        return ""
+    }
+
+    for _, sandboxStatus := range status.SandboxStatuses {
+        // If at least one sandbox is ready, then use this status update's pod IP
+        if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
+            return status.IP
+        }
+    }
+
+    if len(status.SandboxStatuses) == 0 {
+        // Without sandboxes (which built-in runtimes like rkt don't report)
+        // look at all the container statuses, and if any containers are
+        // running then use the new pod IP
+        for _, containerStatus := range status.ContainerStatuses {
+            if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning {
+                return status.IP
+            }
+        }
+    }
+
+    // For pods with no ready containers or sandboxes (like exited pods)
+    // use the old status' pod IP
+    return oldStatus.IP
+}
+
 func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
     if pod == nil {
         // The pod is missing in the current relist. This means that
@@ -343,6 +379,14 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
     // all containers again.
     status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
     glog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
+    if err == nil {
+        // Preserve the pod IP across cache updates if the new IP is empty.
+        // When a pod is torn down, kubelet may race with PLEG and retrieve
+        // a pod status after network teardown, but the kubernetes API expects
+        // the completed pod's IP to be available after the pod is dead.
+        status.IP = g.getPodIP(pid, status)
+    }
+
     g.cache.Set(pod.ID, status, err, timestamp)
     return err
 }
```
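Note the shape of the check: a single ready sandbox is enough to trust the new (possibly empty) IP, since a live sandbox can still own a valid address; only when every sandbox is dead or not-ready is the old IP carried forward. The `len(status.SandboxStatuses) == 0` branch covers runtimes such as rkt that do not report sandboxes at all, falling back to container states instead.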
pkg/kubelet/pleg/generic_test.go

```diff
@@ -496,3 +496,58 @@ func TestRelistingWithSandboxes(t *testing.T) {
     actual = getEventsFromChannel(ch)
     verifyEvents(t, expected, actual)
 }
+
+func TestRelistIPChange(t *testing.T) {
+    pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+    ch := pleg.Watch()
+
+    id := types.UID("test-pod-0")
+    cState := kubecontainer.ContainerStateRunning
+    container := createTestContainer("c0", cState)
+    pod := &kubecontainer.Pod{
+        ID:         id,
+        Containers: []*kubecontainer.Container{container},
+    }
+    ipAddr := "192.168.1.5/24"
+    status := &kubecontainer.PodStatus{
+        ID:                id,
+        IP:                ipAddr,
+        ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+    }
+    event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+
+    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+    runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+    pleg.relist()
+    actualEvents := getEventsFromChannel(ch)
+    actualStatus, actualErr := pleg.cache.Get(pod.ID)
+    assert.Equal(t, status, actualStatus, "test0")
+    assert.Nil(t, actualErr, "test0")
+    assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+
+    // Clear the IP address and mark the container terminated
+    container = createTestContainer("c0", kubecontainer.ContainerStateExited)
+    pod = &kubecontainer.Pod{
+        ID:         id,
+        Containers: []*kubecontainer.Container{container},
+    }
+    status = &kubecontainer.PodStatus{
+        ID:                id,
+        ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
+    }
+    event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
+    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+    runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+    pleg.relist()
+    actualEvents = getEventsFromChannel(ch)
+    actualStatus, actualErr = pleg.cache.Get(pod.ID)
+    // Must copy status to compare since its pointer gets passed through all
+    // the way to the event
+    statusCopy := *status
+    statusCopy.IP = ipAddr
+    assert.Equal(t, &statusCopy, actualStatus, "test0")
+    assert.Nil(t, actualErr, "test0")
+    assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+}
```
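Assuming a checkout of the tree at this commit and a standard Go toolchain, the new test can be run on its own with:

```
go test ./pkg/kubelet/pleg/ -run TestRelistIPChange -v
```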