Migrate pkg/kubelet/status to structured logging

Mirror of https://github.com/k3s-io/kubernetes.git
commit dbe5476a2a
parent 96be00df69
@@ -37,7 +37,6 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	statusutil "k8s.io/kubernetes/pkg/util/pod"
 )
 
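The import hunk above drops k8s.io/kubernetes/pkg/kubelet/util/format because the structured calls below identify pods with klog.KObj and klog.KRef instead of pre-formatted strings. The overall shape of the migration, as a minimal runnable sketch (the pod value here is hypothetical, not taken from the kubelet):

    package main

    import (
    	"flag"

    	v1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/klog/v2"
    )

    func main() {
    	klog.InitFlags(nil)
    	flag.Parse()
    	defer klog.Flush()

    	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}

    	// Before: printf-style message with the pod spliced into the string.
    	klog.Infof("Pod %q has been updated", pod.Name)

    	// After: a fixed message plus key-value pairs; klog.KObj renders the
    	// pod as "default/nginx" in text output and as fields in JSON output.
    	klog.InfoS("Pod has been updated", "pod", klog.KObj(pod))
    }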
@@ -151,11 +150,11 @@ func (m *manager) Start() {
 	// on the master, where the kubelet is responsible for bootstrapping the pods
 	// of the master components.
 	if m.kubeClient == nil {
-		klog.Infof("Kubernetes client is nil, not starting status manager.")
+		klog.InfoS("Kubernetes client is nil, not starting status manager")
 		return
 	}
 
-	klog.Info("Starting to sync pod status with apiserver")
+	klog.InfoS("Starting to sync pod status with apiserver")
 	//lint:ignore SA1015 Ticker can link since this is only called once and doesn't handle termination.
 	syncTicker := time.Tick(syncPeriod)
 	// syncPod and syncBatch share the same go routine to avoid sync races.
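staticcheck's SA1015 fires on time.Tick because the ticker it creates can never be stopped; the lint:ignore directive is tolerable here since Start() runs once for the life of the process. For code that does terminate, the stoppable alternative looks roughly like this (syncPeriod assumed from the surrounding file):

    ticker := time.NewTicker(syncPeriod)
    defer ticker.Stop()
    for range ticker.C {
    	// periodic work
    }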
@@ -163,11 +162,13 @@ func (m *manager) Start() {
 		for {
 			select {
 			case syncRequest := <-m.podStatusChannel:
-				klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
-					syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
+				klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel",
+					"podUID", syncRequest.podUID,
+					"statusVersion", syncRequest.status.version,
+					"status", syncRequest.status.status)
 				m.syncPod(syncRequest.podUID, syncRequest.status)
 			case <-syncTicker:
-				klog.V(5).Infof("Status Manager: syncing batch")
+				klog.V(5).InfoS("Status Manager: syncing batch")
 				// remove any entries in the status channel since the batch will handle them
 				for i := len(m.podStatusChannel); i > 0; i-- {
 					<-m.podStatusChannel
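In the syncTicker branch above, len(m.podStatusChannel) is a snapshot of how many requests are currently buffered, so the drain loop consumes exactly that many and never blocks; syncBatch then covers whatever those requests would have done. The same pattern in isolation (hypothetical channel, not the kubelet's type):

    package main

    import "fmt"

    func main() {
    	ch := make(chan int, 10)
    	ch <- 1
    	ch <- 2

    	// len(ch) is a snapshot of what is buffered right now, so the loop
    	// consumes exactly that many items and never blocks on an empty channel.
    	for i := len(ch); i > 0; i-- {
    		fmt.Println(<-ch)
    	}
    }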
@@ -204,28 +205,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 
 	pod, ok := m.podManager.GetPodByUID(podUID)
 	if !ok {
-		klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
+		klog.V(4).InfoS("Pod has been deleted, no need to update readiness", "podUID", string(podUID))
 		return
 	}
 
 	oldStatus, found := m.podStatuses[pod.UID]
 	if !found {
-		klog.Warningf("Container readiness changed before pod has synced: %q - %q",
-			format.Pod(pod), containerID.String())
+		klog.InfoS("Container readiness changed before pod has synced",
+			"pod", klog.KObj(pod),
+			"containerID", containerID.String())
 		return
 	}
 
 	// Find the container to update.
 	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
 	if !ok {
-		klog.Warningf("Container readiness changed for unknown container: %q - %q",
-			format.Pod(pod), containerID.String())
+		klog.InfoS("Container readiness changed for unknown container",
+			"pod", klog.KObj(pod),
+			"containerID", containerID.String())
 		return
 	}
 
 	if containerStatus.Ready == ready {
-		klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
-			format.Pod(pod), containerID.String())
+		klog.V(4).InfoS("Container readiness unchanged",
+			"ready", ready,
+			"pod", klog.KObj(pod),
+			"containerID", containerID.String())
 		return
 	}
 
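Note that the Warningf calls become InfoS rather than a warning variant: klog has no WarningS, and the structured logging migration folds warnings into the info level. klog.KObj wraps an object's name and namespace in an ObjectRef whose text form is "namespace/name"; a fragment assuming the imports from the first sketch, with a hypothetical pod:

    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web-0", Namespace: "prod"}}
    fmt.Println(klog.KObj(pod)) // prints "prod/web-0"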
@@ -246,7 +251,7 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		if conditionIndex != -1 {
 			status.Conditions[conditionIndex] = condition
 		} else {
-			klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
+			klog.InfoS("PodStatus missing condition type", "conditionType", conditionType, "status", status)
 			status.Conditions = append(status.Conditions, condition)
 		}
 	}
@@ -261,28 +266,31 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine
 
 	pod, ok := m.podManager.GetPodByUID(podUID)
 	if !ok {
-		klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
+		klog.V(4).InfoS("Pod has been deleted, no need to update startup", "podUID", string(podUID))
 		return
 	}
 
 	oldStatus, found := m.podStatuses[pod.UID]
 	if !found {
-		klog.Warningf("Container startup changed before pod has synced: %q - %q",
-			format.Pod(pod), containerID.String())
+		klog.InfoS("Container startup changed before pod has synced",
+			"pod", klog.KObj(pod),
+			"containerID", containerID.String())
 		return
 	}
 
 	// Find the container to update.
 	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
 	if !ok {
-		klog.Warningf("Container startup changed for unknown container: %q - %q",
-			format.Pod(pod), containerID.String())
+		klog.InfoS("Container startup changed for unknown container",
+			"pod", klog.KObj(pod),
+			"containerID", containerID.String())
 		return
 	}
 
 	if containerStatus.Started != nil && *containerStatus.Started == started {
-		klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
-			format.Pod(pod), containerID.String())
+		klog.V(4).InfoS("Container startup unchanged",
+			"pod", klog.KObj(pod),
+			"containerID", containerID.String())
 		return
 	}
 
@@ -393,11 +401,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 
 	// Check for illegal state transition in containers
 	if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
-		klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
+		klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
 		return false
 	}
 	if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
-		klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
+		klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
 		return false
 	}
 
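For error-level messages the migration uses klog.ErrorS, which takes the error as its first argument and emits it under a dedicated err key instead of splicing it into the format string. A fragment assuming the imports from the first sketch, with a hypothetical error value:

    err := fmt.Errorf("unexpected container transition")
    klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
    // text output ends with: ... err="unexpected container transition"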
@@ -426,7 +434,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
 	if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
-		klog.V(5).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
+		klog.V(5).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
 		return false // No new status.
 	}
 
@@ -440,14 +448,18 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 
 	select {
 	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
-		klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
-			pod.UID, newStatus.version, newStatus.status)
+		klog.V(5).InfoS("Status Manager: adding pod with new status to podStatusChannel",
+			"pod", klog.KObj(pod),
+			"podUID", pod.UID,
+			"statusVersion", newStatus.version,
+			"status", newStatus.status)
 		return true
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
-		klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
-			format.Pod(pod), status)
+		klog.V(4).InfoS("Skipping the status update for pod for now because the channel is full",
+			"pod", klog.KObj(pod),
+			"status", status)
 		return false
 	}
 }
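The select with a default branch above is Go's non-blocking send: when the buffered channel is full, control falls through to default instead of blocking, which matters here because the caller holds m.podStatusesLock. The shape of the pattern, with a hypothetical channel and request:

    select {
    case ch <- req:
    	// queued; the sync loop will handle it
    default:
    	// channel full; the periodic syncBatch picks the update up later
    }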
@@ -480,7 +492,7 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	defer m.podStatusesLock.Unlock()
 	for key := range m.podStatuses {
 		if _, ok := podUIDs[key]; !ok {
-			klog.V(5).Infof("Removing %q from status map.", key)
+			klog.V(5).InfoS("Removing pod from status map.", "podUID", key)
 			delete(m.podStatuses, key)
 		}
 	}
@@ -507,7 +519,9 @@ func (m *manager) syncBatch() {
 		syncedUID := kubetypes.MirrorPodUID(uid)
 		if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
 			if mirrorUID == "" {
-				klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
+				klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
+					"podUID", uid,
+					"pod", klog.KRef(status.podNamespace, status.podName))
 				continue
 			}
 			syncedUID = mirrorUID
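klog.KRef builds the same "namespace/name" reference as klog.KObj when only the name strings are at hand, as with status.podNamespace and status.podName here. A fragment assuming the imports from the first sketch, with hypothetical values:

    fmt.Println(klog.KRef("kube-system", "etcd-master")) // prints "kube-system/etcd-master"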
@@ -526,7 +540,7 @@ func (m *manager) syncBatch() {
 	}()
 
 	for _, update := range updatedStatuses {
-		klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
+		klog.V(5).InfoS("Status Manager: syncPod in syncbatch", "podUID", update.podUID)
 		m.syncPod(update.podUID, update.status)
 	}
 }
@@ -534,42 +548,51 @@ func (m *manager) syncBatch() {
 // syncPod syncs the given status with the API server. The caller must not hold the lock.
 func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 	if !m.needsUpdate(uid, status) {
-		klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
+		klog.V(1).InfoS("Status for pod is up-to-date; skipping", "podUID", uid)
 		return
 	}
 
 	// TODO: make me easier to express from client code
 	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
-		klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
+		klog.V(3).InfoS("Pod does not exist on the server",
+			"podUID", uid,
+			"pod", klog.KRef(status.podNamespace, status.podName))
 		// If the Pod is deleted the status will be cleared in
 		// RemoveOrphanedStatuses, so we just ignore the update here.
 		return
 	}
 	if err != nil {
-		klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
+		klog.InfoS("Failed to get status for pod",
+			"podUID", uid,
+			"pod", klog.KRef(status.podNamespace, status.podName),
+			"err", err)
 		return
 	}
 
 	translatedUID := m.podManager.TranslatePodUID(pod.UID)
 	// Type convert original uid just for the purpose of comparison.
 	if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
-		klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
+		klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update",
+			"pod", klog.KObj(pod),
+			"oldPodUID", uid,
+			"podUID", translatedUID)
 		m.deletePodStatus(uid)
 		return
 	}
 
 	oldStatus := pod.Status.DeepCopy()
 	newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
-	klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
+	klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patchBytes", patchBytes)
 
 	if err != nil {
-		klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
+		klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
 		return
 	}
 	if unchanged {
-		klog.V(3).Infof("Status for pod %q is up-to-date: (%d)", format.Pod(pod), status.version)
+		klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
 	} else {
-		klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
+		klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", status.status)
 		pod = newPod
 	}
 
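The V(n) gates that survive the migration keep their meaning: klog.V(n) returns a Verbose value whose InfoS is a no-op unless the process runs with -v=n or higher, so the detailed status dumps above cost little at default verbosity. A fragment assuming the imports from the first sketch, with hypothetical values:

    // Emitted only when the process is started with -v=3 or more verbose.
    klog.V(3).InfoS("Status for pod updated successfully",
    	"pod", klog.KRef("default", "nginx"),
    	"statusVersion", 7)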
@@ -585,10 +608,10 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 		}
 		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
 		if err != nil {
-			klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
+			klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
 			return
 		}
-		klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
+		klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
 		m.deletePodStatus(uid)
 	}
 }
@@ -627,14 +650,14 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
 	// The pod could be a static pod, so we should translate first.
 	pod, ok := m.podManager.GetPodByUID(uid)
 	if !ok {
-		klog.V(4).Infof("Pod %q has been deleted, no need to reconcile", string(uid))
+		klog.V(4).InfoS("Pod has been deleted, no need to reconcile", "podUID", string(uid))
 		return false
 	}
 	// If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us.
 	if kubetypes.IsStaticPod(pod) {
 		mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
 		if !ok {
-			klog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod))
+			klog.V(4).InfoS("Static pod has no corresponding mirror pod, no need to reconcile", "pod", klog.KObj(pod))
 			return false
 		}
 		pod = mirrorPod
@@ -648,8 +671,9 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
 		// reconcile is not needed. Just return.
 		return false
 	}
-	klog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %s", format.Pod(pod),
-		diff.ObjectDiff(podStatus, &status))
+	klog.V(3).InfoS("Pod status is inconsistent with cached status for pod, a reconciliation should be triggered",
+		"pod", klog.KObj(pod),
+		"statusDiff", diff.ObjectDiff(podStatus, &status))
 
 	return true
 }
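diff.ObjectDiff (from k8s.io/apimachinery/pkg/util/diff) renders a human-readable diff of two objects; after the migration it travels as a single value under the statusDiff key instead of being appended to the format string. A fragment assuming the imports from the first sketch plus the diff package, with hypothetical statuses:

    a := v1.PodStatus{Phase: v1.PodRunning}
    b := v1.PodStatus{Phase: v1.PodFailed}
    fmt.Println(diff.ObjectDiff(&a, &b))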