mirror of
https://github.com/kubernetes/client-go.git
synced 2026-02-21 23:53:16 +00:00
client-go cache: allow passing name+logger to DeltaFIFO, RealFIFO and Reflector
This improves logging and enables more informative waiting for cache sync in a following commit. It addresses one klog.TODO in the Reflector. The RealFIFOOptions and InformerOptions structs get extended the same way as DeltaFIFOOptions before: a logger may be set, but it's not required. This is not an API break. That the name has to be passed separately is a bit annoying at first glance because it could also be set directly on the logger through WithName, but keeping it separate is better: - name can be set without providing a logger - name can be defaulted - less code in the caller when passing through a logger and adding the name only in the field - last but not least, extracting the name is not supported in a portable manner by logr All in-tree references in production code get updated. While at it, logging in the fifos gets updated to follow best practices: if some code encounters an abnormal situation and then continues, it should use utilruntime.HandleErrorWithLogger instead of normal error logging. Existing "logger" fields get moved to the top because that is a more common place for such a read-only field. Kubernetes-commit: 45251e5f654e6c052659d110cd721f9fbe185191
This commit is contained in:
committed by
Kubernetes Publisher
parent
07f99e4437
commit
b37e79caad
14
tools/cache/controller.go
vendored
14
tools/cache/controller.go
vendored
@@ -27,6 +27,7 @@ import (
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientgofeaturegate "k8s.io/client-go/features"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
@@ -395,6 +396,9 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
|
||||
|
||||
// InformerOptions configure a Reflector.
|
||||
type InformerOptions struct {
|
||||
// Logger, if not nil, is used instead of klog.Background() for logging.
|
||||
Logger *klog.Logger
|
||||
|
||||
// ListerWatcher implements List and Watch functions for the source of the resource
|
||||
// the informer will be informing about.
|
||||
ListerWatcher ListerWatcher
|
||||
@@ -784,7 +788,11 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
|
||||
fifo := newQueueFIFO(clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
|
||||
logger := klog.Background()
|
||||
if options.Logger != nil {
|
||||
logger = *options.Logger
|
||||
}
|
||||
fifo := newQueueFIFO(logger, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
@@ -806,9 +814,10 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
|
||||
return New(cfg)
|
||||
}
|
||||
|
||||
func newQueueFIFO(clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue {
|
||||
func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue {
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
options := RealFIFOOptions{
|
||||
Logger: &logger,
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
Transformer: transform,
|
||||
Identifier: identifier,
|
||||
@@ -824,6 +833,7 @@ func newQueueFIFO(clientState Store, transform TransformFunc, identifier Informe
|
||||
return NewRealFIFOWithOptions(options)
|
||||
} else {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
Logger: &logger,
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: transform,
|
||||
|
||||
38
tools/cache/delta_fifo.go
vendored
38
tools/cache/delta_fifo.go
vendored
@@ -22,6 +22,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
@@ -31,6 +32,12 @@ import (
|
||||
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
|
||||
// optional.
|
||||
type DeltaFIFOOptions struct {
|
||||
// If set, log output will go to this logger instead of klog.Background().
|
||||
// The name of the fifo gets added automatically.
|
||||
Logger *klog.Logger
|
||||
|
||||
// Name can be used to override the default "DeltaFIFO" name for the new instance.
|
||||
Name string
|
||||
|
||||
// KeyFunction is used to figure out what key an object should have. (It's
|
||||
// exposed in the returned DeltaFIFO's KeyOf() method, with additional
|
||||
@@ -55,9 +62,6 @@ type DeltaFIFOOptions struct {
|
||||
// If set, will be called for objects before enqueueing them. Please
|
||||
// see the comment on TransformFunc for details.
|
||||
Transformer TransformFunc
|
||||
|
||||
// If set, log output will go to this logger instead of klog.Background().
|
||||
Logger *klog.Logger
|
||||
}
|
||||
|
||||
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
|
||||
@@ -102,6 +106,13 @@ type DeltaFIFOOptions struct {
|
||||
// threads, you could end up with multiple threads processing slightly
|
||||
// different versions of the same object.
|
||||
type DeltaFIFO struct {
|
||||
// logger is a per-instance logger. This gets chosen when constructing
|
||||
// the instance, with klog.Background() as default.
|
||||
logger klog.Logger
|
||||
|
||||
// name is the name of the fifo. It is included in the logger.
|
||||
name string
|
||||
|
||||
// lock/cond protects access to 'items' and 'queue'.
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
@@ -139,10 +150,6 @@ type DeltaFIFO struct {
|
||||
|
||||
// Called with every object if non-nil.
|
||||
transformer TransformFunc
|
||||
|
||||
// logger is a per-instance logger. This gets chosen when constructing
|
||||
// the instance, with klog.Background() as default.
|
||||
logger klog.Logger
|
||||
}
|
||||
|
||||
// TransformFunc allows for transforming an object before it will be processed.
|
||||
@@ -263,6 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
}
|
||||
|
||||
f := &DeltaFIFO{
|
||||
logger: klog.Background(),
|
||||
name: "DeltaFIFO",
|
||||
items: map[string]Deltas{},
|
||||
queue: []string{},
|
||||
keyFunc: opts.KeyFunction,
|
||||
@@ -270,11 +279,14 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
|
||||
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
|
||||
transformer: opts.Transformer,
|
||||
logger: klog.Background(),
|
||||
}
|
||||
if opts.Logger != nil {
|
||||
f.logger = *opts.Logger
|
||||
}
|
||||
if opts.Name != "" {
|
||||
f.name = opts.Name
|
||||
}
|
||||
f.logger = klog.LoggerWithName(f.logger, f.name)
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
}
|
||||
@@ -477,10 +489,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del
|
||||
// when given a non-empty list (as it is here).
|
||||
// If somehow it happens anyway, deal with it but complain.
|
||||
if oldDeltas == nil {
|
||||
f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
|
||||
utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
|
||||
return nil
|
||||
}
|
||||
f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
|
||||
utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
|
||||
f.items[id] = newDeltas
|
||||
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
|
||||
}
|
||||
@@ -530,7 +542,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
||||
item, ok := f.items[id]
|
||||
if !ok {
|
||||
// This should never happen
|
||||
f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
|
||||
utilruntime.HandleErrorWithLogger(f.logger, nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
|
||||
continue
|
||||
}
|
||||
delete(f.items, id)
|
||||
@@ -623,7 +635,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
|
||||
deletedObj, exists, err := f.knownObjects.GetByKey(k)
|
||||
if err != nil {
|
||||
deletedObj = nil
|
||||
f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
|
||||
utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
|
||||
} else if !exists {
|
||||
deletedObj = nil
|
||||
f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k)
|
||||
@@ -666,7 +678,7 @@ func (f *DeltaFIFO) Resync() error {
|
||||
func (f *DeltaFIFO) syncKeyLocked(key string) error {
|
||||
obj, exists, err := f.knownObjects.GetByKey(key)
|
||||
if err != nil {
|
||||
f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
|
||||
utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
|
||||
return nil
|
||||
} else if !exists {
|
||||
f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key)
|
||||
|
||||
16
tools/cache/reflector.go
vendored
16
tools/cache/reflector.go
vendored
@@ -85,6 +85,7 @@ type TransformingStore interface {
|
||||
|
||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||
type Reflector struct {
|
||||
logger klog.Logger
|
||||
// name identifies this reflector. By default, it will be a file:line if possible.
|
||||
name string
|
||||
// The name of the type we expect to place in the store. The name
|
||||
@@ -227,6 +228,10 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
||||
|
||||
// ReflectorOptions configures a Reflector.
|
||||
type ReflectorOptions struct {
|
||||
// Logger, if not nil, is used instead of klog.Background() for logging.
|
||||
// The name of the reflector gets added automatically.
|
||||
Logger *klog.Logger
|
||||
|
||||
// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
|
||||
// in the call stack that is outside this package.
|
||||
Name string
|
||||
@@ -289,6 +294,13 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R
|
||||
r.name = naming.GetNameFromCallsite(internalPackages...)
|
||||
}
|
||||
|
||||
logger := klog.Background()
|
||||
if options.Logger != nil {
|
||||
logger = *options.Logger
|
||||
}
|
||||
logger = klog.LoggerWithName(logger, r.name)
|
||||
r.logger = logger
|
||||
|
||||
if r.typeDescription == "" {
|
||||
r.typeDescription = getTypeDescriptionFromObject(expectedType)
|
||||
}
|
||||
@@ -299,9 +311,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R
|
||||
|
||||
r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient)
|
||||
if r.useWatchList && watchlist.DoesClientNotSupportWatchListSemantics(lw) {
|
||||
// Using klog.TODO() here because switching to a caller-provided contextual logger
|
||||
// would require an API change and updating all existing call sites.
|
||||
klog.TODO().V(2).Info(
|
||||
r.logger.V(2).Info(
|
||||
"The client used to build this informer/reflector doesn't support WatchList semantics. The feature will be disabled. This is expected in unit tests but not in production. For details, see the documentation of watchlist.DoesClientNotSupportWatchListSemantics().",
|
||||
"feature", clientfeatures.WatchListClient,
|
||||
)
|
||||
|
||||
@@ -68,12 +68,12 @@ func TestReflectorDataConsistencyDetector(t *testing.T) {
|
||||
}
|
||||
|
||||
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
fifo := newQueueFIFO(store, transformer, InformerNameAndResource{}, nil)
|
||||
fifo := newQueueFIFO(logger, store, transformer, InformerNameAndResource{}, nil)
|
||||
|
||||
lw := &ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
|
||||
2
tools/cache/shared_informer.go
vendored
2
tools/cache/shared_informer.go
vendored
@@ -563,7 +563,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
fifo := newQueueFIFO(s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
|
||||
fifo := newQueueFIFO(logger, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
|
||||
33
tools/cache/the_real_fifo.go
vendored
33
tools/cache/the_real_fifo.go
vendored
@@ -23,11 +23,19 @@ import (
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog/v2"
|
||||
utiltrace "k8s.io/utils/trace"
|
||||
)
|
||||
|
||||
// RealFIFOOptions is the configuration parameters for RealFIFO.
|
||||
type RealFIFOOptions struct {
|
||||
// If set, log output will go to this logger instead of klog.Background().
|
||||
// The name of the fifo gets added automatically.
|
||||
Logger *klog.Logger
|
||||
|
||||
// Name can be used to override the default "RealFIFO" name for the new instance.
|
||||
Name string
|
||||
|
||||
// KeyFunction is used to figure out what key an object should have. (It's
|
||||
// exposed in the returned RealFIFO's keyOf() method, with additional
|
||||
// handling around deleted objects and queue state).
|
||||
@@ -78,6 +86,13 @@ var _ QueueWithBatch = &RealFIFO{}
|
||||
// 1. delivers notifications for items that have been deleted
|
||||
// 2. delivers multiple notifications per item instead of simply the most recent value
|
||||
type RealFIFO struct {
|
||||
// logger is a per-instance logger. This gets chosen when constructing
|
||||
// the instance, with klog.Background() as default.
|
||||
logger klog.Logger
|
||||
|
||||
// name is the name of the fifo. It is included in the logger.
|
||||
name string
|
||||
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
@@ -623,10 +638,10 @@ func reconcileReplacement(
|
||||
deletedObj, exists, err := knownObjects.GetByKey(knownKey)
|
||||
if err != nil {
|
||||
deletedObj = nil
|
||||
utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err))
|
||||
utilruntime.HandleErrorWithLogger(klog.TODO(), err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
|
||||
} else if !exists {
|
||||
deletedObj = nil
|
||||
utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey))
|
||||
utilruntime.HandleErrorWithLogger(klog.TODO(), nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
|
||||
}
|
||||
retErr := onDelete(DeletedFinalStateUnknown{
|
||||
Key: knownKey,
|
||||
@@ -683,10 +698,10 @@ func (f *RealFIFO) Resync() error {
|
||||
|
||||
knownObj, exists, err := f.knownObjects.GetByKey(knownKey)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err))
|
||||
utilruntime.HandleErrorWithLogger(f.logger, err, "Unable to queue object for sync", "key", knownKey)
|
||||
continue
|
||||
} else if !exists {
|
||||
utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey))
|
||||
utilruntime.HandleErrorWithLogger(f.logger, nil, "Key does not exist in known objects store, unable to queue object for sync", "key", knownKey)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -739,6 +754,8 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
|
||||
}
|
||||
|
||||
f := &RealFIFO{
|
||||
logger: klog.Background(),
|
||||
name: "RealFIFO",
|
||||
items: make([]Delta, 0, 10),
|
||||
keyFunc: opts.KeyFunction,
|
||||
knownObjects: opts.KnownObjects,
|
||||
@@ -749,7 +766,13 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
|
||||
identifier: opts.Identifier,
|
||||
metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider),
|
||||
}
|
||||
|
||||
if opts.Logger != nil {
|
||||
f.logger = *opts.Logger
|
||||
}
|
||||
if opts.Name != "" {
|
||||
f.name = opts.Name
|
||||
}
|
||||
f.logger = klog.LoggerWithName(f.logger, f.name)
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user