From 2b1f38a83b5f8d6b8a4d3fff528d043ee5c3b119 Mon Sep 17 00:00:00 2001 From: zhangzhifei16 Date: Tue, 22 Jul 2025 10:57:04 +0800 Subject: [PATCH] chore(kubelet): migrate config to contextual logging. --- hack/golangci-hints.yaml | 1 + hack/golangci.yaml | 1 + hack/logcheck.conf | 1 + pkg/kubelet/config/apiserver.go | 10 ++--- pkg/kubelet/config/common.go | 20 +++++----- pkg/kubelet/config/common_test.go | 20 ++++++---- pkg/kubelet/config/config.go | 36 +++++++++-------- pkg/kubelet/config/file.go | 54 +++++++++++++------------- pkg/kubelet/config/file_linux.go | 18 ++++----- pkg/kubelet/config/file_linux_test.go | 20 ++++++---- pkg/kubelet/config/file_test.go | 7 +++- pkg/kubelet/config/file_unsupported.go | 6 +-- pkg/kubelet/config/http.go | 28 ++++++------- pkg/kubelet/config/http_test.go | 17 +++++--- pkg/kubelet/config/mux.go | 11 +++--- pkg/kubelet/config/mux_test.go | 27 +++++++------ pkg/kubelet/kubelet.go | 8 ++-- 17 files changed, 160 insertions(+), 125 deletions(-) diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index 4f2d0910ea6..4bb77316bcc 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -209,6 +209,7 @@ linters: contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* + contextual k8s.io/kubernetes/pkg/kubelet/config/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* contextual k8s.io/kubernetes/pkg/kubelet/status/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* diff --git a/hack/golangci.yaml b/hack/golangci.yaml index 314012cf5df..b387c8e9060 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -223,6 +223,7 @@ linters: contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* + contextual k8s.io/kubernetes/pkg/kubelet/config/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* contextual k8s.io/kubernetes/pkg/kubelet/status/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 2c27499bb5e..3c48704c4a6 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -55,6 +55,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* +contextual k8s.io/kubernetes/pkg/kubelet/config/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* contextual k8s.io/kubernetes/pkg/kubelet/status/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index b67f6c34fec..60c040299c4 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -34,22 +34,22 @@ import ( const WaitForAPIServerSyncPeriod = 1 * time.Second // NewSourceApiserver creates a config source that watches and pulls from the apiserver. -func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) { +func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) { lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName))) // The Reflector responsible for watching pods at the apiserver should be run only after // the node sync with the apiserver has completed. - klog.InfoS("Waiting for node sync before watching apiserver pods") + logger.Info("Waiting for node sync before watching apiserver pods") go func() { for { if nodeHasSynced() { - klog.V(4).InfoS("node sync completed") + logger.V(4).Info("node sync completed") break } time.Sleep(WaitForAPIServerSyncPeriod) - klog.V(4).InfoS("node sync has not completed yet") + logger.V(4).Info("node sync has not completed yet") } - klog.InfoS("Watching apiserver") + logger.Info("Watching apiserver") newSourceApiserverFromLW(lw, updates) }() } diff --git a/pkg/kubelet/config/common.go b/pkg/kubelet/config/common.go index 48f6ba77ada..3aeb35321a9 100644 --- a/pkg/kubelet/config/common.go +++ b/pkg/kubelet/config/common.go @@ -57,7 +57,7 @@ func generatePodName(name string, nodeName types.NodeName) string { return fmt.Sprintf("%s-%s", name, strings.ToLower(string(nodeName))) } -func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error { +func applyDefaults(logger klog.Logger, pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error { if len(pod.UID) == 0 { hasher := md5.New() hash.DeepHashObject(hasher, pod) @@ -70,16 +70,16 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.Node fmt.Fprintf(hasher, "url:%s", source) } pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:])) - klog.V(5).InfoS("Generated UID", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source) + logger.V(5).Info("Generated UID", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source) } pod.Name = generatePodName(pod.Name, nodeName) - klog.V(5).InfoS("Generated pod name", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source) + logger.V(5).Info("Generated pod name", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source) if pod.Namespace == "" { pod.Namespace = metav1.NamespaceDefault } - klog.V(5).InfoS("Set namespace for pod", "pod", klog.KObj(pod), "source", source) + logger.V(5).Info("Set namespace for pod", "pod", klog.KObj(pod), "source", source) // Set the Host field to indicate this pod is scheduled on the current node. pod.Spec.NodeName = string(nodeName) @@ -104,7 +104,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.Node return nil } -type defaultFunc func(pod *api.Pod) error +type defaultFunc func(logger klog.Logger, pod *api.Pod) error // A static pod tried to use a ClusterTrustBundle projected volume source. var ErrStaticPodTriedToUseClusterTrustBundle = errors.New("static pods may not use ClusterTrustBundle projected volume sources") @@ -113,7 +113,7 @@ var ErrStaticPodTriedToUseClusterTrustBundle = errors.New("static pods may not u var ErrStaticPodTriedToUseResourceClaims = errors.New("static pods may not use ResourceClaims") // tryDecodeSinglePod takes data and tries to extract valid Pod config information from it. -func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v1.Pod, err error) { +func tryDecodeSinglePod(logger klog.Logger, data []byte, defaultFn defaultFunc) (parsed bool, pod *v1.Pod, err error) { // JSON is valid YAML, so this should work for everything. json, err := utilyaml.ToJSON(data) if err != nil { @@ -135,7 +135,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v } // Apply default values and validate the pod. - if err = defaultFn(newPod); err != nil { + if err = defaultFn(logger, newPod); err != nil { return true, pod, err } if errs := validation.ValidatePodCreate(newPod, validation.PodValidationOptions{}); len(errs) > 0 { @@ -143,7 +143,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v } v1Pod := &v1.Pod{} if err := k8s_api_v1.Convert_core_Pod_To_v1_Pod(newPod, v1Pod, nil); err != nil { - klog.ErrorS(err, "Pod failed to convert to v1", "pod", klog.KObj(newPod)) + logger.Error(err, "Pod failed to convert to v1", "pod", klog.KObj(newPod)) return true, nil, err } @@ -177,7 +177,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v return true, v1Pod, nil } -func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods v1.PodList, err error) { +func tryDecodePodList(logger klog.Logger, data []byte, defaultFn defaultFunc) (parsed bool, pods v1.PodList, err error) { obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data) if err != nil { return false, pods, err @@ -196,7 +196,7 @@ func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods v1. if newPod.Name == "" { return true, pods, fmt.Errorf("invalid pod: name is needed for the pod") } - if err = defaultFn(newPod); err != nil { + if err = defaultFn(logger, newPod); err != nil { return true, pods, err } if errs := validation.ValidatePodCreate(newPod, validation.PodValidationOptions{}); len(errs) > 0 { diff --git a/pkg/kubelet/config/common_test.go b/pkg/kubelet/config/common_test.go index 4a7c98250ca..19e26814d62 100644 --- a/pkg/kubelet/config/common_test.go +++ b/pkg/kubelet/config/common_test.go @@ -29,6 +29,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/api/legacyscheme" podtest "k8s.io/kubernetes/pkg/api/pod/testing" "k8s.io/kubernetes/pkg/apis/core" @@ -37,9 +39,10 @@ import ( "k8s.io/utils/ptr" ) -func noDefault(*core.Pod) error { return nil } +func noDefault(klog.Logger, *core.Pod) error { return nil } func TestDecodeSinglePod(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) grace := int64(30) enableServiceLinks := v1.DefaultEnableServiceLinks pod := &v1.Pod{ @@ -80,7 +83,7 @@ func TestDecodeSinglePod(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - parsed, podOut, err := tryDecodeSinglePod(json, noDefault) + parsed, podOut, err := tryDecodeSinglePod(logger, json, noDefault) if !parsed { t.Errorf("expected to have parsed file: (%s)", string(json)) } @@ -98,7 +101,7 @@ func TestDecodeSinglePod(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - parsed, podOut, err = tryDecodeSinglePod(yaml, noDefault) + parsed, podOut, err = tryDecodeSinglePod(logger, yaml, noDefault) if !parsed { t.Errorf("expected to have parsed file: (%s)", string(yaml)) } @@ -112,6 +115,7 @@ func TestDecodeSinglePod(t *testing.T) { } func TestDecodeSinglePodRejectsClusterTrustBundleVolumes(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) grace := int64(30) enableServiceLinks := v1.DefaultEnableServiceLinks pod := &v1.Pod{ @@ -175,13 +179,14 @@ func TestDecodeSinglePodRejectsClusterTrustBundleVolumes(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - _, _, err = tryDecodeSinglePod(json, noDefault) + _, _, err = tryDecodeSinglePod(logger, json, noDefault) if !strings.Contains(err.Error(), "may not reference clustertrustbundles") { t.Errorf("Got error %q, want %q", err, fmt.Errorf("static pods may not reference clustertrustbundles API objects")) } } func TestDecodeSinglePodRejectsResourceClaims(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) grace := int64(30) enableServiceLinks := v1.DefaultEnableServiceLinks pod := &v1.Pod{ @@ -231,13 +236,14 @@ func TestDecodeSinglePodRejectsResourceClaims(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - _, _, err = tryDecodeSinglePod(json, noDefault) + _, _, err = tryDecodeSinglePod(logger, json, noDefault) if !strings.Contains(err.Error(), "may not reference resourceclaims") { t.Errorf("Got error %q, want %q", err, fmt.Errorf("static pods may not reference resourceclaims API objects")) } } func TestDecodePodList(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) grace := int64(30) enableServiceLinks := v1.DefaultEnableServiceLinks pod := &v1.Pod{ @@ -282,7 +288,7 @@ func TestDecodePodList(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - parsed, podListOut, err := tryDecodePodList(json, noDefault) + parsed, podListOut, err := tryDecodePodList(logger, json, noDefault) if !parsed { t.Errorf("expected to have parsed file: (%s)", string(json)) } @@ -301,7 +307,7 @@ func TestDecodePodList(t *testing.T) { t.Errorf("unexpected error: %v", err) } - parsed, podListOut, err = tryDecodePodList(yaml, noDefault) + parsed, podListOut, err = tryDecodePodList(logger, yaml, noDefault) if !parsed { t.Errorf("expected to have parsed file: (%s): %v", string(yaml), err) continue diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 10a42c82322..dc0728c8f36 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -99,9 +99,12 @@ func (c *PodConfig) SeenAllSources(seenSources sets.Set[string]) bool { if c.pods == nil { return false } + // Use klog.TODO() because we currently do not have a proper context to pass in. + // Replace this with an appropriate logger when refactoring this function to accept a logger parameter. + logger := klog.TODO() c.sourcesLock.Lock() defer c.sourcesLock.Unlock() - klog.V(5).InfoS("Looking for sources, have seen", "sources", sets.List(c.sources), "seenSources", seenSources) + logger.V(5).Info("Looking for sources, have seen", "sources", sets.List(c.sources), "seenSources", seenSources) return seenSources.HasAll(sets.List(c.sources)...) && c.pods.seenSources(sets.List(c.sources)...) } @@ -157,12 +160,12 @@ func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificatio // Merge normalizes a set of incoming changes from different sources into a map of all Pods // and ensures that redundant changes are filtered out, and then pushes zero or more minimal // updates onto the update channel. Ensures that updates are delivered in order. -func (s *podStorage) Merge(source string, change interface{}) error { +func (s *podStorage) Merge(ctx context.Context, source string, change interface{}) error { s.updateLock.Lock() defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) - adds, updates, deletes, removes, reconciles := s.merge(source, change) + adds, updates, deletes, removes, reconciles := s.merge(ctx, source, change) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications @@ -216,9 +219,10 @@ func (s *podStorage) Merge(source string, change interface{}) error { return nil } -func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { +func (s *podStorage) merge(ctx context.Context, source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() + logger := klog.FromContext(ctx) addPods := []*v1.Pod{} updatePods := []*v1.Pod{} @@ -235,7 +239,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de // After updated, new pod will be stored in the pod cache *pods*. // Notice that *pods* and *oldPods* could be the same cache. updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) { - filtered := filterInvalidPods(newPods, source, s.recorder) + filtered := filterInvalidPods(logger, newPods, source, s.recorder) for _, ref := range filtered { // Annotate the pod with the source before any comparison. if ref.Annotations == nil { @@ -258,7 +262,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de } continue } - recordFirstSeenTime(ref) + recordFirstSeenTime(logger, ref) pods[ref.UID] = ref addPods = append(addPods, ref) } @@ -268,16 +272,16 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de switch update.Op { case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE: if update.Op == kubetypes.ADD { - klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) + logger.V(4).Info("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) } else if update.Op == kubetypes.DELETE { - klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) + logger.V(4).Info("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) } else { - klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) + logger.V(4).Info("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) } updatePodsFunc(update.Pods, pods, pods) case kubetypes.REMOVE: - klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) + logger.V(4).Info("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) for _, value := range update.Pods { if existing, found := pods[value.UID]; found { // this is a delete @@ -289,7 +293,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de } case kubetypes.SET: - klog.V(4).InfoS("Setting pods for source", "source", source) + logger.V(4).Info("Setting pods for source", "source", source) s.markSourceSet(source) // Clear the old map entries by just creating a new map oldPods := pods @@ -303,7 +307,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de } default: - klog.InfoS("Received invalid update type", "type", update) + logger.Info("Received invalid update type", "type", update) } @@ -330,14 +334,14 @@ func (s *podStorage) seenSources(sources ...string) bool { return s.sourcesSeen.HasAll(sources...) } -func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) { +func filterInvalidPods(logger klog.Logger, pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) { names := sets.Set[string]{} for i, pod := range pods { // Pods from each source are assumed to have passed validation individually. // This function only checks if there is any naming conflict. name := kubecontainer.GetPodFullName(pod) if names.Has(name) { - klog.InfoS("Pod failed validation due to duplicate pod name, ignoring", "index", i, "pod", klog.KObj(pod), "source", source) + logger.Info("Pod failed validation due to duplicate pod name, ignoring", "index", i, "pod", klog.KObj(pod), "source", source) recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s due to duplicate pod name %q, ignoring", format.Pod(pod), source, pod.Name) continue } else { @@ -393,8 +397,8 @@ func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool { } // recordFirstSeenTime records the first seen time of this pod. -func recordFirstSeenTime(pod *v1.Pod) { - klog.V(4).InfoS("Receiving a new pod", "pod", klog.KObj(pod)) +func recordFirstSeenTime(logger klog.Logger, pod *v1.Pod) { + logger.V(4).Info("Receiving a new pod", "pod", klog.KObj(pod)) pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = kubetypes.NewTimestamp().GetString() } diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 79e2af6ed62..bf48553acbc 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -60,13 +60,13 @@ type sourceFile struct { } // NewSourceFile watches a config file for changes. -func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { +func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { // "github.com/sigma/go-inotify" requires a path without trailing "/" path = strings.TrimRight(path, string(os.PathSeparator)) config := newSourceFile(path, nodeName, period, updates) - klog.V(1).InfoS("Watching path", "path", path) - config.run() + logger.V(1).Info("Watching path", "path", path) + config.run(logger) } func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile { @@ -89,36 +89,36 @@ func newSourceFile(path string, nodeName types.NodeName, period time.Duration, u } } -func (s *sourceFile) run() { +func (s *sourceFile) run(logger klog.Logger) { listTicker := time.NewTicker(s.period) go func() { // Read path immediately to speed up startup. - if err := s.listConfig(); err != nil { - klog.ErrorS(err, "Unable to read config path", "path", s.path) + if err := s.listConfig(logger); err != nil { + logger.Error(err, "Unable to read config path", "path", s.path) } for { select { case <-listTicker.C: - if err := s.listConfig(); err != nil { - klog.ErrorS(err, "Unable to read config path", "path", s.path) + if err := s.listConfig(logger); err != nil { + logger.Error(err, "Unable to read config path", "path", s.path) } case e := <-s.watchEvents: - if err := s.consumeWatchEvent(e); err != nil { - klog.ErrorS(err, "Unable to process watch event") + if err := s.consumeWatchEvent(logger, e); err != nil { + logger.Error(err, "Unable to process watch event") } } } }() - s.startWatch() + s.startWatch(logger) } -func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error { - return applyDefaults(pod, source, true, s.nodeName) +func (s *sourceFile) applyDefaults(logger klog.Logger, pod *api.Pod, source string) error { + return applyDefaults(logger, pod, source, true, s.nodeName) } -func (s *sourceFile) listConfig() error { +func (s *sourceFile) listConfig(logger klog.Logger) error { path := s.path statInfo, err := os.Stat(path) if err != nil { @@ -132,7 +132,7 @@ func (s *sourceFile) listConfig() error { switch { case statInfo.Mode().IsDir(): - pods, err := s.extractFromDir(path) + pods, err := s.extractFromDir(logger, path) if err != nil { return err } @@ -144,7 +144,7 @@ func (s *sourceFile) listConfig() error { return s.replaceStore(pods...) case statInfo.Mode().IsRegular(): - pod, err := s.extractFromFile(path) + pod, err := s.extractFromFile(logger, path) if err != nil { return err } @@ -158,7 +158,7 @@ func (s *sourceFile) listConfig() error { // Get as many pod manifests as we can from a directory. Return an error if and only if something // prevented us from reading anything at all. Do not return an error if only some files // were problematic. -func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) { +func (s *sourceFile) extractFromDir(logger klog.Logger, name string) ([]*v1.Pod, error) { dirents, err := filepath.Glob(filepath.Join(name, "[^.]*")) if err != nil { return nil, fmt.Errorf("glob failed: %v", err) @@ -173,32 +173,32 @@ func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) { for _, path := range dirents { statInfo, err := os.Stat(path) if err != nil { - klog.ErrorS(err, "Could not get metadata", "path", path) + logger.Error(err, "Could not get metadata", "path", path) continue } switch { case statInfo.Mode().IsDir(): - klog.ErrorS(nil, "Provided manifest path is a directory, not recursing into manifest path", "path", path) + logger.Error(nil, "Provided manifest path is a directory, not recursing into manifest path", "path", path) case statInfo.Mode().IsRegular(): - pod, err := s.extractFromFile(path) + pod, err := s.extractFromFile(logger, path) if err != nil { if !os.IsNotExist(err) { - klog.ErrorS(err, "Could not process manifest file", "path", path) + logger.Error(err, "Could not process manifest file", "path", path) } } else { pods = append(pods, pod) } default: - klog.ErrorS(nil, "Manifest path is not a directory or file", "path", path, "mode", statInfo.Mode()) + logger.Error(nil, "Manifest path is not a directory or file", "path", path, "mode", statInfo.Mode()) } } return pods, nil } // extractFromFile parses a file for Pod configuration information. -func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { - klog.V(3).InfoS("Reading config file", "path", filename) +func (s *sourceFile) extractFromFile(logger klog.Logger, filename string) (pod *v1.Pod, err error) { + logger.V(3).Info("Reading config file", "path", filename) defer func() { if err == nil && pod != nil { objKey, keyErr := cache.MetaNamespaceKeyFunc(pod) @@ -221,11 +221,11 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { return pod, err } - defaultFn := func(pod *api.Pod) error { - return s.applyDefaults(pod, filename) + defaultFn := func(logger klog.Logger, pod *api.Pod) error { + return s.applyDefaults(logger, pod, filename) } - parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn) + parsed, pod, podErr := tryDecodeSinglePod(logger, data, defaultFn) if parsed { if podErr != nil { return pod, podErr diff --git a/pkg/kubelet/config/file_linux.go b/pkg/kubelet/config/file_linux.go index 42d86f86872..f05010f8ec0 100644 --- a/pkg/kubelet/config/file_linux.go +++ b/pkg/kubelet/config/file_linux.go @@ -48,7 +48,7 @@ func (e *retryableError) Error() string { return e.message } -func (s *sourceFile) startWatch() { +func (s *sourceFile) startWatch(logger klog.Logger) { backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod) backOffID := "watch" @@ -57,8 +57,8 @@ func (s *sourceFile) startWatch() { return } - if err := s.doWatch(); err != nil { - klog.ErrorS(err, "Unable to read config path", "path", s.path) + if err := s.doWatch(logger); err != nil { + logger.Error(err, "Unable to read config path", "path", s.path) if _, retryable := err.(*retryableError); !retryable { backOff.Next(backOffID, time.Now()) } @@ -66,7 +66,7 @@ func (s *sourceFile) startWatch() { }, retryPeriod) } -func (s *sourceFile) doWatch() error { +func (s *sourceFile) doWatch(logger klog.Logger) error { _, err := os.Stat(s.path) if err != nil { if !os.IsNotExist(err) { @@ -91,7 +91,7 @@ func (s *sourceFile) doWatch() error { for { select { case event := <-w.Events: - if err = s.produceWatchEvent(&event); err != nil { + if err = s.produceWatchEvent(logger, &event); err != nil { return fmt.Errorf("error while processing inotify event (%+v): %v", event, err) } case err = <-w.Errors: @@ -100,10 +100,10 @@ func (s *sourceFile) doWatch() error { } } -func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error { +func (s *sourceFile) produceWatchEvent(logger klog.Logger, e *fsnotify.Event) error { // Ignore file start with dots if strings.HasPrefix(filepath.Base(e.Name), ".") { - klog.V(4).InfoS("Ignored pod manifest, because it starts with dots", "eventName", e.Name) + logger.V(4).Info("Ignored pod manifest, because it starts with dots", "eventName", e.Name) return nil } var eventType podEventType @@ -127,10 +127,10 @@ func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error { return nil } -func (s *sourceFile) consumeWatchEvent(e *watchEvent) error { +func (s *sourceFile) consumeWatchEvent(logger klog.Logger, e *watchEvent) error { switch e.eventType { case podAdd, podModify: - pod, err := s.extractFromFile(e.fileName) + pod, err := s.extractFromFile(logger, e.fileName) if err != nil { return fmt.Errorf("can't process config file %q: %v", e.fileName, err) } diff --git a/pkg/kubelet/config/file_linux_test.go b/pkg/kubelet/config/file_linux_test.go index bcc97de1754..b7e17eb6550 100644 --- a/pkg/kubelet/config/file_linux_test.go +++ b/pkg/kubelet/config/file_linux_test.go @@ -40,20 +40,23 @@ import ( "k8s.io/kubernetes/pkg/apis/core/validation" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestExtractFromNonExistentFile(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ch := make(chan interface{}, 1) lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch) - err := lw.doWatch() + err := lw.doWatch(logger) if err == nil { t.Errorf("Expected error") } } func TestUpdateOnNonExistentFile(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ch := make(chan interface{}) - NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch) + NewSourceFile(logger, "random_non_existent_path", "localhost", time.Millisecond, ch) select { case got := <-ch: update := got.(kubetypes.PodUpdate) @@ -68,6 +71,7 @@ func TestUpdateOnNonExistentFile(t *testing.T) { } func TestReadPodsFromFileExistAlready(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) hostname := types.NodeName("random-test-hostname") var testCases = getTestCases(hostname) @@ -81,7 +85,7 @@ func TestReadPodsFromFileExistAlready(t *testing.T) { file := testCase.writeToFile(dirName, "test_pod_manifest", t) ch := make(chan interface{}) - NewSourceFile(file, hostname, time.Millisecond, ch) + NewSourceFile(logger, file, hostname, time.Millisecond, ch) select { case got := <-ch: update := got.(kubetypes.PodUpdate) @@ -228,6 +232,7 @@ func createSymbolicLink(link, target, name string, t *testing.T) string { } func watchFileAdded(watchDir bool, symlink bool, t *testing.T) { + logger, _ := ktesting.NewTestContext(t) hostname := types.NodeName("random-test-hostname") var testCases = getTestCases(hostname) @@ -253,9 +258,9 @@ func watchFileAdded(watchDir bool, symlink bool, t *testing.T) { ch := make(chan interface{}) if watchDir { - NewSourceFile(dirName, hostname, 100*time.Millisecond, ch) + NewSourceFile(logger, dirName, hostname, 100*time.Millisecond, ch) } else { - NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch) + NewSourceFile(logger, filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch) } expectEmptyUpdate(t, ch) @@ -281,6 +286,7 @@ func watchFileAdded(watchDir bool, symlink bool, t *testing.T) { } func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *testing.T) { + logger, _ := ktesting.NewTestContext(t) hostname := types.NodeName("random-test-hostname") var testCases = getTestCases(hostname) @@ -319,9 +325,9 @@ func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *test }() if watchDir { - NewSourceFile(dirName, hostname, period, ch) + NewSourceFile(logger, dirName, hostname, period, ch) } else { - NewSourceFile(file, hostname, period, ch) + NewSourceFile(logger, file, hostname, period, ch) } // await fsnotify to be ready diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index e0b7251788f..8aecb35af78 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -24,6 +24,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestExtractFromBadDataFile(t *testing.T) { @@ -33,6 +34,7 @@ func TestExtractFromBadDataFile(t *testing.T) { } defer removeAll(dirName, t) + logger, _ := ktesting.NewTestContext(t) fileName := filepath.Join(dirName, "test_pod_config") err = os.WriteFile(fileName, []byte{1, 2, 3}, 0555) if err != nil { @@ -41,7 +43,7 @@ func TestExtractFromBadDataFile(t *testing.T) { ch := make(chan interface{}, 1) lw := newSourceFile(fileName, "localhost", time.Millisecond, ch) - err = lw.listConfig() + err = lw.listConfig(logger) if err == nil { t.Fatalf("expected error, got nil") } @@ -55,9 +57,10 @@ func TestExtractFromEmptyDir(t *testing.T) { } defer removeAll(dirName, t) + logger, _ := ktesting.NewTestContext(t) ch := make(chan interface{}, 1) lw := newSourceFile(dirName, "localhost", time.Millisecond, ch) - err = lw.listConfig() + err = lw.listConfig(logger) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/kubelet/config/file_unsupported.go b/pkg/kubelet/config/file_unsupported.go index 7d5c77c28d7..1b2c56a6660 100644 --- a/pkg/kubelet/config/file_unsupported.go +++ b/pkg/kubelet/config/file_unsupported.go @@ -25,10 +25,10 @@ import ( "k8s.io/klog/v2" ) -func (s *sourceFile) startWatch() { - klog.ErrorS(nil, "Watching source file is unsupported in this build") +func (s *sourceFile) startWatch(logger klog.Logger) { + logger.Error(nil, "Watching source file is unsupported in this build") } -func (s *sourceFile) consumeWatchEvent(e *watchEvent) error { +func (s *sourceFile) consumeWatchEvent(logger klog.Logger, e *watchEvent) error { return fmt.Errorf("consuming watch event is unsupported in this build") } diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index b3760878143..123f5edf01e 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -43,7 +43,7 @@ type sourceURL struct { } // NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes. -func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { +func NewSourceURL(logger klog.Logger, url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { config := &sourceURL{ url: url, header: header, @@ -54,35 +54,35 @@ func NewSourceURL(url string, header http.Header, nodeName types.NodeName, perio // read the manifest URL passed to kubelet. client: &http.Client{Timeout: 10 * time.Second}, } - klog.V(1).InfoS("Watching URL", "URL", url) - go wait.Until(config.run, period, wait.NeverStop) + logger.V(1).Info("Watching URL", "URL", url) + go wait.Until(func() { config.run(logger) }, period, wait.NeverStop) } -func (s *sourceURL) run() { - if err := s.extractFromURL(); err != nil { +func (s *sourceURL) run(logger klog.Logger) { + if err := s.extractFromURL(logger); err != nil { // Don't log this multiple times per minute. The first few entries should be // enough to get the point across. if s.failureLogs < 3 { - klog.InfoS("Failed to read pods from URL", "err", err) + logger.Info("Failed to read pods from URL", "err", err) } else if s.failureLogs == 3 { - klog.InfoS("Failed to read pods from URL. Dropping verbosity of this message to V(4)", "err", err) + logger.Info("Failed to read pods from URL. Dropping verbosity of this message to V(4)", "err", err) } else { - klog.V(4).InfoS("Failed to read pods from URL", "err", err) + logger.V(4).Info("Failed to read pods from URL", "err", err) } s.failureLogs++ } else { if s.failureLogs > 0 { - klog.InfoS("Successfully read pods from URL") + logger.Info("Successfully read pods from URL") s.failureLogs = 0 } } } -func (s *sourceURL) applyDefaults(pod *api.Pod) error { - return applyDefaults(pod, s.url, false, s.nodeName) +func (s *sourceURL) applyDefaults(logger klog.Logger, pod *api.Pod) error { + return applyDefaults(logger, pod, s.url, false, s.nodeName) } -func (s *sourceURL) extractFromURL() error { +func (s *sourceURL) extractFromURL(logger klog.Logger) error { req, err := http.NewRequest("GET", s.url, nil) if err != nil { return err @@ -112,7 +112,7 @@ func (s *sourceURL) extractFromURL() error { s.data = data // First try as it is a single pod. - parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults) + parsed, pod, singlePodErr := tryDecodeSinglePod(logger, data, s.applyDefaults) if parsed { if singlePodErr != nil { // It parsed but could not be used. @@ -123,7 +123,7 @@ func (s *sourceURL) extractFromURL() error { } // That didn't work, so try a list of pods. - parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults) + parsed, podList, multiPodErr := tryDecodePodList(logger, data, s.applyDefaults) if parsed { if multiPodErr != nil { // It parsed but could not be used. diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index 879accb770d..d637451e2f2 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -34,11 +34,13 @@ import ( k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/apis/core/validation" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestURLErrorNotExistNoUpdate(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ch := make(chan interface{}) - NewSourceURL("http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch) + NewSourceURL(logger, "http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch) select { case got := <-ch: t.Errorf("Expected no update, Got %#v", got) @@ -47,15 +49,17 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) { } func TestExtractFromHttpBadness(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ch := make(chan interface{}, 1) c := sourceURL{"http://localhost:49575/_not_found_", http.Header{}, "other", ch, nil, 0, http.DefaultClient} - if err := c.extractFromURL(); err == nil { + if err := c.extractFromURL(logger); err == nil { t.Errorf("Expected error") } expectEmptyChannel(t, ch) } func TestExtractInvalidPods(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) var testCases = []struct { desc string pod *v1.Pod @@ -118,13 +122,14 @@ func TestExtractInvalidPods(t *testing.T) { defer testServer.Close() ch := make(chan interface{}, 1) c := sourceURL{testServer.URL, http.Header{}, "localhost", ch, nil, 0, http.DefaultClient} - if err := c.extractFromURL(); err == nil { + if err := c.extractFromURL(logger); err == nil { t.Errorf("%s: Expected error", testCase.desc) } } } func TestExtractPodsFromHTTP(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) nodeName := "different-value" grace := int64(30) @@ -301,7 +306,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { defer testServer.Close() ch := make(chan interface{}, 1) c := sourceURL{testServer.URL, http.Header{}, types.NodeName(nodeName), ch, nil, 0, http.DefaultClient} - if err := c.extractFromURL(); err != nil { + if err := c.extractFromURL(logger); err != nil { t.Errorf("%s: Unexpected error: %v", testCase.desc, err) continue } @@ -349,11 +354,13 @@ func TestURLWithHeader(t *testing.T) { } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() + + logger, _ := ktesting.NewTestContext(t) ch := make(chan interface{}, 1) header := make(http.Header) header.Set("Metadata-Flavor", "Google") c := sourceURL{testServer.URL, header, "localhost", ch, nil, 0, http.DefaultClient} - if err := c.extractFromURL(); err != nil { + if err := c.extractFromURL(logger); err != nil { t.Fatalf("Unexpected error extracting from URL: %v", err) } update := (<-ch).(kubetypes.PodUpdate) diff --git a/pkg/kubelet/config/mux.go b/pkg/kubelet/config/mux.go index a2b3e1e0a5f..091148f950f 100644 --- a/pkg/kubelet/config/mux.go +++ b/pkg/kubelet/config/mux.go @@ -28,7 +28,7 @@ type merger interface { // Invoked when a change from a source is received. May also function as an incremental // merger if you wish to consume changes incrementally. Must be reentrant when more than // one source is defined. - Merge(source string, update interface{}) error + Merge(ctx context.Context, source string, update interface{}) error } // mux is a class for merging configuration from multiple sources. Changes are @@ -70,14 +70,15 @@ func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interf newChannel := make(chan interface{}) m.sources[source] = newChannel - go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done()) + go wait.Until(func() { m.listen(ctx, source, newChannel) }, 0, ctx.Done()) return newChannel } -func (m *mux) listen(source string, listenChannel <-chan interface{}) { +func (m *mux) listen(ctx context.Context, source string, listenChannel <-chan interface{}) { + logger := klog.FromContext(ctx) for update := range listenChannel { - if err := m.merger.Merge(source, update); err != nil { - klog.InfoS("failed merging update", "err", err) + if err := m.merger.Merge(ctx, source, update); err != nil { + logger.Info("failed merging update", "err", err) } } } diff --git a/pkg/kubelet/config/mux_test.go b/pkg/kubelet/config/mux_test.go index b24068505cf..6033023c871 100644 --- a/pkg/kubelet/config/mux_test.go +++ b/pkg/kubelet/config/mux_test.go @@ -20,11 +20,14 @@ import ( "context" "reflect" "testing" + + "k8s.io/kubernetes/test/utils/ktesting" ) func TestConfigurationChannels(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := ktesting.Init(t) + ctx = ktesting.WithCancel(ctx) + defer ctx.Cancel("TestConfigurationChannels completed") mux := newMux(nil) channelOne := mux.ChannelWithContext(ctx, "one") @@ -43,7 +46,7 @@ type MergeMock struct { t *testing.T } -func (m MergeMock) Merge(source string, update interface{}) error { +func (m MergeMock) Merge(ctx context.Context, source string, update interface{}) error { if m.source != source { m.t.Errorf("Expected %s, Got %s", m.source, source) } @@ -54,8 +57,9 @@ func (m MergeMock) Merge(source string, update interface{}) error { } func TestMergeInvoked(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := ktesting.Init(t) + ctx = ktesting.WithCancel(ctx) + defer ctx.Cancel("TestMergeInvoked completed") merger := MergeMock{"one", "test", t} mux := newMux(&merger) @@ -63,18 +67,19 @@ func TestMergeInvoked(t *testing.T) { } // mergeFunc implements the Merger interface -type mergeFunc func(source string, update interface{}) error +type mergeFunc func(ctx context.Context, source string, update interface{}) error -func (f mergeFunc) Merge(source string, update interface{}) error { - return f(source, update) +func (f mergeFunc) Merge(ctx context.Context, source string, update interface{}) error { + return f(ctx, source, update) } func TestSimultaneousMerge(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := ktesting.Init(t) + ctx = ktesting.WithCancel(ctx) + defer ctx.Cancel("TestSimultaneousMerge completed") ch := make(chan bool, 2) - mux := newMux(mergeFunc(func(source string, update interface{}) error { + mux := newMux(mergeFunc(func(ctx context.Context, source string, update interface{}) error { switch source { case "one": if update.(string) != "test" { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 75e92cda8a8..80dbd4b9aa8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -368,22 +368,22 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku // TODO: it needs to be replaced by a proper context in the future ctx := context.TODO() - + logger := klog.FromContext(ctx) // define file config source if kubeCfg.StaticPodPath != "" { klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath) - config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource)) + config.NewSourceFile(logger, kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource)) } // define url config source if kubeCfg.StaticPodURL != "" { klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader) - config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource)) + config.NewSourceURL(logger, kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource)) } if kubeDeps.KubeClient != nil { klog.InfoS("Adding apiserver pod source") - config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource)) + config.NewSourceApiserver(logger, kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource)) } return cfg, nil }