Merge pull request #130581 from zhifei92/migrate-kubelet-config-to-contextual-logging

chore(kubelet):  migrate config to contextual logging
This commit is contained in:
Kubernetes Prow Robot
2025-09-05 05:25:26 -07:00
committed by GitHub
17 changed files with 160 additions and 125 deletions

View File

@@ -223,6 +223,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/config/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*

View File

@@ -237,6 +237,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/config/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*

View File

@@ -69,6 +69,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/config/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*

View File

@@ -34,22 +34,22 @@ import (
const WaitForAPIServerSyncPeriod = 1 * time.Second
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
// The Reflector responsible for watching pods at the apiserver should be run only after
// the node sync with the apiserver has completed.
klog.InfoS("Waiting for node sync before watching apiserver pods")
logger.Info("Waiting for node sync before watching apiserver pods")
go func() {
for {
if nodeHasSynced() {
klog.V(4).InfoS("node sync completed")
logger.V(4).Info("node sync completed")
break
}
time.Sleep(WaitForAPIServerSyncPeriod)
klog.V(4).InfoS("node sync has not completed yet")
logger.V(4).Info("node sync has not completed yet")
}
klog.InfoS("Watching apiserver")
logger.Info("Watching apiserver")
newSourceApiserverFromLW(lw, updates)
}()
}

View File

