Merge pull request #98850 from yangjunmyfm192085/run-test14

Structured Logging migration: modify volume and container part logs o…
Kubernetes Prow Robot 2021-03-17 11:45:19 -07:00 committed by GitHub
commit b5c6434f6b
12 changed files with 111 additions and 172 deletions
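
For context on the pattern applied throughout this commit: the structured logging migration replaces printf-style klog calls with klog.InfoS and klog.ErrorS, moving interpolated values into key/value pairs and rendering object references with klog.KObj. A minimal before/after sketch (the package, function, and pod variable are illustrative only, not taken from any one file below):

	package example

	import (
		v1 "k8s.io/api/core/v1"
		"k8s.io/klog/v2"
	)

	func syncExample(pod *v1.Pod, err error) {
		// Before: values interpolated into the format string, hard to parse downstream.
		klog.Errorf("Error syncing pod %q: %v", pod.Name, err)

		// After: the error moves into ErrorS's first argument, the message becomes a
		// constant starting with a capital letter, and the pod is logged as a
		// key/value pair; klog.KObj renders it as namespace/name.
		klog.ErrorS(err, "Error syncing pod", "pod", klog.KObj(pod))
	}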

View File

@@ -82,6 +82,6 @@ func (cgc *realContainerGC) GarbageCollect() error {
 }

 func (cgc *realContainerGC) DeleteAllUnusedContainers() error {
-	klog.Infof("attempting to delete unused containers")
+	klog.InfoS("Attempting to delete unused containers")
 	return cgc.runtime.GarbageCollect(cgc.policy, cgc.sourcesReadyProvider.AllReady(), true)
 }

View File

@@ -31,7 +31,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	hashutil "k8s.io/kubernetes/pkg/util/hash"
 	"k8s.io/kubernetes/third_party/forked/golang/expansion"
 	utilsnet "k8s.io/utils/net"

@@ -82,13 +81,13 @@ func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus
 	}
 	// Check RestartPolicy for dead container
 	if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
-		klog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
+		klog.V(4).InfoS("Already ran container, do nothing", "pod", klog.KObj(pod), "containerName", container.Name)
 		return false
 	}
 	if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
 		// Check the exit code.
 		if status.ExitCode == 0 {
-			klog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
+			klog.V(4).InfoS("Already successfully ran container, do nothing", "pod", klog.KObj(pod), "containerName", container.Name)
 			return false
 		}
 	}

@@ -341,7 +340,7 @@ func MakePortMappings(container *v1.Container) (ports []PortMapping) {
 		// Protect against a port name being used more than once in a container.
 		if _, ok := names[name]; ok {
-			klog.Warningf("Port name conflicted, %q is defined more than once", name)
+			klog.InfoS("Port name conflicted, it is defined more than once", "portName", name)
 			continue
 		}
 		ports = append(ports, pm)

View File

@@ -204,7 +204,7 @@ func BuildContainerID(typ, ID string) ContainerID {
 func ParseContainerID(containerID string) ContainerID {
 	var id ContainerID
 	if err := id.ParseString(containerID); err != nil {
-		klog.Error(err)
+		klog.ErrorS(err, "Parsing containerID failed")
 	}
 	return id
 }

View File

@@ -63,11 +63,11 @@ func (kl *Kubelet) podVolumesExist(podUID types.UID) bool {
 	// There are some volume plugins such as flexvolume might not have mounts. See issue #61229
 	volumePaths, err := kl.getMountedVolumePathListFromDisk(podUID)
 	if err != nil {
-		klog.Errorf("pod %q found, but error %v occurred during checking mounted volumes from disk", podUID, err)
+		klog.ErrorS(err, "Pod found, but error occurred during checking mounted volumes from disk", "podUID", podUID)
 		return true
 	}
 	if len(volumePaths) > 0 {
-		klog.V(4).Infof("pod %q found, but volumes are still mounted on disk %v", podUID, volumePaths)
+		klog.V(4).InfoS("Pod found, but volumes are still mounted on disk", "podUID", podUID, "volumePaths", volumePaths)
 		return true
 	}

@@ -86,7 +86,7 @@ func (kl *Kubelet) newVolumeMounterFromPlugins(spec *volume.Spec, pod *v1.Pod, o
 	if err != nil {
 		return nil, fmt.Errorf("failed to instantiate mounter for volume: %s using plugin: %s with a root cause: %v", spec.Name(), plugin.GetPluginName(), err)
 	}
-	klog.V(10).Infof("Using volume plugin %q to mount %s", plugin.GetPluginName(), spec.Name())
+	klog.V(10).InfoS("Using volume plugin for mount", "volumePluginName", plugin.GetPluginName(), "volumeName", spec.Name())
 	return physicalMounter, nil
 }

@@ -118,7 +118,7 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
 		// TODO: getMountedVolumePathListFromDisk() call may be redundant with
 		// kl.getPodVolumePathListFromDisk(). Can this be cleaned up?
 		if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
-			klog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up", uid)
+			klog.V(3).InfoS("Orphaned pod found, but volumes are not cleaned up", "podUID", uid)
 			continue
 		}

@@ -167,18 +167,18 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
 			continue
 		}
-		klog.V(3).Infof("Orphaned pod %q found, removing", uid)
+		klog.V(3).InfoS("Orphaned pod found, removing", "podUID", uid)
 		if err := removeall.RemoveAllOneFilesystem(kl.mounter, kl.getPodDir(uid)); err != nil {
-			klog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)
+			klog.ErrorS(err, "Failed to remove orphaned pod dir", "podUID", uid)
 			orphanRemovalErrors = append(orphanRemovalErrors, err)
 		}
 	}

 	logSpew := func(errs []error) {
 		if len(errs) > 0 {
-			klog.Errorf("%v : There were a total of %v errors similar to this. Turn up verbosity to see them.", errs[0], len(errs))
+			klog.ErrorS(errs[0], "There were many similar errors. Turn up verbosity to see them.", "numErrs", len(errs))
 			for _, err := range errs {
-				klog.V(5).Infof("Orphan pod: %v", err)
+				klog.V(5).InfoS("Orphan pod", "err", err)
 			}
 		}
 	}

View File

@@ -49,7 +49,7 @@ func newPodContainerDeletor(runtime kubecontainer.Runtime, containersToKeep int)
 		for {
 			id := <-buffer
 			if err := runtime.DeleteContainer(id); err != nil {
-				klog.Warningf("[pod_container_deletor] DeleteContainer returned error for (id=%v): %v", id, err)
+				klog.InfoS("DeleteContainer returned error", "containerID", id, "err", err)
 			}
 		}
 	}, 0, wait.NeverStop)

@@ -76,7 +76,7 @@ func getContainersToDeleteInPod(filterContainerID string, podStatus *kubecontain
 	}(filterContainerID, podStatus)

 	if filterContainerID != "" && matchedContainer == nil {
-		klog.Warningf("Container %q not found in pod's containers", filterContainerID)
+		klog.InfoS("Container not found in pod's containers", "containerID", filterContainerID)
 		return containerStatusbyCreatedList{}
 	}

@@ -110,7 +110,7 @@ func (p *podContainerDeletor) deleteContainersInPod(filterContainerID string, po
 		select {
 		case p.worker <- candidate.ID:
 		default:
-			klog.Warningf("Failed to issue the request to remove container %v", candidate.ID)
+			klog.InfoS("Failed to issue the request to remove container", "containerID", candidate.ID)
 		}
 	}
 }
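
A convention visible in the hunks above: klog's structured API has no warning level (only InfoS and ErrorS), so klog.Warningf call sites become klog.InfoS, with the formatted values moved into key/value pairs. Restating the last change above as a before/after pair:

	// Before: printf-style warning.
	klog.Warningf("Failed to issue the request to remove container %v", candidate.ID)

	// After: warnings map to InfoS, since klog/v2 has no structured warning call.
	klog.InfoS("Failed to issue the request to remove container", "containerID", candidate.ID)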

View File

@@ -33,7 +33,6 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 )

@@ -188,7 +187,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
 		}
 		if err != nil {
 			// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
-			klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
+			klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(update.Pod), "podUID", update.Pod.UID)
 		}
 		p.wrapUp(update.Pod.UID, err)
 	}

View File

@@ -51,15 +51,15 @@ func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult,
 	// If the container logs directory does not exist, create it.
 	if _, err := os.Stat(ContainerLogsDir); err != nil {
 		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
-			klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
+			klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
 		}
 	}

 	select {
 	case u := <-updates:
-		klog.Infof("processing manifest with %d pods", len(u.Pods))
+		klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
 		result, err := kl.runOnce(u.Pods, runOnceRetryDelay)
-		klog.Infof("finished processing %d pods", len(u.Pods))
+		klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
 		return result, err
 	case <-time.After(runOnceManifestDelay):
 		return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)

@@ -85,27 +85,27 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results []
 		}(pod)
 	}

-	klog.Infof("Waiting for %d pods", len(admitted))
+	klog.InfoS("Waiting for pods", "numPods", len(admitted))
 	failedPods := []string{}
 	for i := 0; i < len(admitted); i++ {
 		res := <-ch
 		results = append(results, res)
 		if res.Err != nil {
-			faliedContainerName, err := kl.getFailedContainers(res.Pod)
+			failedContainerName, err := kl.getFailedContainers(res.Pod)
 			if err != nil {
-				klog.Infof("unable to get failed containers' names for pod %q, error:%v", format.Pod(res.Pod), err)
+				klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
 			} else {
-				klog.Infof("unable to start pod %q because container:%v failed", format.Pod(res.Pod), faliedContainerName)
+				klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
 			}
 			failedPods = append(failedPods, format.Pod(res.Pod))
 		} else {
-			klog.Infof("started pod %q", format.Pod(res.Pod))
+			klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
 		}
 	}
 	if len(failedPods) > 0 {
 		return results, fmt.Errorf("error running pods: %v", failedPods)
 	}
-	klog.Infof("%d pods started", len(pods))
+	klog.InfoS("Pods started", "numPods", len(pods))
 	return results, err
 }

@@ -120,14 +120,14 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
 		}

 		if kl.isPodRunning(pod, status) {
-			klog.Infof("pod %q containers running", format.Pod(pod))
+			klog.InfoS("Pod's containers running", "pod", klog.KObj(pod))
 			return nil
 		}
-		klog.Infof("pod %q containers not running: syncing", format.Pod(pod))
+		klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))

-		klog.Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
+		klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
 		if err := kl.podManager.CreateMirrorPod(pod); err != nil {
-			klog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err)
+			klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
 		}
 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 		if err = kl.syncPod(syncPodOptions{

@@ -142,7 +142,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
 			return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries)
 		}
 		// TODO(proppy): health checking would be better than waiting + checking the state at the next iteration.
-		klog.Infof("pod %q containers synced, waiting for %v", format.Pod(pod), delay)
+		klog.InfoS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay)
 		time.Sleep(delay)
 		retry++
 		delay *= runOnceRetryDelayBackoff

@@ -154,7 +154,7 @@ func (kl *Kubelet) isPodRunning(pod *v1.Pod, status *kubecontainer.PodStatus) bo
 	for _, c := range pod.Spec.Containers {
 		cs := status.FindContainerStatusByName(c.Name)
 		if cs == nil || cs.State != kubecontainer.ContainerStateRunning {
-			klog.Infof("Container %q for pod %q not running", c.Name, format.Pod(pod))
+			klog.InfoS("Container not running", "pod", klog.KObj(pod), "containerName", c.Name)
 			return false
 		}
 	}

View File

@@ -72,7 +72,7 @@ func NewInitializedVolumePluginMgr(
 		csiDriversSynced = csiDriverInformer.Informer().HasSynced

 	} else {
-		klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister")
+		klog.InfoS("KubeClient is nil. Skip initialization of CSIDriverLister")
 	}

 	kvh := &kubeletVolumeHost{

@@ -176,13 +176,13 @@ func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
 // WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
 func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
 	if kvh.csiDriversSynced == nil {
-		klog.Error("csiDriversSynced not found on KubeletVolumeHost")
+		klog.ErrorS(nil, "CsiDriversSynced not found on KubeletVolumeHost")
 		return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
 	}

 	synced := []cache.InformerSynced{kvh.csiDriversSynced}
 	if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
-		klog.Warning("failed to wait for cache sync for CSIDriverLister")
+		klog.InfoS("Failed to wait for cache sync for CSIDriverLister")
 		return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
 	}
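
Note the nil first argument introduced above: when a call site logs at error severity but has no error value in scope, the migration passes nil as the err parameter of klog.ErrorS and keeps only the message:

	// No error object exists at this call site; the ErrorS severity alone
	// marks the event as error-level.
	klog.ErrorS(nil, "CsiDriversSynced not found on KubeletVolumeHost")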

View File

@@ -461,9 +461,7 @@ func (asw *actualStateOfWorld) addVolume(
 	} else {
 		// If volume object already exists, update the fields such as device path
 		volumeObj.devicePath = devicePath
-		klog.V(2).Infof("Volume %q is already added to attachedVolume list, update device path %q",
-			volumeName,
-			devicePath)
+		klog.V(2).InfoS("Volume is already added to attachedVolume list, update device path", "volumeName", volumeName, "path", devicePath)
 	}

 	asw.attachedVolumes[volumeName] = volumeObj

@@ -530,9 +528,7 @@ func (asw *actualStateOfWorld) MarkVolumeAsResized(
 			podName,
 			volumeName)
 	}

-	klog.V(5).Infof("Volume %s(OuterVolumeSpecName %s) of pod %s has been resized",
-		volumeName, podObj.outerVolumeSpecName, podName)
+	klog.V(5).InfoS("Pod volume has been resized", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName)
 	podObj.fsResizeRequired = false
 	asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
 	return nil

@@ -548,12 +544,7 @@ func (asw *actualStateOfWorld) MarkRemountRequired(
 			asw.volumePluginMgr.FindPluginBySpec(podObj.volumeSpec)
 		if err != nil || volumePlugin == nil {
 			// Log and continue processing
-			klog.Errorf(
-				"MarkRemountRequired failed to FindPluginBySpec for pod %q (podUid %q) volume: %q (volSpecName: %q)",
-				podObj.podName,
-				podObj.podUID,
-				volumeObj.volumeName,
-				podObj.volumeSpec.Name())
+			klog.ErrorS(nil, "MarkRemountRequired failed to FindPluginBySpec for volume", "uniquePodName", podObj.podName, "podUID", podObj.podUID, "volumeName", volumeName, "volumeSpecName", podObj.volumeSpec.Name())
 			continue
 		}

@@ -572,14 +563,13 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired(
 	defer asw.Unlock()

 	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
 	if !volumeExists {
-		klog.Warningf("MarkFSResizeRequired for volume %s failed as volume not exist", volumeName)
+		klog.InfoS("MarkFSResizeRequired for volume failed as volume does not exist", "volumeName", volumeName)
 		return
 	}

 	podObj, podExists := volumeObj.mountedPods[podName]
 	if !podExists {
-		klog.Warningf("MarkFSResizeRequired for volume %s failed "+
-			"as pod(%s) not exist", volumeName, podName)
+		klog.InfoS("MarkFSResizeRequired for volume failed because the pod does not exist", "uniquePodName", podName, "volumeName", volumeName)
 		return
 	}

@@ -587,18 +577,13 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired(
 		asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec)
 	if err != nil || volumePlugin == nil {
 		// Log and continue processing
-		klog.Errorf(
-			"MarkFSResizeRequired failed to find expandable plugin for pod %q volume: %q (volSpecName: %q)",
-			podObj.podName,
-			volumeObj.volumeName,
-			podObj.volumeSpec.Name())
+		klog.ErrorS(nil, "MarkFSResizeRequired failed to find expandable plugin for volume", "uniquePodName", podObj.podName, "volumeName", volumeObj.volumeName, "volumeSpecName", podObj.volumeSpec.Name())
 		return
 	}

 	if volumePlugin.RequiresFSResize() {
 		if !podObj.fsResizeRequired {
-			klog.V(3).Infof("PVC volume %s(OuterVolumeSpecName %s) of pod %s requires file system resize",
-				volumeName, podObj.outerVolumeSpecName, podName)
+			klog.V(3).InfoS("PVC volume of the pod requires file system resize", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName)
 			podObj.fsResizeRequired = true
 		}
 		asw.attachedVolumes[volumeName].mountedPods[podName] = podObj

View File

@@ -41,7 +41,6 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/status"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/csimigration"

@@ -139,7 +138,7 @@ type processedPods struct {
 func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
 	// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
-	klog.Infof("Desired state populator starts to run")
+	klog.InfoS("Desired state populator starts to run")
 	wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
 		done := sourcesReady.AllReady()
 		dswp.populatorLoop()

@@ -171,11 +170,7 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
 	// findAndRemoveDeletedPods() is called independently of the main
 	// populator loop.
 	if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
-		klog.V(5).Infof(
-			"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
-			dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
-			dswp.getPodStatusRetryDuration)
+		klog.V(5).InfoS("Skipping findAndRemoveDeletedPods(). ", "nextRetryTime", dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration), "retryDuration", dswp.getPodStatusRetryDuration)
 		return
 	}

@@ -234,7 +229,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
 				// It is not possible right now for a CSI plugin to be both attachable and non-deviceMountable
 				// So the uniqueVolumeName should remain the same after the attachability change
 				dswp.desiredStateOfWorld.MarkVolumeAttachability(volumeToMount.VolumeName, false)
-				klog.Infof("Volume %v changes from attachable to non-attachable.", volumeToMount.VolumeName)
+				klog.InfoS("Volume changes from attachable to non-attachable", "volumeName", volumeToMount.VolumeName)
 				continue
 			}
 		}

@@ -256,9 +251,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
 			var getPodsErr error
 			runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
 			if getPodsErr != nil {
-				klog.Errorf(
-					"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
-					getPodsErr)
+				klog.ErrorS(getPodsErr, "kubeContainerRuntime.findAndRemoveDeletedPods returned error")
 				continue
 			}

@@ -282,19 +275,19 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
 			}
 			if runningContainers {
-				klog.V(4).Infof(
-					"Pod %q still has one or more containers in the non-exited state. Therefore, it will not be removed from desired state.",
-					format.Pod(volumeToMount.Pod))
+				klog.V(4).InfoS("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod))
 				continue
 			}
 		}

 		exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
+		var volumeToMountSpecName string
+		if volumeToMount.VolumeSpec != nil {
+			volumeToMountSpecName = volumeToMount.VolumeSpec.Name()
+		}
 		if !exists && podExists {
-			klog.V(4).Infof(
-				volumeToMount.GenerateMsgDetailed(fmt.Sprintf("Actual state has not yet has this volume mounted information and pod (%q) still exists in pod manager, skip removing volume from desired state",
-					format.Pod(volumeToMount.Pod)), ""))
+			klog.V(4).InfoS("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
 			continue
 		}
-		klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Removing volume from desired state", ""))
+		klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
 		dswp.desiredStateOfWorld.DeletePodFromVolume(
 			volumeToMount.PodName, volumeToMount.VolumeName)

@@ -332,18 +325,14 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
 	for _, podVolume := range pod.Spec.Volumes {
 		if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
 			// Volume is not used in the pod, ignore it.
-			klog.V(4).Infof("Skipping unused volume %q for pod %q", podVolume.Name, format.Pod(pod))
+			klog.V(4).InfoS("Skipping unused volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
 			continue
 		}

 		pvc, volumeSpec, volumeGidValue, err :=
 			dswp.createVolumeSpec(podVolume, pod, mounts, devices)
 		if err != nil {
-			klog.Errorf(
-				"Error processing volume %q for pod %q: %v",
-				podVolume.Name,
-				format.Pod(pod),
-				err)
+			klog.ErrorS(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
 			dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
 			allVolumesAdded = false
 			continue

@@ -353,20 +342,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
 		_, err = dswp.desiredStateOfWorld.AddPodToVolume(
 			uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
 		if err != nil {
-			klog.Errorf(
-				"Failed to add volume %s (specName: %s) for pod %q to desiredStateOfWorld: %v",
-				podVolume.Name,
-				volumeSpec.Name(),
-				uniquePodName,
-				err)
+			klog.ErrorS(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
 			dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
 			allVolumesAdded = false
 		} else {
-			klog.V(4).Infof(
-				"Added volume %q (volSpec=%q) for pod %q to desired state.",
-				podVolume.Name,
-				volumeSpec.Name(),
-				uniquePodName)
+			klog.V(4).InfoS("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
 		}

 		if expandInUsePV {

@@ -429,8 +409,7 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
 	// we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
 	if volumeSpec.ReadOnly {
 		// This volume is used as read only by this pod, we don't perform resize for read only volumes.
-		klog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+
-			"as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name)
+		klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
 		return
 	}
 	if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) {

@@ -538,11 +517,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 		ephemeral = true
 	}
 	if pvcSource != nil {
-		klog.V(5).Infof(
-			"Found PVC, ClaimName: %q/%q",
-			pod.Namespace,
-			pvcSource.ClaimName)
+		klog.V(5).InfoS("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName))
 		// If podVolume is a PVC, fetch the real PV behind the claim
 		pvc, err := dswp.getPVCExtractPV(
 			pod.Namespace, pvcSource.ClaimName)

@@ -561,14 +536,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 			)
 		}
 		pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID
-		klog.V(5).Infof(
-			"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
-			pod.Namespace,
-			pvcSource.ClaimName,
-			pvcUID,
-			pvName)
+		klog.V(5).InfoS("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName)

 		// Fetch actual PV object
 		volumeSpec, volumeGidValue, err :=
 			dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID)

@@ -579,15 +547,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 				pvcSource.ClaimName,
 				err)
 		}
-		klog.V(5).Infof(
-			"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
-			volumeSpec.Name(),
-			pvName,
-			pod.Namespace,
-			pvcSource.ClaimName,
-			pvcUID)
+		klog.V(5).InfoS("Extracted volumeSpec from bound PV and PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName, "volumeSpecName", volumeSpec.Name())

 		migratable, err := dswp.csiMigratedPluginManager.IsMigratable(volumeSpec)
 		if err != nil {
 			return nil, nil, "", err

View File

@@ -154,7 +154,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
 		// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
 		// desired state of world does not contain a complete list of pods.
 		if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
-			klog.Infof("Reconciler: start to sync state")
+			klog.InfoS("Reconciler: start to sync state")
 			rc.sync()
 		}
 	}

@@ -182,7 +182,7 @@ func (rc *reconciler) unmountVolumes() {
 	for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
 		if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
 			// Volume is mounted, unmount it
-			klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
+			klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
 			err := rc.operationExecutor.UnmountVolume(
 				mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
 			if err != nil &&

@@ -190,10 +190,10 @@ func (rc *reconciler) unmountVolumes() {
 				!exponentialbackoff.IsExponentialBackoff(err) {
 				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 				// Log all other errors.
-				klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
+				klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
 			}
 			if err == nil {
-				klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
+				klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
 			}
 		}
 	}

@@ -208,7 +208,7 @@ func (rc *reconciler) mountAttachVolumes() {
 			if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
 				// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
 				// for controller to finish attaching volume.
-				klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
+				klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
 				err := rc.operationExecutor.VerifyControllerAttachedVolume(
 					volumeToMount.VolumeToMount,
 					rc.nodeName,

@@ -218,10 +218,10 @@ func (rc *reconciler) mountAttachVolumes() {
 					!exponentialbackoff.IsExponentialBackoff(err) {
 					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 					// Log all other errors.
-					klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
+					klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
 				}
 				if err == nil {
-					klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
+					klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
 				}
 			} else {
 				// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,

@@ -231,17 +231,17 @@ func (rc *reconciler) mountAttachVolumes() {
 					VolumeSpec: volumeToMount.VolumeSpec,
 					NodeName:   rc.nodeName,
 				}
-				klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
+				klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
 				err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
 				if err != nil &&
 					!nestedpendingoperations.IsAlreadyExists(err) &&
 					!exponentialbackoff.IsExponentialBackoff(err) {
 					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 					// Log all other errors.
-					klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
+					klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
 				}
 				if err == nil {
-					klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
+					klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
 				}
 			}
 		} else if !volMounted || cache.IsRemountRequiredError(err) {

@@ -251,7 +251,7 @@ func (rc *reconciler) mountAttachVolumes() {
 			if isRemount {
 				remountingLogStr = "Volume is already mounted to pod, but remount was requested."
 			}
-			klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
+			klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
 			err := rc.operationExecutor.MountVolume(
 				rc.waitForAttachTimeout,
 				volumeToMount.VolumeToMount,

@@ -262,18 +262,18 @@ func (rc *reconciler) mountAttachVolumes() {
 				!exponentialbackoff.IsExponentialBackoff(err) {
 				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 				// Log all other errors.
-				klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
+				klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
 			}
 			if err == nil {
 				if remountingLogStr == "" {
-					klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
+					klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
 				} else {
-					klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
+					klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
 				}
 			}
 		} else if cache.IsFSResizeRequiredError(err) &&
 			utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
-			klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
+			klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
 			err := rc.operationExecutor.ExpandInUseVolume(
 				volumeToMount.VolumeToMount,
 				rc.actualStateOfWorld)

@@ -282,10 +282,10 @@ func (rc *reconciler) mountAttachVolumes() {
 				!exponentialbackoff.IsExponentialBackoff(err) {
 				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 				// Log all other errors.
-				klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
+				klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
 			}
 			if err == nil {
-				klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
+				klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
 			}
 		}
 	}

@@ -298,7 +298,7 @@ func (rc *reconciler) unmountDetachDevices() {
 			!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
 			if attachedVolume.DeviceMayBeMounted() {
 				// Volume is globally mounted to device, unmount it
-				klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
+				klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
 				err := rc.operationExecutor.UnmountDevice(
 					attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
 				if err != nil &&

@@ -306,20 +306,20 @@ func (rc *reconciler) unmountDetachDevices() {
 					!exponentialbackoff.IsExponentialBackoff(err) {
 					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 					// Log all other errors.
-					klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
+					klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
 				}
 				if err == nil {
-					klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
+					klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
 				}
 			} else {
 				// Volume is attached to node, detach it
 				// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
 				if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
 					rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
-					klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
+					klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
 				} else {
 					// Only detach if kubelet detach is enabled
-					klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
+					klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
 					err := rc.operationExecutor.DetachVolume(
 						attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
 					if err != nil &&

@@ -327,10 +327,10 @@ func (rc *reconciler) unmountDetachDevices() {
 						!exponentialbackoff.IsExponentialBackoff(err) {
 						// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
 						// Log all other errors.
-						klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
+						klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
 					}
 					if err == nil {
-						klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
+						klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
 					}
 				}
 			}

@@ -386,14 +386,14 @@ func (rc *reconciler) syncStates() {
 	// Get volumes information by reading the pod's directory
 	podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
 	if err != nil {
-		klog.Errorf("Cannot get volumes from disk %v", err)
+		klog.ErrorS(err, "Cannot get volumes from disk")
 		return
 	}
 	volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
 	volumeNeedReport := []v1.UniqueVolumeName{}
 	for _, volume := range podVolumes {
 		if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
-			klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
+			klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
 			// There is nothing to reconstruct
 			continue
 		}

@@ -404,11 +404,11 @@ func (rc *reconciler) syncStates() {
 			if volumeInDSW {
 				// Some pod needs the volume, don't clean it up and hope that
 				// reconcile() calls SetUp and reconstructs the volume in ASW.
-				klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
+				klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "pod", klog.KObj(reconstructedVolume.pod), "volumeSpecName", volume.volumeSpecName)
 				continue
 			}
 			// No pod needs the volume.
-			klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
+			klog.InfoS("Could not construct volume information, cleaning up mounts", "pod", klog.KObj(reconstructedVolume.pod), "volumeSpecName", volume.volumeSpecName, "error", err)
 			rc.cleanupMounts(volume)
 			continue
 		}

@@ -419,22 +419,20 @@ func (rc *reconciler) syncStates() {
 			// this new kubelet so reconcile() calls SetUp and re-mounts the
 			// volume if it's necessary.
 			volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
-			klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
+			klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
 			continue
 		}
 		// There is no pod that uses the volume.
 		if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
-			klog.Warning("Volume is in pending operation, skip cleaning up mounts")
+			klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
 		}
-		klog.V(2).Infof(
-			"Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
-			reconstructedVolume)
+		klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
 		volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
 	}

 	if len(volumesNeedUpdate) > 0 {
 		if err = rc.updateStates(volumesNeedUpdate); err != nil {
-			klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
+			klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
 		}
 	}
 	if len(volumeNeedReport) > 0 {

@@ -443,8 +441,7 @@ func (rc *reconciler) syncStates() {
 }

 func (rc *reconciler) cleanupMounts(volume podVolume) {
-	klog.V(2).Infof("Reconciler sync states: could not find information (PID: %s) (Volume SpecName: %s) in desired state, clean up the mount points",
-		volume.podName, volume.volumeSpecName)
+	klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
 	mountedVolume := operationexecutor.MountedVolume{
 		PodName:    volume.podName,
 		VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),

@@ -456,7 +453,7 @@ func (rc *reconciler) cleanupMounts(volume podVolume) {
 	// to unmount both volume and device in the same routine.
 	err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
 	if err != nil {
-		klog.Errorf(mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
+		klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
 		return
 	}
 }

@@ -600,13 +597,13 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
 func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
 	node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
 	if fetchErr != nil {
-		klog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
+		klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
 	} else {
 		for _, attachedVolume := range node.Status.VolumesAttached {
 			if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
 				volume.devicePath = attachedVolume.DevicePath
 				volumesNeedUpdate[attachedVolume.Name] = volume
-				klog.V(4).Infof("Update devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
+				klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath)
 			}
 		}
 	}

@@ -636,7 +633,7 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
 			//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
 			volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
 		if err != nil {
-			klog.Errorf("Could not add volume information to actual state of world: %v", err)
+			klog.ErrorS(err, "Could not add volume information to actual state of world")
 			continue
 		}
 		markVolumeOpts := operationexecutor.MarkVolumeOpts{

@@ -652,23 +649,23 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
 		}
 		err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
 		if err != nil {
-			klog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
+			klog.ErrorS(err, "Could not add pod to volume information to actual state of world")
 			continue
 		}
-		klog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
+		klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "podName", volume.podName, "volumeName", volume.volumeName)
 		// If the volume has device to mount, we mark its device as mounted.
 		if volume.deviceMounter != nil || volume.blockVolumeMapper != nil {
 			deviceMountPath, err := getDeviceMountPath(volume)
 			if err != nil {
-				klog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
+				klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", volume.volumeName)
 				continue
 			}
 			err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
 			if err != nil {
-				klog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
+				klog.ErrorS(err, "Could not mark device is mounted to actual state of world")
 				continue
 			}
-			klog.V(4).Infof("Volume: %s (pod UID %s) is marked device as mounted and added into the actual state", volume.volumeName, volume.podName)
+			klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "podName", volume.podName, "volumeName", volume.volumeName)
 		}
 	}
 	return nil

@@ -710,13 +707,13 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
 			volumePluginPath := path.Join(volumesDir, pluginName)
 			volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
 			if err != nil {
-				klog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
+				klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
 				continue
 			}
 			unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
 			for _, volumeName := range volumePluginDirs {
 				volumePath := path.Join(volumePluginPath, volumeName)
-				klog.V(5).Infof("podName: %v, volume path from volume plugin directory: %v, ", podName, volumePath)
+				klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
 				volumes = append(volumes, podVolume{
 					podName:        volumetypes.UniquePodName(podName),
 					volumeSpecName: volumeName,

@@ -728,6 +725,6 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
 			}
 		}
 	}
-	klog.V(4).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
+	klog.V(4).InfoS("Get volumes from pod directory", "path", podDir, "volumes", volumes)
 	return volumes, nil
 }
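
One more pattern from the reconciler hunks above: messages prebuilt by GenerateMsgDetailed and GenerateErrorDetailed are kept verbatim as the structured call's message argument in this pass; only the error object is promoted into ErrorS's err parameter, and the embedded details are not yet split into key/value pairs. For example:

	// The precomputed detail string stays as the message; err moves into the
	// structured error slot instead of being formatted into the text.
	klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())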

View File

@@ -40,7 +40,6 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/status"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"

@@ -267,15 +266,15 @@ func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str
 	}

 	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
-	klog.V(2).Infof("The desired_state_of_world populator starts")
+	klog.V(2).InfoS("The desired_state_of_world populator starts")

-	klog.Infof("Starting Kubelet Volume Manager")
+	klog.InfoS("Starting Kubelet Volume Manager")
 	go vm.reconciler.Run(stopCh)

 	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

 	<-stopCh
-	klog.Infof("Shutting down Kubelet Volume Manager")
+	klog.InfoS("Shutting down Kubelet Volume Manager")
 }

@@ -371,7 +370,7 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 		return nil
 	}

-	klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
+	klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
 	uniquePodName := util.GetUniquePodName(pod)

 	// Some pods expect to have Setup called over and over again to update.

@@ -402,7 +401,7 @@ func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
 			err)
 	}

-	klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
+	klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
 	return nil
 }