Migrate pkg/kubelet/kubelet.go to structured logging

Navid Shaikh 2021-03-06 01:23:11 +05:30
parent 80ff14a47d
commit be91ea5bd1


@@ -272,18 +272,18 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
klog.InfoS("Watching apiserver")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
@@ -307,7 +307,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
klog.Warningf("Using dockershim is deprecated, please consider using a full-fledged CRI implementation")
klog.InfoS("Using dockershim is deprecated, please consider using a full-fledged CRI implementation")
if err := runDockershim(
kubeCfg,
kubeDeps,
@@ -458,17 +458,17 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if kubeInformers.Core().V1().Nodes().Informer().HasSynced() {
return true
}
klog.Infof("kubelet nodes not sync")
klog.InfoS("Kubelet nodes not sync")
return false
}
kubeInformers.Start(wait.NeverStop)
klog.Info("Kubelet client is not nil")
klog.InfoS("Kubelet client is not nil")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.Info("Kubelet client is nil")
klog.InfoS("Kubelet client is nil")
}
// construct a node reference used for events
@@ -488,7 +488,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
for _, ipEntry := range kubeCfg.ClusterDNS {
ip := net.ParseIP(ipEntry)
if ip == nil {
klog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry)
} else {
clusterDNS = append(clusterDNS, ip)
}
@@ -576,7 +576,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {
klog.Info("Experimental host user namespace defaulting is enabled.")
klog.InfoS("Experimental host user namespace defaulting is enabled")
}
machineInfo, err := klet.cadvisor.MachineInfo()
@@ -693,7 +693,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
klog.Errorf("Pod CIDR update failed %v", err)
klog.ErrorS(err, "Pod CIDR update failed")
}
// setup containerGC
@@ -1267,11 +1267,11 @@ func (kl *Kubelet) setupDataDirs() error {
if selinux.SELinuxEnabled() {
err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
if err != nil {
klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginRegistrationDir, err)
klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
}
err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
if err != nil {
klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginsDir, err)
klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
}
}
return nil
@@ -1282,7 +1282,7 @@ func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure := false
go wait.Until(func() {
if err := kl.containerGC.GarbageCollect(); err != nil {
klog.Errorf("Container garbage collection failed: %v", err)
klog.ErrorS(err, "Container garbage collection failed")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
loggedContainerGCFailure = true
} else {
@@ -1292,13 +1292,13 @@ func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure = false
}
klog.V(vLevel).Infof("Container garbage collection succeeded")
klog.V(vLevel).InfoS("Container garbage collection succeeded")
}
}, ContainerGCPeriod, wait.NeverStop)
// when the high threshold is set to 100, stub the image GC manager
if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
klog.V(2).Infof("ImageGCHighThresholdPercent is set 100, Disable image GC")
klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100, Disable image GC")
return
}
@@ -1306,11 +1306,11 @@ func (kl *Kubelet) StartGarbageCollection() {
go wait.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
if prevImageGCFailed {
klog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
// Only create an event for repeated failures
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
} else {
klog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
}
prevImageGCFailed = true
} else {
@@ -1320,7 +1320,7 @@ func (kl *Kubelet) StartGarbageCollection() {
prevImageGCFailed = false
}
klog.V(vLevel).Infof("Image garbage collection succeeded")
klog.V(vLevel).InfoS("Image garbage collection succeeded")
}
}, ImageGCPeriod, wait.NeverStop)
}
@@ -1371,7 +1371,8 @@ func (kl *Kubelet) initializeModules() error {
func (kl *Kubelet) initializeRuntimeDependentModules() {
if err := kl.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.Fatalf("Failed to start cAdvisor %v", err)
klog.ErrorS(err, "Failed to start cAdvisor")
os.Exit(1)
}
// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
@@ -1381,12 +1382,14 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
node, err := kl.getNodeAnyWay()
if err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.Fatalf("Kubelet failed to get node info: %v", err)
klog.ErrorS(err, "Kubelet failed to get node info")
os.Exit(1)
}
// containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.Fatalf("Failed to start ContainerManager %v", err)
klog.ErrorS(err, "Failed to start ContainerManager")
os.Exit(1)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
@@ -1399,13 +1402,13 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
klog.V(4).InfoS("Starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
err = kl.shutdownManager.Start()
if err != nil {
// The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it.
klog.Errorf("Failed to start node shutdown manager: %v", err)
klog.ErrorS(err, "Failed to start node shutdown manager")
}
}
@@ -1415,7 +1418,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
klog.InfoS("No API server defined - no node status update will be sent")
}
// Start the cloud provider sync manager
@@ -1425,7 +1428,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
klog.ErrorS(err, "failed to intialize internal modules")
os.Exit(1)
}
// Start volume manager
@@ -1539,7 +1543,9 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
} else {
klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
klog.V(3).InfoS("First seen time not recorded for pod",
"podUID", pod.UID,
"pod", klog.KObj(pod))
}
}
@@ -1634,7 +1640,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
podKilled = true
} else {
klog.Errorf("killPod for pod %q (podStatus=%v) failed: %v", format.Pod(pod), podStatus, err)
klog.ErrorS(err, "killPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
}
}
// Create and Update pod's Cgroups
@ -1647,7 +1653,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
@@ -1664,24 +1670,24 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
var err error
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil || node.DeletionTimestamp != nil {
klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
@@ -1690,7 +1696,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return err
}
@@ -1699,7 +1705,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
return err
}
}
@@ -1844,7 +1850,7 @@ func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.Info("Starting kubelet main sync loop.")
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
@@ -1868,7 +1874,7 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.Errorf("skipping pod synchronization - %v", err)
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
@@ -1924,7 +1930,7 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
@@ -1951,9 +1957,9 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.Errorf("Invalid event type received: %d.", u.Op)
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
@@ -1968,11 +1974,11 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
@@ -2005,11 +2011,11 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
klog.V(4).Infof("SyncLoop (housekeeping)")
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
klog.ErrorS(err, "Failed cleaning pods")
}
}
}
@@ -2038,7 +2044,7 @@ func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mir
// check whether we are ready to delete the pod from the API server (all status up to date)
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
if pod.DeletionTimestamp != nil && containersTerminal {
klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType)
klog.V(4).Infof("Pod has completed execution and should be deleted from the API server", "pod", klog.KObj(pod), "syncType", syncType)
kl.statusManager.TerminatePod(pod)
return
}
@@ -2143,7 +2149,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
klog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
kl.probeManager.RemovePod(pod)
}
@@ -2202,19 +2208,19 @@ func (kl *Kubelet) updateRuntimeUp() {
s, err := kl.containerRuntime.Status()
if err != nil {
klog.Errorf("Container runtime sanity check failed: %v", err)
klog.ErrorS(err, "Container runtime sanity check failed")
return
}
if s == nil {
klog.Errorf("Container runtime status is nil")
klog.ErrorS(nil, "Container runtime status is nil")
return
}
// Periodically log the whole runtime status for debugging.
klog.V(4).Infof("Container runtime status: %v", s)
klog.V(4).InfoS("Container runtime status", "status", s)
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
klog.Errorf("Container runtime network not ready: %v", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
@@ -2223,9 +2229,8 @@ func (kl *Kubelet) updateRuntimeUp() {
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
err := fmt.Errorf("Container runtime not ready: %v", runtimeReady)
klog.Error(err)
kl.runtimeState.setRuntimeState(err)
klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
return
}
kl.runtimeState.setRuntimeState(nil)
@@ -2264,7 +2269,7 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
func (kl *Kubelet) ListenAndServePodResources() {
socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
return
}
server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager)
@@ -2294,13 +2299,13 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.Errorf(err.Error())
klog.ErrorS(err, "Error getting node")
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
continue
}
kl.updateRuntimeUp()
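
For the pod and node references above, klog.KObj and klog.KRef format an object as namespace/name in the structured output. A small sketch (assuming a hypothetical podMeta stand-in type and made-up names; a real *v1.Pod satisfies the same klog.KMetadata interface):

    package main

    import "k8s.io/klog/v2"

    // podMeta is a stand-in for any object exposing a name and namespace;
    // it satisfies klog.KMetadata just as a real *v1.Pod does.
    type podMeta struct{ namespace, name string }

    func (p podMeta) GetName() string      { return p.name }
    func (p podMeta) GetNamespace() string { return p.namespace }

    func main() {
        pod := podMeta{namespace: "kube-system", name: "static-web"} // made-up pod for illustration

        // KObj renders the object reference as namespace/name,
        // e.g. pod="kube-system/static-web" in the text output.
        klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))

        // KRef builds the same kind of reference from plain strings, as with the
        // node name in the diff above where no node object is at hand.
        klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster",
            "node", klog.KRef("", "worker-1"))
    }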