@@ -57,7 +57,7 @@ func generatePodName(name string, nodeName types.NodeName) string {
return fmt.Sprintf("%s-%s", name, strings.ToLower(string(nodeName)))
}
func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
func applyDefaults(logger klog.Logger, pod *api.Pod, source string, isFile bool, nodeName types.NodeName) error {
if len(pod.UID) == 0 {
hasher := md5.New()
hash.DeepHashObject(hasher, pod)
@@ -70,16 +70,16 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.Node
fmt.Fprintf(hasher, "url:%s", source)
}
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
klog.V(5).InfoS("Generated UID", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source)
logger.V(5).Info("Generated UID", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source)
}
pod.Name = generatePodName(pod.Name, nodeName)
klog.V(5).InfoS("Generated pod name", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source)
logger.V(5).Info("Generated pod name", "pod", klog.KObj(pod), "podUID", pod.UID, "source", source)
if pod.Namespace == "" {
pod.Namespace = metav1.NamespaceDefault
}
klog.V(5).InfoS("Set namespace for pod", "pod", klog.KObj(pod), "source", source)
logger.V(5).Info("Set namespace for pod", "pod", klog.KObj(pod), "source", source)
// Set the Host field to indicate this pod is scheduled on the current node.
pod.Spec.NodeName = string(nodeName)
@@ -104,7 +104,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName types.Node
return nil
}
type defaultFunc func(pod *api.Pod) error
type defaultFunc func(logger klog.Logger, pod *api.Pod) error
// A static pod tried to use a ClusterTrustBundle projected volume source.
var ErrStaticPodTriedToUseClusterTrustBundle = errors.New("static pods may not use ClusterTrustBundle projected volume sources")
@@ -113,7 +113,7 @@ var ErrStaticPodTriedToUseClusterTrustBundle = errors.New("static pods may not u
var ErrStaticPodTriedToUseResourceClaims = errors.New("static pods may not use ResourceClaims")
// tryDecodeSinglePod takes data and tries to extract valid Pod config information from it.
func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v1.Pod, err error) {
func tryDecodeSinglePod(logger klog.Logger, data []byte, defaultFn defaultFunc) (parsed bool, pod *v1.Pod, err error) {
// JSON is valid YAML, so this should work for everything.
json, err := utilyaml.ToJSON(data)
if err != nil {
@@ -135,7 +135,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v
}
// Apply default values and validate the pod.
if err = defaultFn(newPod); err != nil {
if err = defaultFn(logger, newPod); err != nil {
return true, pod, err
}
if errs := validation.ValidatePodCreate(newPod, validation.PodValidationOptions{}); len(errs) > 0 {
@@ -143,7 +143,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v
}
v1Pod := &v1.Pod{}
if err := k8s_api_v1.Convert_core_Pod_To_v1_Pod(newPod, v1Pod, nil); err != nil {
klog.ErrorS(err, "Pod failed to convert to v1", "pod", klog.KObj(newPod))
logger.Error(err, "Pod failed to convert to v1", "pod", klog.KObj(newPod))
return true, nil, err
}
@@ -177,7 +177,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v
return true, v1Pod, nil
}
func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods v1.PodList, err error) {
func tryDecodePodList(logger klog.Logger, data []byte, defaultFn defaultFunc) (parsed bool, pods v1.PodList, err error) {
obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data)
if err != nil {
return false, pods, err
@@ -196,7 +196,7 @@ func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods v1.
if newPod.Name == "" {
return true, pods, fmt.Errorf("invalid pod: name is needed for the pod")
}
if err = defaultFn(newPod); err != nil {
if err = defaultFn(logger, newPod); err != nil {
return true, pods, err
}
if errs := validation.ValidatePodCreate(newPod, validation.PodValidationOptions{}); len(errs) > 0 {

View File

@@ -29,6 +29,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podtest "k8s.io/kubernetes/pkg/api/pod/testing"
"k8s.io/kubernetes/pkg/apis/core"
@@ -37,9 +39,10 @@ import (
"k8s.io/utils/ptr"
)
func noDefault(*core.Pod) error { return nil }
func noDefault(klog.Logger, *core.Pod) error { return nil }
func TestDecodeSinglePod(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
grace := int64(30)
enableServiceLinks := v1.DefaultEnableServiceLinks
pod := &v1.Pod{
@@ -80,7 +83,7 @@ func TestDecodeSinglePod(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
parsed, podOut, err := tryDecodeSinglePod(json, noDefault)
parsed, podOut, err := tryDecodeSinglePod(logger, json, noDefault)
if !parsed {
t.Errorf("expected to have parsed file: (%s)", string(json))
}
@@ -98,7 +101,7 @@ func TestDecodeSinglePod(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
parsed, podOut, err = tryDecodeSinglePod(yaml, noDefault)
parsed, podOut, err = tryDecodeSinglePod(logger, yaml, noDefault)
if !parsed {
t.Errorf("expected to have parsed file: (%s)", string(yaml))
}
@@ -112,6 +115,7 @@ func TestDecodeSinglePod(t *testing.T) {
}
func TestDecodeSinglePodRejectsClusterTrustBundleVolumes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
grace := int64(30)
enableServiceLinks := v1.DefaultEnableServiceLinks
pod := &v1.Pod{
@@ -175,13 +179,14 @@ func TestDecodeSinglePodRejectsClusterTrustBundleVolumes(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
_, _, err = tryDecodeSinglePod(json, noDefault)
_, _, err = tryDecodeSinglePod(logger, json, noDefault)
if !strings.Contains(err.Error(), "may not reference clustertrustbundles") {
t.Errorf("Got error %q, want %q", err, fmt.Errorf("static pods may not reference clustertrustbundles API objects"))
}
}
func TestDecodeSinglePodRejectsResourceClaims(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
grace := int64(30)
enableServiceLinks := v1.DefaultEnableServiceLinks
pod := &v1.Pod{
@@ -231,13 +236,14 @@ func TestDecodeSinglePodRejectsResourceClaims(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
_, _, err = tryDecodeSinglePod(json, noDefault)
_, _, err = tryDecodeSinglePod(logger, json, noDefault)
if !strings.Contains(err.Error(), "may not reference resourceclaims") {
t.Errorf("Got error %q, want %q", err, fmt.Errorf("static pods may not reference resourceclaims API objects"))
}
}
func TestDecodePodList(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
grace := int64(30)
enableServiceLinks := v1.DefaultEnableServiceLinks
pod := &v1.Pod{
@@ -282,7 +288,7 @@ func TestDecodePodList(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
parsed, podListOut, err := tryDecodePodList(json, noDefault)
parsed, podListOut, err := tryDecodePodList(logger, json, noDefault)
if !parsed {
t.Errorf("expected to have parsed file: (%s)", string(json))
}
@@ -301,7 +307,7 @@ func TestDecodePodList(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
parsed, podListOut, err = tryDecodePodList(yaml, noDefault)
parsed, podListOut, err = tryDecodePodList(logger, yaml, noDefault)
if !parsed {
t.Errorf("expected to have parsed file: (%s): %v", string(yaml), err)
continue

View File

@@ -99,9 +99,12 @@ func (c *PodConfig) SeenAllSources(seenSources sets.Set[string]) bool {
if c.pods == nil {
return false
}
// Use klog.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
logger := klog.TODO()
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
klog.V(5).InfoS("Looking for sources, have seen", "sources", sets.List(c.sources), "seenSources", seenSources)
logger.V(5).Info("Looking for sources, have seen", "sources", sets.List(c.sources), "seenSources", seenSources)
return seenSources.HasAll(sets.List(c.sources)...) && c.pods.seenSources(sets.List(c.sources)...)
}
@@ -157,12 +160,12 @@ func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificatio
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
func (s *podStorage) Merge(ctx context.Context, source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
adds, updates, deletes, removes, reconciles := s.merge(ctx, source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
@@ -216,9 +219,10 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil
}
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
func (s *podStorage) merge(ctx context.Context, source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
logger := klog.FromContext(ctx)
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
@@ -235,7 +239,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
filtered := filterInvalidPods(logger, newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
@@ -258,7 +262,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
continue
}
recordFirstSeenTime(ref)
recordFirstSeenTime(logger, ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
@@ -268,16 +272,16 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
logger.V(4).Info("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
} else if update.Op == kubetypes.DELETE {
klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
logger.V(4).Info("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
} else {
klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
logger.V(4).Info("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
}
updatePodsFunc(update.Pods, pods, pods)
case kubetypes.REMOVE:
klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
logger.V(4).Info("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
for _, value := range update.Pods {
if existing, found := pods[value.UID]; found {
// this is a delete
@@ -289,7 +293,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
case kubetypes.SET:
klog.V(4).InfoS("Setting pods for source", "source", source)
logger.V(4).Info("Setting pods for source", "source", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
@@ -303,7 +307,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
default:
klog.InfoS("Received invalid update type", "type", update)
logger.Info("Received invalid update type", "type", update)
}
@@ -330,14 +334,14 @@ func (s *podStorage) seenSources(sources ...string) bool {
return s.sourcesSeen.HasAll(sources...)
}
func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) {
func filterInvalidPods(logger klog.Logger, pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) {
names := sets.Set[string]{}
for i, pod := range pods {
// Pods from each source are assumed to have passed validation individually.
// This function only checks if there is any naming conflict.
name := kubecontainer.GetPodFullName(pod)
if names.Has(name) {
klog.InfoS("Pod failed validation due to duplicate pod name, ignoring", "index", i, "pod", klog.KObj(pod), "source", source)
logger.Info("Pod failed validation due to duplicate pod name, ignoring", "index", i, "pod", klog.KObj(pod), "source", source)
recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s due to duplicate pod name %q, ignoring", format.Pod(pod), source, pod.Name)
continue
} else {
@@ -393,8 +397,8 @@ func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
}
// recordFirstSeenTime records the first seen time of this pod.
func recordFirstSeenTime(pod *v1.Pod) {
klog.V(4).InfoS("Receiving a new pod", "pod", klog.KObj(pod))
func recordFirstSeenTime(logger klog.Logger, pod *v1.Pod) {
logger.V(4).Info("Receiving a new pod", "pod", klog.KObj(pod))
pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = kubetypes.NewTimestamp().GetString()
}

View File

@@ -60,13 +60,13 @@ type sourceFile struct {
}
// NewSourceFile watches a config file for changes.
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
// "github.com/sigma/go-inotify" requires a path without trailing "/"
path = strings.TrimRight(path, string(os.PathSeparator))
config := newSourceFile(path, nodeName, period, updates)
klog.V(1).InfoS("Watching path", "path", path)
config.run()
logger.V(1).Info("Watching path", "path", path)
config.run(logger)
}
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
@@ -89,36 +89,36 @@ func newSourceFile(path string, nodeName types.NodeName, period time.Duration, u
}
}
func (s *sourceFile) run() {
func (s *sourceFile) run(logger klog.Logger) {
listTicker := time.NewTicker(s.period)
go func() {
// Read path immediately to speed up startup.
if err := s.listConfig(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
if err := s.listConfig(logger); err != nil {
logger.Error(err, "Unable to read config path", "path", s.path)
}
for {
select {
case <-listTicker.C:
if err := s.listConfig(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
if err := s.listConfig(logger); err != nil {
logger.Error(err, "Unable to read config path", "path", s.path)
}
case e := <-s.watchEvents:
if err := s.consumeWatchEvent(e); err != nil {
klog.ErrorS(err, "Unable to process watch event")
if err := s.consumeWatchEvent(logger, e); err != nil {
logger.Error(err, "Unable to process watch event")
}
}
}
}()
s.startWatch()
s.startWatch(logger)
}
func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
return applyDefaults(pod, source, true, s.nodeName)
func (s *sourceFile) applyDefaults(logger klog.Logger, pod *api.Pod, source string) error {
return applyDefaults(logger, pod, source, true, s.nodeName)
}
func (s *sourceFile) listConfig() error {
func (s *sourceFile) listConfig(logger klog.Logger) error {
path := s.path
statInfo, err := os.Stat(path)
if err != nil {
@@ -132,7 +132,7 @@ func (s *sourceFile) listConfig() error {
switch {
case statInfo.Mode().IsDir():
pods, err := s.extractFromDir(path)
pods, err := s.extractFromDir(logger, path)
if err != nil {
return err
}
@@ -144,7 +144,7 @@ func (s *sourceFile) listConfig() error {
return s.replaceStore(pods...)
case statInfo.Mode().IsRegular():
pod, err := s.extractFromFile(path)
pod, err := s.extractFromFile(logger, path)
if err != nil {
return err
}
@@ -158,7 +158,7 @@ func (s *sourceFile) listConfig() error {
// Get as many pod manifests as we can from a directory. Return an error if and only if something
// prevented us from reading anything at all. Do not return an error if only some files
// were problematic.
func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
func (s *sourceFile) extractFromDir(logger klog.Logger, name string) ([]*v1.Pod, error) {
dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return nil, fmt.Errorf("glob failed: %v", err)
@@ -173,32 +173,32 @@ func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
for _, path := range dirents {
statInfo, err := os.Stat(path)
if err != nil {
klog.ErrorS(err, "Could not get metadata", "path", path)
logger.Error(err, "Could not get metadata", "path", path)
continue
}
switch {
case statInfo.Mode().IsDir():
klog.ErrorS(nil, "Provided manifest path is a directory, not recursing into manifest path", "path", path)
logger.Error(nil, "Provided manifest path is a directory, not recursing into manifest path", "path", path)
case statInfo.Mode().IsRegular():
pod, err := s.extractFromFile(path)
pod, err := s.extractFromFile(logger, path)
if err != nil {
if !os.IsNotExist(err) {
klog.ErrorS(err, "Could not process manifest file", "path", path)
logger.Error(err, "Could not process manifest file", "path", path)
}
} else {
pods = append(pods, pod)
}
default:
klog.ErrorS(nil, "Manifest path is not a directory or file", "path", path, "mode", statInfo.Mode())
logger.Error(nil, "Manifest path is not a directory or file", "path", path, "mode", statInfo.Mode())
}
}
return pods, nil
}
// extractFromFile parses a file for Pod configuration information.
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
klog.V(3).InfoS("Reading config file", "path", filename)
func (s *sourceFile) extractFromFile(logger klog.Logger, filename string) (pod *v1.Pod, err error) {
logger.V(3).Info("Reading config file", "path", filename)
defer func() {
if err == nil && pod != nil {
objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
@@ -221,11 +221,11 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
return pod, err
}
defaultFn := func(pod *api.Pod) error {
return s.applyDefaults(pod, filename)
defaultFn := func(logger klog.Logger, pod *api.Pod) error {
return s.applyDefaults(logger, pod, filename)
}
parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
parsed, pod, podErr := tryDecodeSinglePod(logger, data, defaultFn)
if parsed {
if podErr != nil {
return pod, podErr

View File

@@ -48,7 +48,7 @@ func (e *retryableError) Error() string {
return e.message
}
func (s *sourceFile) startWatch() {
func (s *sourceFile) startWatch(logger klog.Logger) {
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
backOffID := "watch"
@@ -57,8 +57,8 @@ func (s *sourceFile) startWatch() {
return
}
if err := s.doWatch(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
if err := s.doWatch(logger); err != nil {
logger.Error(err, "Unable to read config path", "path", s.path)
if _, retryable := err.(*retryableError); !retryable {
backOff.Next(backOffID, time.Now())
}
@@ -66,7 +66,7 @@ func (s *sourceFile) startWatch() {
}, retryPeriod)
}
func (s *sourceFile) doWatch() error {
func (s *sourceFile) doWatch(logger klog.Logger) error {
_, err := os.Stat(s.path)
if err != nil {
if !os.IsNotExist(err) {
@@ -91,7 +91,7 @@ func (s *sourceFile) doWatch() error {
for {
select {
case event := <-w.Events:
if err = s.produceWatchEvent(&event); err != nil {
if err = s.produceWatchEvent(logger, &event); err != nil {
return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
}
case err = <-w.Errors:
@@ -100,10 +100,10 @@ func (s *sourceFile) doWatch() error {
}
}
func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
func (s *sourceFile) produceWatchEvent(logger klog.Logger, e *fsnotify.Event) error {
// Ignore file start with dots
if strings.HasPrefix(filepath.Base(e.Name), ".") {
klog.V(4).InfoS("Ignored pod manifest, because it starts with dots", "eventName", e.Name)
logger.V(4).Info("Ignored pod manifest, because it starts with dots", "eventName", e.Name)
return nil
}
var eventType podEventType
@@ -127,10 +127,10 @@ func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
return nil
}
func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
func (s *sourceFile) consumeWatchEvent(logger klog.Logger, e *watchEvent) error {
switch e.eventType {
case podAdd, podModify:
pod, err := s.extractFromFile(e.fileName)
pod, err := s.extractFromFile(logger, e.fileName)
if err != nil {
return fmt.Errorf("can't process config file %q: %v", e.fileName, err)
}

View File

@@ -40,20 +40,23 @@ import (
"k8s.io/kubernetes/pkg/apis/core/validation"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestExtractFromNonExistentFile(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
err := lw.doWatch()
err := lw.doWatch(logger)
if err == nil {
t.Errorf("Expected error")
}
}
func TestUpdateOnNonExistentFile(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{})
NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
NewSourceFile(logger, "random_non_existent_path", "localhost", time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
@@ -68,6 +71,7 @@ func TestUpdateOnNonExistentFile(t *testing.T) {
}
func TestReadPodsFromFileExistAlready(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
@@ -81,7 +85,7 @@ func TestReadPodsFromFileExistAlready(t *testing.T) {
file := testCase.writeToFile(dirName, "test_pod_manifest", t)
ch := make(chan interface{})
NewSourceFile(file, hostname, time.Millisecond, ch)
NewSourceFile(logger, file, hostname, time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
@@ -228,6 +232,7 @@ func createSymbolicLink(link, target, name string, t *testing.T) string {
}
func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
@@ -253,9 +258,9 @@ func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
ch := make(chan interface{})
if watchDir {
NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
NewSourceFile(logger, dirName, hostname, 100*time.Millisecond, ch)
} else {
NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
NewSourceFile(logger, filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
}
expectEmptyUpdate(t, ch)
@@ -281,6 +286,7 @@ func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
}
func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
@@ -319,9 +325,9 @@ func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *test
}()
if watchDir {
NewSourceFile(dirName, hostname, period, ch)
NewSourceFile(logger, dirName, hostname, period, ch)
} else {
NewSourceFile(file, hostname, period, ch)
NewSourceFile(logger, file, hostname, period, ch)
}
// await fsnotify to be ready

View File

@@ -24,6 +24,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestExtractFromBadDataFile(t *testing.T) {
@@ -33,6 +34,7 @@ func TestExtractFromBadDataFile(t *testing.T) {
}
defer removeAll(dirName, t)
logger, _ := ktesting.NewTestContext(t)
fileName := filepath.Join(dirName, "test_pod_config")
err = os.WriteFile(fileName, []byte{1, 2, 3}, 0555)
if err != nil {
@@ -41,7 +43,7 @@ func TestExtractFromBadDataFile(t *testing.T) {
ch := make(chan interface{}, 1)
lw := newSourceFile(fileName, "localhost", time.Millisecond, ch)
err = lw.listConfig()
err = lw.listConfig(logger)
if err == nil {
t.Fatalf("expected error, got nil")
}
@@ -55,9 +57,10 @@ func TestExtractFromEmptyDir(t *testing.T) {
}
defer removeAll(dirName, t)
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
lw := newSourceFile(dirName, "localhost", time.Millisecond, ch)
err = lw.listConfig()
err = lw.listConfig(logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@@ -25,10 +25,10 @@ import (
"k8s.io/klog/v2"
)
func (s *sourceFile) startWatch() {
klog.ErrorS(nil, "Watching source file is unsupported in this build")
func (s *sourceFile) startWatch(logger klog.Logger) {
logger.Error(nil, "Watching source file is unsupported in this build")
}
func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
func (s *sourceFile) consumeWatchEvent(logger klog.Logger, e *watchEvent) error {
return fmt.Errorf("consuming watch event is unsupported in this build")
}

View File

@@ -43,7 +43,7 @@ type sourceURL struct {
}
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
func NewSourceURL(logger klog.Logger, url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
config := &sourceURL{
url: url,
header: header,
@@ -54,35 +54,35 @@ func NewSourceURL(url string, header http.Header, nodeName types.NodeName, perio
// read the manifest URL passed to kubelet.
client: &http.Client{Timeout: 10 * time.Second},
}
klog.V(1).InfoS("Watching URL", "URL", url)
go wait.Until(config.run, period, wait.NeverStop)
logger.V(1).Info("Watching URL", "URL", url)
go wait.Until(func() { config.run(logger) }, period, wait.NeverStop)
}
func (s *sourceURL) run() {
if err := s.extractFromURL(); err != nil {
func (s *sourceURL) run(logger klog.Logger) {
if err := s.extractFromURL(logger); err != nil {
// Don't log this multiple times per minute. The first few entries should be
// enough to get the point across.
if s.failureLogs < 3 {
klog.InfoS("Failed to read pods from URL", "err", err)
logger.Info("Failed to read pods from URL", "err", err)
} else if s.failureLogs == 3 {
klog.InfoS("Failed to read pods from URL. Dropping verbosity of this message to V(4)", "err", err)
logger.Info("Failed to read pods from URL. Dropping verbosity of this message to V(4)", "err", err)
} else {
klog.V(4).InfoS("Failed to read pods from URL", "err", err)
logger.V(4).Info("Failed to read pods from URL", "err", err)
}
s.failureLogs++
} else {
if s.failureLogs > 0 {
klog.InfoS("Successfully read pods from URL")
logger.Info("Successfully read pods from URL")
s.failureLogs = 0
}
}
}
func (s *sourceURL) applyDefaults(pod *api.Pod) error {
return applyDefaults(pod, s.url, false, s.nodeName)
func (s *sourceURL) applyDefaults(logger klog.Logger, pod *api.Pod) error {
return applyDefaults(logger, pod, s.url, false, s.nodeName)
}
func (s *sourceURL) extractFromURL() error {
func (s *sourceURL) extractFromURL(logger klog.Logger) error {
req, err := http.NewRequest("GET", s.url, nil)
if err != nil {
return err
@@ -112,7 +112,7 @@ func (s *sourceURL) extractFromURL() error {
s.data = data
// First try as it is a single pod.
parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
parsed, pod, singlePodErr := tryDecodeSinglePod(logger, data, s.applyDefaults)
if parsed {
if singlePodErr != nil {
// It parsed but could not be used.
@@ -123,7 +123,7 @@ func (s *sourceURL) extractFromURL() error {
}
// That didn't work, so try a list of pods.
parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
parsed, podList, multiPodErr := tryDecodePodList(logger, data, s.applyDefaults)
if parsed {
if multiPodErr != nil {
// It parsed but could not be used.

View File

@@ -34,11 +34,13 @@ import (
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/apis/core/validation"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestURLErrorNotExistNoUpdate(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{})
NewSourceURL("http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch)
NewSourceURL(logger, "http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch)
select {
case got := <-ch:
t.Errorf("Expected no update, Got %#v", got)
@@ -47,15 +49,17 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) {
}
func TestExtractFromHttpBadness(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
c := sourceURL{"http://localhost:49575/_not_found_", http.Header{}, "other", ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(); err == nil {
if err := c.extractFromURL(logger); err == nil {
t.Errorf("Expected error")
}
expectEmptyChannel(t, ch)
}
func TestExtractInvalidPods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
var testCases = []struct {
desc string
pod *v1.Pod
@@ -118,13 +122,14 @@ func TestExtractInvalidPods(t *testing.T) {
defer testServer.Close()
ch := make(chan interface{}, 1)
c := sourceURL{testServer.URL, http.Header{}, "localhost", ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(); err == nil {
if err := c.extractFromURL(logger); err == nil {
t.Errorf("%s: Expected error", testCase.desc)
}
}
}
func TestExtractPodsFromHTTP(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
nodeName := "different-value"
grace := int64(30)
@@ -301,7 +306,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
defer testServer.Close()
ch := make(chan interface{}, 1)
c := sourceURL{testServer.URL, http.Header{}, types.NodeName(nodeName), ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(); err != nil {
if err := c.extractFromURL(logger); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue
}
@@ -349,11 +354,13 @@ func TestURLWithHeader(t *testing.T) {
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
header := make(http.Header)
header.Set("Metadata-Flavor", "Google")
c := sourceURL{testServer.URL, header, "localhost", ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(); err != nil {
if err := c.extractFromURL(logger); err != nil {
t.Fatalf("Unexpected error extracting from URL: %v", err)
}
update := (<-ch).(kubetypes.PodUpdate)

View File

@@ -28,7 +28,7 @@ type merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(source string, update interface{}) error
Merge(ctx context.Context, source string, update interface{}) error
}
// mux is a class for merging configuration from multiple sources. Changes are
@@ -70,14 +70,15 @@ func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interf
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
go wait.Until(func() { m.listen(ctx, source, newChannel) }, 0, ctx.Done())
return newChannel
}
func (m *mux) listen(source string, listenChannel <-chan interface{}) {
func (m *mux) listen(ctx context.Context, source string, listenChannel <-chan interface{}) {
logger := klog.FromContext(ctx)
for update := range listenChannel {
if err := m.merger.Merge(source, update); err != nil {
klog.InfoS("failed merging update", "err", err)
if err := m.merger.Merge(ctx, source, update); err != nil {
logger.Info("failed merging update", "err", err)
}
}
}

View File

@@ -20,11 +20,14 @@ import (
"context"
"reflect"
"testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestConfigurationChannels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := ktesting.Init(t)
ctx = ktesting.WithCancel(ctx)
defer ctx.Cancel("TestConfigurationChannels completed")
mux := newMux(nil)
channelOne := mux.ChannelWithContext(ctx, "one")
@@ -43,7 +46,7 @@ type MergeMock struct {
t *testing.T
}
func (m MergeMock) Merge(source string, update interface{}) error {
func (m MergeMock) Merge(ctx context.Context, source string, update interface{}) error {
if m.source != source {
m.t.Errorf("Expected %s, Got %s", m.source, source)
}
@@ -54,8 +57,9 @@ func (m MergeMock) Merge(source string, update interface{}) error {
}
func TestMergeInvoked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := ktesting.Init(t)
ctx = ktesting.WithCancel(ctx)
defer ctx.Cancel("TestMergeInvoked completed")
merger := MergeMock{"one", "test", t}
mux := newMux(&merger)
@@ -63,18 +67,19 @@ func TestMergeInvoked(t *testing.T) {
}
// mergeFunc implements the Merger interface
type mergeFunc func(source string, update interface{}) error
type mergeFunc func(ctx context.Context, source string, update interface{}) error
func (f mergeFunc) Merge(source string, update interface{}) error {
return f(source, update)
func (f mergeFunc) Merge(ctx context.Context, source string, update interface{}) error {
return f(ctx, source, update)
}
func TestSimultaneousMerge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := ktesting.Init(t)
ctx = ktesting.WithCancel(ctx)
defer ctx.Cancel("TestSimultaneousMerge completed")
ch := make(chan bool, 2)
mux := newMux(mergeFunc(func(source string, update interface{}) error {
mux := newMux(mergeFunc(func(ctx context.Context, source string, update interface{}) error {
switch source {
case "one":
if update.(string) != "test" {

View File

@@ -370,22 +370,22 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
logger := klog.FromContext(ctx)
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
config.NewSourceFile(logger, kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
config.NewSourceURL(logger, kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
config.NewSourceApiserver(logger, kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
return cfg, nil
}