Merge pull request #113471 from ncdc/gc-contextual-logging

garbagecollector: use contextual logging
Kubernetes Prow Robot 2023-03-10 04:34:39 -08:00 committed by GitHub
commit cb00077cd3
8 changed files with 253 additions and 123 deletions
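
Background for reviewers: the diff below repeatedly applies the contextual logging pattern from k8s.io/klog/v2 — a component-named logger is attached to the context near the top of the call chain and recovered again further down, instead of calling the global klog.Infof helpers. A minimal, self-contained sketch of that pattern (the runWorker function and the logged values are illustrative, not part of this PR):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

func main() {
	// Attach a component-named logger to the context once, near the top of the call chain.
	ctx := context.Background()
	ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "garbagecollector"))
	runWorker(ctx)
}

// runWorker is a hypothetical downstream function; it recovers the logger from the
// context and emits structured key/value pairs instead of format strings.
func runWorker(ctx context.Context) {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("processing item", "item", "example")
}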

View File

@@ -506,6 +506,8 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
 return nil, false, nil
 }
+ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "garbagecollector"))
 gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
 discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
@@ -540,7 +542,7 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
 // Periodically refresh the RESTMapper with new discovery information and sync
 // the garbage collector.
-go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Done())
+go garbageCollector.Sync(ctx, discoveryClient, 30*time.Second)
 return garbageCollector, true, nil
 }

View File

@@ -131,11 +131,11 @@ func NewGarbageCollector(
 // resyncMonitors starts or stops resource monitors as needed to ensure that all
 // (and only) those resources present in the map are monitored.
-func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
-if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
+func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResources map[schema.GroupVersionResource]struct{}) error {
+if err := gc.dependencyGraphBuilder.syncMonitors(logger, deletableResources); err != nil {
 return err
 }
-gc.dependencyGraphBuilder.startMonitors()
+gc.dependencyGraphBuilder.startMonitors(logger)
 return nil
 }
@@ -151,21 +151,25 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 gc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: gc.kubeClient.CoreV1().Events("")})
 defer gc.eventBroadcaster.Shutdown()
-klog.Infof("Starting garbage collector controller")
-defer klog.Infof("Shutting down garbage collector controller")
+logger := klog.FromContext(ctx)
+logger.Info("Starting controller", "controller", "garbagecollector")
+defer logger.Info("Shutting down controller", "controller", "garbagecollector")
-go gc.dependencyGraphBuilder.Run(ctx.Done())
+graphLogger := klog.LoggerWithName(logger, "graphbuilder")
+go gc.dependencyGraphBuilder.Run(klog.NewContext(ctx, graphLogger))
-if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), gc.dependencyGraphBuilder.IsSynced) {
+if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
+return gc.dependencyGraphBuilder.IsSynced(logger)
+}) {
 return
 }
-klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")
+logger.Info("All resource monitors have synced. Proceeding to collect garbage")
 // gc workers
 for i := 0; i < workers; i++ {
 go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
-go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, ctx.Done())
+go wait.Until(func() { gc.runAttemptToOrphanWorker(logger) }, 1*time.Second, ctx.Done())
 }
 <-ctx.Done()
@@ -178,22 +182,24 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
 // the mapper's underlying discovery client will be unnecessarily reset during
 // the course of detecting new resources.
-func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
+func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.ServerResourcesInterface, period time.Duration) {
 oldResources := make(map[schema.GroupVersionResource]struct{})
-wait.Until(func() {
+wait.UntilWithContext(ctx, func(ctx context.Context) {
+logger := klog.FromContext(ctx)
 // Get the current resource list from discovery.
 newResources := GetDeletableResources(discoveryClient)
 // This can occur if there is an internal error in GetDeletableResources.
 if len(newResources) == 0 {
-klog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
+logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
 metrics.GarbageCollectorResourcesSyncError.Inc()
 return
 }
 // Decide whether discovery has reported a change.
 if reflect.DeepEqual(oldResources, newResources) {
-klog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
+logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
 return
 }
@@ -204,26 +210,30 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 // Once we get here, we should not unpause workers until we've successfully synced
 attempt := 0
-wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
+wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
 attempt++
 // On a reattempt, check if available resources have changed
 if attempt > 1 {
 newResources = GetDeletableResources(discoveryClient)
 if len(newResources) == 0 {
-klog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
+logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
 metrics.GarbageCollectorResourcesSyncError.Inc()
 return false, nil
 }
 }
-klog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
+logger.V(2).Info(
+"syncing garbage collector with updated resources from discovery",
+"attempt", attempt,
+"diff", printDiff(oldResources, newResources),
+)
 // Resetting the REST mapper will also invalidate the underlying discovery
 // client. This is a leaky abstraction and assumes behavior about the REST
 // mapper, but we'll deal with it for now.
 gc.restMapper.Reset()
-klog.V(4).Infof("reset restmapper")
+logger.V(4).Info("reset restmapper")
 // Perform the monitor resync and wait for controllers to report cache sync.
 //
@@ -234,19 +244,21 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 // discovery call if the resources appeared in-between the calls. In that
 // case, the restMapper will fail to map some of newResources until the next
 // attempt.
-if err := gc.resyncMonitors(newResources); err != nil {
+if err := gc.resyncMonitors(logger, newResources); err != nil {
 utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
 metrics.GarbageCollectorResourcesSyncError.Inc()
 return false, nil
 }
-klog.V(4).Infof("resynced monitors")
+logger.V(4).Info("resynced monitors")
 // wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
 // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
 // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
 // the call to resyncMonitors on the reattempt will no-op for resources that still exist.
 // note that workers stay paused until we successfully resync.
-if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
+if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
+return gc.dependencyGraphBuilder.IsSynced(logger)
+}) {
 utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
 metrics.GarbageCollectorResourcesSyncError.Inc()
 return false, nil
@@ -254,14 +266,14 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 // success, break out of the loop
 return true, nil
-}, stopCh)
+})
 // Finally, keep track of our new state. Do this after all preceding steps
 // have succeeded to ensure we'll retry on subsequent syncs if an error
 // occurred.
 oldResources = newResources
-klog.V(2).Infof("synced garbage collector")
-}, period, stopCh)
+logger.V(2).Info("synced garbage collector")
+}, period)
 }
 // printDiff returns a human-readable summary of what resources were added and removed
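
Aside on the Sync hunk above: it swaps the stop-channel based wait.Until / wait.PollImmediateUntil for their context-aware variants, so one ctx carries both cancellation and the logger. A small self-contained sketch of that combination, under the assumption of a trivially true condition (the period and messages are illustrative only):

package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// The outer loop runs every period until ctx is cancelled, like Sync's resync loop.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		logger := klog.FromContext(ctx)
		// The inner poll retries until the condition succeeds or ctx is cancelled.
		_ = wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
			logger.V(4).Info("checking condition")
			return true, nil // pretend the condition is satisfied on the first attempt
		})
		logger.V(2).Info("sync attempt finished")
	}, 500*time.Millisecond)
}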
@@ -295,8 +307,8 @@ func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan
 }
 // IsSynced returns true if dependencyGraphBuilder is synced.
-func (gc *GarbageCollector) IsSynced() bool {
-return gc.dependencyGraphBuilder.IsSynced()
+func (gc *GarbageCollector) IsSynced(logger klog.Logger) bool {
+return gc.dependencyGraphBuilder.IsSynced(logger)
 }
 func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
@@ -342,18 +354,20 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
 return forgetItem
 }
+logger := klog.FromContext(ctx)
 if !n.isObserved() {
 nodeFromGraph, existsInGraph := gc.dependencyGraphBuilder.uidToNode.Read(n.identity.UID)
 if !existsInGraph {
 // this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
 // and in the meantime a deletion of the real object associated with that uid was observed
-klog.V(5).Infof("item %s no longer in the graph, skipping attemptToDeleteItem", n)
+logger.V(5).Info("item no longer in the graph, skipping attemptToDeleteItem", "item", n.identity)
 return forgetItem
 }
 if nodeFromGraph.isObserved() {
 // this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
 // and in the meantime the real object associated with that uid was observed
-klog.V(5).Infof("item %s no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", n)
+logger.V(5).Info("item no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", "item", n.identity)
 return forgetItem
 }
 }
@@ -374,7 +388,7 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
 // have a way to distinguish this from a valid type we will recognize
 // after the next discovery sync.
 // For now, record the error and retry.
-klog.V(5).Infof("error syncing item %s: %v", n, err)
+logger.V(5).Error(err, "error syncing item", "item", n.identity)
 } else {
 utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
 }
@@ -384,7 +398,7 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
 // requeue if item hasn't been observed via an informer event yet.
 // otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
 // see https://issue.k8s.io/56121
-klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
+logger.V(5).Info("item hasn't been observed via informer yet", "item", n.identity)
 return requeueItem
 }
@@ -397,16 +411,24 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
 func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.OwnerReference, item *node) (
 dangling bool, owner *metav1.PartialObjectMetadata, err error) {
+logger := klog.FromContext(ctx)
 // check for recorded absent cluster-scoped parent
 absentOwnerCacheKey := objectReference{OwnerReference: ownerReferenceCoordinates(reference)}
 if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
-klog.V(5).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
+logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist",
+"item", item.identity,
+"owner", reference,
+)
 return true, nil, nil
 }
 // check for recorded absent namespaced parent
 absentOwnerCacheKey.Namespace = item.identity.Namespace
 if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
-klog.V(5).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist in namespace %s", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name, item.identity.Namespace)
+logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist in namespace",
+"item", item.identity,
+"owner", reference,
+)
 return true, nil, nil
 }
@@ -427,7 +449,10 @@ func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.Own
 if len(item.identity.Namespace) == 0 && namespaced {
 // item is a cluster-scoped object referring to a namespace-scoped owner, which is not valid.
 // return a marker error, rather than retrying on the lookup failure forever.
-klog.V(2).Infof("object %s is cluster-scoped, but refers to a namespaced owner of type %s/%s", item.identity, reference.APIVersion, reference.Kind)
+logger.V(2).Info("item is cluster-scoped, but refers to a namespaced owner",
+"item", item.identity,
+"owner", reference,
+)
 return false, nil, namespacedOwnerOfClusterScopedObjectErr
 }
@@ -438,14 +463,20 @@ func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.Own
 switch {
 case errors.IsNotFound(err):
 gc.absentOwnerCache.Add(absentOwnerCacheKey)
-klog.V(5).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
+logger.V(5).Info("item's owner is not found",
+"item", item.identity,
+"owner", reference,
+)
 return true, nil, nil
 case err != nil:
 return false, nil, err
 }
 if owner.GetUID() != reference.UID {
-klog.V(5).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
+logger.V(5).Info("item's owner is not found, UID mismatch",
+"item", item.identity,
+"owner", reference,
+)
 gc.absentOwnerCache.Add(absentOwnerCacheKey)
 return true, nil, nil
 }
@@ -498,12 +529,18 @@ func ownerRefsToUIDs(refs []metav1.OwnerReference) []types.UID {
 // if the API get request returns a NotFound error, or the retrieved item's uid does not match,
 // a virtual delete event for the node is enqueued and enqueuedVirtualDeleteEventErr is returned.
 func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node) error {
-klog.V(2).InfoS("Processing object", "object", klog.KRef(item.identity.Namespace, item.identity.Name),
-"objectUID", item.identity.UID, "kind", item.identity.Kind, "virtual", !item.isObserved())
+logger := klog.FromContext(ctx)
+logger.V(2).Info("Processing item",
+"item", item.identity,
+"virtual", !item.isObserved(),
+)
 // "being deleted" is an one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents.
 if item.isBeingDeleted() && !item.isDeletingDependents() {
-klog.V(5).Infof("processing item %s returned at once, because its DeletionTimestamp is non-nil", item.identity)
+logger.V(5).Info("processing item returned at once, because its DeletionTimestamp is non-nil",
+"item", item.identity,
+)
 return nil
 }
 // TODO: It's only necessary to talk to the API server if this is a
@@ -515,7 +552,9 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 // the GraphBuilder can add "virtual" node for an owner that doesn't
 // exist yet, so we need to enqueue a virtual Delete event to remove
 // the virtual node from GraphBuilder.uidToNode.
-klog.V(5).Infof("item %v not found, generating a virtual delete event", item.identity)
+logger.V(5).Info("item not found, generating a virtual delete event",
+"item", item.identity,
+)
 gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
 return enqueuedVirtualDeleteEventErr
 case err != nil:
@@ -523,7 +562,9 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 }
 if latest.GetUID() != item.identity.UID {
-klog.V(5).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
+logger.V(5).Info("UID doesn't match, item not found, generating a virtual delete event",
+"item", item.identity,
+)
 gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
 return enqueuedVirtualDeleteEventErr
 }
@@ -531,13 +572,15 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 // TODO: attemptToOrphanWorker() routine is similar. Consider merging
 // attemptToOrphanWorker() into attemptToDeleteItem() as well.
 if item.isDeletingDependents() {
-return gc.processDeletingDependentsItem(item)
+return gc.processDeletingDependentsItem(logger, item)
 }
 // compute if we should delete the item
 ownerReferences := latest.GetOwnerReferences()
 if len(ownerReferences) == 0 {
-klog.V(2).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
+logger.V(2).Info("item doesn't have an owner, continue on next item",
+"item", item.identity,
+)
 return nil
 }
@@ -545,15 +588,27 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 if err != nil {
 return err
 }
-klog.V(5).Infof("classify references of %s.\nsolid: %#v\ndangling: %#v\nwaitingForDependentsDeletion: %#v\n", item.identity, solid, dangling, waitingForDependentsDeletion)
+logger.V(5).Info("classify item's references",
+"item", item.identity,
+"solid", solid,
+"dangling", dangling,
+"waitingForDependentsDeletion", waitingForDependentsDeletion,
+)
 switch {
 case len(solid) != 0:
-klog.V(2).Infof("object %#v has at least one existing owner: %#v, will not garbage collect", item.identity, solid)
+logger.V(2).Info("item has at least one existing owner, will not garbage collect",
+"item", item.identity,
+"owner", solid,
+)
 if len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 {
 return nil
 }
-klog.V(2).Infof("remove dangling references %#v and waiting references %#v for object %s", dangling, waitingForDependentsDeletion, item.identity)
+logger.V(2).Info("remove dangling references and waiting references for item",
+"item", item.identity,
+"dangling", dangling,
+"waitingForDependentsDeletion", waitingForDependentsDeletion,
+)
 // waitingForDependentsDeletion needs to be deleted from the
 // ownerReferences, otherwise the referenced objects will be stuck with
 // the FinalizerDeletingDependents and never get deleted.
@@ -575,7 +630,10 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 // problem.
 // there are multiple workers run attemptToDeleteItem in
 // parallel, the circle detection can fail in a race condition.
-klog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity)
+logger.V(2).Info("processing item, some of its owners and its dependent have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the item is going to be deleted with Foreground",
+"item", item.identity,
+"dependent", dep.identity,
+)
 patch, err := item.unblockOwnerReferencesStrategicMergePatch()
 if err != nil {
 return err
@@ -586,7 +644,9 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 break
 }
 }
-klog.V(2).Infof("at least one owner of object %s has FinalizerDeletingDependents, and the object itself has dependents, so it is going to be deleted in Foreground", item.identity)
+logger.V(2).Info("at least one owner of item has FinalizerDeletingDependents, and the item itself has dependents, so it is going to be deleted in Foreground",
+"item", item.identity,
+)
 // the deletion event will be observed by the graphBuilder, so the item
 // will be processed again in processDeletingDependentsItem. If it
 // doesn't have dependents, the function will remove the
@@ -610,22 +670,27 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
 // otherwise, default to background.
 policy = metav1.DeletePropagationBackground
 }
-klog.V(2).InfoS("Deleting object", "object", klog.KRef(item.identity.Namespace, item.identity.Name),
-"objectUID", item.identity.UID, "kind", item.identity.Kind, "propagationPolicy", policy)
+logger.V(2).Info("Deleting item",
+"item", item.identity,
+"propagationPolicy", policy,
+)
 return gc.deleteObject(item.identity, &policy)
 }
 }
 // process item that's waiting for its dependents to be deleted
-func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error {
+func (gc *GarbageCollector) processDeletingDependentsItem(logger klog.Logger, item *node) error {
 blockingDependents := item.blockingDependents()
 if len(blockingDependents) == 0 {
-klog.V(2).Infof("remove DeleteDependents finalizer for item %s", item.identity)
-return gc.removeFinalizer(item, metav1.FinalizerDeleteDependents)
+logger.V(2).Info("remove DeleteDependents finalizer for item", "item", item.identity)
+return gc.removeFinalizer(logger, item, metav1.FinalizerDeleteDependents)
 }
 for _, dep := range blockingDependents {
 if !dep.isDeletingDependents() {
-klog.V(2).Infof("adding %s to attemptToDelete, because its owner %s is deletingDependents", dep.identity, item.identity)
+logger.V(2).Info("adding dependent to attemptToDelete, because its owner is deletingDependents",
+"item", item.identity,
+"dependent", dep.identity,
+)
 gc.attemptToDelete.Add(dep)
 }
 }
@@ -633,7 +698,7 @@ func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error {
 }
 // dependents are copies of pointers to the owner's dependents, they don't need to be locked.
-func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error {
+func (gc *GarbageCollector) orphanDependents(logger klog.Logger, owner objectReference, dependents []*node) error {
 errCh := make(chan error, len(dependents))
 wg := sync.WaitGroup{}
 wg.Add(len(dependents))
@@ -667,12 +732,12 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
 if len(errorsSlice) != 0 {
 return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
 }
-klog.V(5).Infof("successfully updated all dependents of owner %s", owner)
+logger.V(5).Info("successfully updated all dependents", "owner", owner)
 return nil
 }
-func (gc *GarbageCollector) runAttemptToOrphanWorker() {
-for gc.processAttemptToOrphanWorker() {
+func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
+for gc.processAttemptToOrphanWorker(logger) {
 }
 }
@@ -681,7 +746,7 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker() {
 // OwnerReferences of its dependents, and finally updates the owner to remove
 // the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of
 // these steps fail.
-func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
+func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
 item, quit := gc.attemptToOrphan.Get()
 gc.workerLock.RLock()
 defer gc.workerLock.RUnlock()
@@ -690,7 +755,7 @@ func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
 }
 defer gc.attemptToOrphan.Done(item)
-action := gc.attemptToOrphanWorker(item)
+action := gc.attemptToOrphanWorker(logger, item)
 switch action {
 case forgetItem:
 gc.attemptToOrphan.Forget(item)
@@ -701,7 +766,7 @@ func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
 return true
 }
-func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueItemAction {
+func (gc *GarbageCollector) attemptToOrphanWorker(logger klog.Logger, item interface{}) workQueueItemAction {
 owner, ok := item.(*node)
 if !ok {
 utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
@@ -715,13 +780,13 @@ func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueIte
 }
 owner.dependentsLock.RUnlock()
-err := gc.orphanDependents(owner.identity, dependents)
+err := gc.orphanDependents(logger, owner.identity, dependents)
 if err != nil {
 utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
 return requeueItem
 }
 // update the owner, remove "orphaningFinalizer" from its finalizers list
-err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
+err = gc.removeFinalizer(logger, owner, metav1.FinalizerOrphanDependents)
 if err != nil {
 utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
 return requeueItem
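
One pattern worth noting in this file: workers that now take a klog.Logger explicitly are adapted to the channel-based wait.Until with a small closure (see runAttemptToOrphanWorker above). A hedged sketch of the same adaptation, with a made-up orphanWorker standing in for the real worker:

package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
)

// orphanWorker stands in for runAttemptToOrphanWorker: it needs only a logger, not a context.
func orphanWorker(logger klog.Logger) {
	logger.V(2).Info("attempting to orphan dependents")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	logger := klog.FromContext(ctx)

	// Wrap the logger-taking worker in a closure so it fits wait.Until's func() signature.
	go wait.Until(func() { orphanWorker(logger) }, 100*time.Millisecond, ctx.Done())
	<-ctx.Done()
}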

View File

@@ -28,6 +28,8 @@ import (
 "time"
 "golang.org/x/time/rate"
+"k8s.io/klog/v2"
+"k8s.io/klog/v2/ktesting"
 "github.com/golang/groupcache/lru"
 "github.com/google/go-cmp/cmp"
@@ -102,15 +104,17 @@ func TestGarbageCollectorConstruction(t *testing.T) {
 }
 assert.Equal(t, 0, len(gc.dependencyGraphBuilder.monitors))
+logger, _ := ktesting.NewTestContext(t)
 // Make sure resource monitor syncing creates and stops resource monitors.
 tweakableRM.Add(schema.GroupVersionKind{Group: "tpr.io", Version: "v1", Kind: "unknown"}, nil)
-err = gc.resyncMonitors(twoResources)
+err = gc.resyncMonitors(logger, twoResources)
 if err != nil {
 t.Errorf("Failed adding a monitor: %v", err)
 }
 assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
-err = gc.resyncMonitors(podResource)
+err = gc.resyncMonitors(logger, podResource)
 if err != nil {
 t.Errorf("Failed removing a monitor: %v", err)
 }
@@ -121,13 +125,13 @@ func TestGarbageCollectorConstruction(t *testing.T) {
 defer cancel()
 go gc.Run(ctx, 1)
-err = gc.resyncMonitors(twoResources)
+err = gc.resyncMonitors(logger, twoResources)
 if err != nil {
 t.Errorf("Failed adding a monitor: %v", err)
 }
 assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
-err = gc.resyncMonitors(podResource)
+err = gc.resyncMonitors(logger, podResource)
 if err != nil {
 t.Errorf("Failed removing a monitor: %v", err)
 }
@@ -408,6 +412,8 @@ func TestProcessEvent(t *testing.T) {
 alwaysStarted := make(chan struct{})
 close(alwaysStarted)
 for _, scenario := range testScenarios {
+logger, _ := ktesting.NewTestContext(t)
 dependencyGraphBuilder := &GraphBuilder{
 informersStarted: alwaysStarted,
 graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
@@ -420,7 +426,7 @@ func TestProcessEvent(t *testing.T) {
 }
 for i := 0; i < len(scenario.events); i++ {
 dependencyGraphBuilder.graphChanges.Add(&scenario.events[i])
-dependencyGraphBuilder.processGraphChanges()
+dependencyGraphBuilder.processGraphChanges(logger)
 verifyGraphInvariants(scenario.name, dependencyGraphBuilder.uidToNode.uidToNode, t)
 }
 }
@@ -439,6 +445,8 @@ func BenchmarkReferencesDiffs(t *testing.B) {
 // TestDependentsRace relies on golang's data race detector to check if there is
 // data race among in the dependents field.
 func TestDependentsRace(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
 gc := setupGC(t, &restclient.Config{})
 defer close(gc.stop)
@@ -452,7 +460,7 @@ func TestDependentsRace(t *testing.T) {
 defer wg.Done()
 for i := 0; i < updates; i++ {
 dependent := &node{}
-gc.dependencyGraphBuilder.addDependentToOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
+gc.dependencyGraphBuilder.addDependentToOwners(logger, dependent, []metav1.OwnerReference{{UID: ownerUID}})
 gc.dependencyGraphBuilder.removeDependentFromOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
 }
 }()
@@ -460,7 +468,7 @@ func TestDependentsRace(t *testing.T) {
 defer wg.Done()
 for i := 0; i < updates; i++ {
 gc.attemptToOrphan.Add(owner)
-gc.processAttemptToOrphanWorker()
+gc.processAttemptToOrphanWorker(logger)
 }
 }()
 wg.Wait()
@@ -672,6 +680,8 @@ func TestUnblockOwnerReference(t *testing.T) {
 }
 func TestOrphanDependentsFailure(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
 testHandler := &fakeActionHandler{
 response: map[string]FakeResponse{
 "PATCH" + "/api/v1/namespaces/ns1/pods/pod": {
@@ -698,7 +708,7 @@ func TestOrphanDependentsFailure(t *testing.T) {
 },
 },
 }
-err := gc.orphanDependents(objectReference{}, dependents)
+err := gc.orphanDependents(logger, objectReference{}, dependents)
 expected := `the server reported a conflict`
 if err == nil || !strings.Contains(err.Error(), expected) {
 if err != nil {
@@ -862,7 +872,8 @@ func TestGarbageCollectorSync(t *testing.T) {
 t.Fatal(err)
 }
-ctx, cancel := context.WithCancel(context.Background())
+_, ctx := ktesting.NewTestContext(t)
+ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 go gc.Run(ctx, 1)
 // The pseudo-code of GarbageCollector.Sync():
@@ -879,7 +890,7 @@ func TestGarbageCollectorSync(t *testing.T) {
 // The 1s sleep in the test allows GetDeletableResources and
 // gc.resyncMonitors to run ~5 times to ensure the changes to the
 // fakeDiscoveryClient are picked up.
-go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, ctx.Done())
+go gc.Sync(ctx, fakeDiscoveryClient, 200*time.Millisecond)
 // Wait until the sync discovers the initial resources
 time.Sleep(1 * time.Second)
@@ -2286,8 +2297,11 @@ func TestConflictingData(t *testing.T) {
 },
 }
+logger, _ := ktesting.NewTestContext(t)
 ctx := stepContext{
 t: t,
+logger: logger,
 gc: gc,
 eventRecorder: eventRecorder,
 metadataClient: metadataClient,
@@ -2396,6 +2410,7 @@ func makeMetadataObj(identity objectReference, owners ...objectReference) *metav
 type stepContext struct {
 t *testing.T
+logger klog.Logger
 gc *GarbageCollector
 eventRecorder *record.FakeRecorder
 metadataClient *fakemetadata.FakeMetadataClient
@@ -2417,7 +2432,7 @@ func processPendingGraphChanges(count int) step {
 if count <= 0 {
 // process all
 for ctx.gc.dependencyGraphBuilder.graphChanges.Len() != 0 {
-ctx.gc.dependencyGraphBuilder.processGraphChanges()
+ctx.gc.dependencyGraphBuilder.processGraphChanges(ctx.logger)
 }
 } else {
 for i := 0; i < count; i++ {
@@ -2425,7 +2440,7 @@ func processPendingGraphChanges(count int) step {
 ctx.t.Errorf("expected at least %d pending changes, got %d", count, i+1)
 return
 }
-ctx.gc.dependencyGraphBuilder.processGraphChanges()
+ctx.gc.dependencyGraphBuilder.processGraphChanges(ctx.logger)
 }
 }
 },
@@ -2488,7 +2503,7 @@ func processEvent(e *event) step {
 ctx.t.Fatalf("events present in graphChanges, must process pending graphChanges before calling processEvent")
 }
 ctx.gc.dependencyGraphBuilder.graphChanges.Add(e)
-ctx.gc.dependencyGraphBuilder.processGraphChanges()
+ctx.gc.dependencyGraphBuilder.processGraphChanges(ctx.logger)
 },
 }
 }
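
For the test changes above, k8s.io/klog/v2/ktesting provides both a per-test logger and a context that carries it. A minimal sketch of how a test might use it (TestExample and the logged values are illustrative, not taken from this PR):

package garbagecollector_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestExample(t *testing.T) {
	// logger routes output through the test, so log lines are attached to this test;
	// ctx carries the same logger for code that calls klog.FromContext(ctx).
	logger, ctx := ktesting.NewTestContext(t)

	logger.V(2).Info("starting test step", "step", 1)
	_ = ctx // pass ctx into the code under test
}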

View File

@@ -20,6 +20,8 @@ import (
 "fmt"
 "sync"
+"github.com/go-logr/logr"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/types"
 )
@@ -30,10 +32,29 @@ type objectReference struct {
 Namespace string
 }
+// String is used when logging an objectReference in text format.
 func (s objectReference) String() string {
 return fmt.Sprintf("[%s/%s, namespace: %s, name: %s, uid: %s]", s.APIVersion, s.Kind, s.Namespace, s.Name, s.UID)
 }
+// MarshalLog is used when logging an objectReference in JSON format.
+func (s objectReference) MarshalLog() interface{} {
+return struct {
+Name string `json:"name"`
+Namespace string `json:"namespace"`
+APIVersion string `json:"apiVersion"`
+UID types.UID `json:"uid"`
+}{
+Namespace: s.Namespace,
+Name: s.Name,
+APIVersion: s.APIVersion,
+UID: s.UID,
+}
+}
+var _ fmt.Stringer = objectReference{}
+var _ logr.Marshaler = objectReference{}
 // The single-threaded GraphBuilder.processGraphChanges() is the sole writer of the
 // nodes. The multi-threaded GarbageCollector.attemptToDeleteItem() reads the nodes.
 // WARNING: node has different locks on different fields. setters and getters
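
The MarshalLog addition above matters for structured log backends: a text-format logger prints the value via String(), while a JSON logger calls MarshalLog and serializes the returned struct. A small sketch of the same idea with a standalone type (myRef and the logged values are illustrative; the real objectReference is shown in the diff):

package main

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	"k8s.io/klog/v2"
)

// myRef is an illustrative stand-in for objectReference.
type myRef struct {
	Name string
	UID  string
}

// String is what text-format loggers print for the value.
func (r myRef) String() string { return fmt.Sprintf("%s (uid %s)", r.Name, r.UID) }

// MarshalLog is what structured (e.g. JSON) loggers serialize instead.
func (r myRef) MarshalLog() interface{} {
	return struct {
		Name string `json:"name"`
		UID  string `json:"uid"`
	}{r.Name, r.UID}
}

var _ fmt.Stringer = myRef{}
var _ logr.Marshaler = myRef{}

func main() {
	logger := klog.FromContext(context.Background())
	logger.Info("deleting item", "item", myRef{Name: "pod-a", UID: "123"})
}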

View File

@ -17,6 +17,7 @@ limitations under the License.
package garbagecollector package garbagecollector
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
@ -133,7 +134,7 @@ func (m *monitor) Run() {
type monitors map[schema.GroupVersionResource]*monitor type monitors map[schema.GroupVersionResource]*monitor
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) { func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{ handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges. // add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
@ -168,12 +169,13 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
gb.graphChanges.Add(event) gb.graphChanges.Add(event)
}, },
} }
shared, err := gb.sharedInformers.ForResource(resource) shared, err := gb.sharedInformers.ForResource(resource)
if err != nil { if err != nil {
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err) logger.V(4).Error(err, "unable to use a shared informer", "resource", resource, "kind", kind)
return nil, nil, err return nil, nil, err
} }
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String()) logger.V(4).Info("using a shared informer", "resource", resource, "kind", kind)
// need to clone because it's from a shared cache // need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime) shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil return shared.Informer().GetController(), shared.Informer().GetStore(), nil
@ -185,7 +187,7 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
// instead of immediately exiting on an error. It may be called before or after // instead of immediately exiting on an error. It may be called before or after
// Run. Monitors are NOT started as part of the sync. To ensure all existing // Run. Monitors are NOT started as part of the sync. To ensure all existing
// monitors are started, call startMonitors. // monitors are started, call startMonitors.
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error { func (gb *GraphBuilder) syncMonitors(logger klog.Logger, resources map[schema.GroupVersionResource]struct{}) error {
gb.monitorLock.Lock() gb.monitorLock.Lock()
defer gb.monitorLock.Unlock() defer gb.monitorLock.Unlock()
@ -212,7 +214,7 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err)) errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
continue continue
} }
c, s, err := gb.controllerFor(resource, kind) c, s, err := gb.controllerFor(logger, resource, kind)
if err != nil { if err != nil {
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err)) errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
continue continue
@ -228,7 +230,7 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
} }
} }
klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove)) logger.V(4).Info("synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
// NewAggregate returns nil if errs is 0-length // NewAggregate returns nil if errs is 0-length
return utilerrors.NewAggregate(errs) return utilerrors.NewAggregate(errs)
} }
@ -238,7 +240,7 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
// //
// If called before Run, startMonitors does nothing (as there is no stop channel // If called before Run, startMonitors does nothing (as there is no stop channel
// to support monitor/informer execution). // to support monitor/informer execution).
func (gb *GraphBuilder) startMonitors() { func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
gb.monitorLock.Lock() gb.monitorLock.Lock()
defer gb.monitorLock.Unlock() defer gb.monitorLock.Unlock()
@ -260,25 +262,25 @@ func (gb *GraphBuilder) startMonitors() {
started++ started++
} }
} }
klog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors)) logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
} }
// IsSynced returns true if any monitors exist AND all those monitors' // IsSynced returns true if any monitors exist AND all those monitors'
// controllers HasSynced functions return true. This means IsSynced could return // controllers HasSynced functions return true. This means IsSynced could return
// true at one time, and then later return false if all monitors were // true at one time, and then later return false if all monitors were
// reconstructed. // reconstructed.
func (gb *GraphBuilder) IsSynced() bool { func (gb *GraphBuilder) IsSynced(logger klog.Logger) bool {
gb.monitorLock.Lock() gb.monitorLock.Lock()
defer gb.monitorLock.Unlock() defer gb.monitorLock.Unlock()
if len(gb.monitors) == 0 { if len(gb.monitors) == 0 {
klog.V(4).Info("garbage controller monitor not synced: no monitors") logger.V(4).Info("garbage controller monitor not synced: no monitors")
return false return false
} }
for resource, monitor := range gb.monitors { for resource, monitor := range gb.monitors {
if !monitor.controller.HasSynced() { if !monitor.controller.HasSynced() {
klog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource) logger.V(4).Info("garbage controller monitor not yet synced", "resource", resource)
return false return false
} }
} }
@ -287,20 +289,21 @@ func (gb *GraphBuilder) IsSynced() bool {
// Run sets the stop channel and starts monitor execution until stopCh is // Run sets the stop channel and starts monitor execution until stopCh is
// closed. Any running monitors will be stopped before Run returns. // closed. Any running monitors will be stopped before Run returns.
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) { func (gb *GraphBuilder) Run(ctx context.Context) {
klog.Infof("GraphBuilder running") logger := klog.FromContext(ctx)
defer klog.Infof("GraphBuilder stopping") logger.Info("Running", "component", "GraphBuilder")
defer logger.Info("Stopping", "component", "GraphBuilder")
// Set up the stop channel. // Set up the stop channel.
gb.monitorLock.Lock() gb.monitorLock.Lock()
gb.stopCh = stopCh gb.stopCh = ctx.Done()
gb.running = true gb.running = true
gb.monitorLock.Unlock() gb.monitorLock.Unlock()
// Start monitors and begin change processing until the stop channel is // Start monitors and begin change processing until the stop channel is
// closed. // closed.
gb.startMonitors() gb.startMonitors(logger)
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh) wait.Until(func() { gb.runProcessGraphChanges(logger) }, 1*time.Second, ctx.Done())
// Stop any running monitors. // Stop any running monitors.
gb.monitorLock.Lock() gb.monitorLock.Lock()
@ -316,7 +319,7 @@ func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
// reset monitors so that the graph builder can be safely re-run/synced. // reset monitors so that the graph builder can be safely re-run/synced.
gb.monitors = nil gb.monitors = nil
klog.Infof("stopped %d of %d monitors", stopped, len(monitors)) logger.Info("stopped monitors", "stopped", stopped, "total", len(monitors))
} }
var ignoredResources = map[schema.GroupResource]struct{}{ var ignoredResources = map[schema.GroupResource]struct{}{
@ -350,7 +353,7 @@ func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
// exist in the gb.uidToNode yet, a "virtual" node will be created to represent // exist in the gb.uidToNode yet, a "virtual" node will be created to represent
// the owner. The "virtual" node will be enqueued to the attemptToDelete, so that // the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
// attemptToDeleteItem() will verify if the owner exists according to the API server. // attemptToDeleteItem() will verify if the owner exists according to the API server.
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) { func (gb *GraphBuilder) addDependentToOwners(logger klog.Logger, n *node, owners []metav1.OwnerReference) {
// track if some of the referenced owners already exist in the graph and have been observed, // track if some of the referenced owners already exist in the graph and have been observed,
// and the dependent's ownerRef does not match their observed coordinates // and the dependent's ownerRef does not match their observed coordinates
hasPotentiallyInvalidOwnerReference := false hasPotentiallyInvalidOwnerReference := false
@ -368,7 +371,7 @@ func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerRefer
dependents: make(map[*node]struct{}), dependents: make(map[*node]struct{}),
virtual: true, virtual: true,
} }
klog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity) logger.V(5).Info("add virtual item", "identity", ownerNode.identity)
gb.uidToNode.Write(ownerNode) gb.uidToNode.Write(ownerNode)
} }
ownerNode.addDependent(n) ownerNode.addDependent(n)
@ -385,7 +388,7 @@ func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerRefer
// The owner node has been observed via an informer // The owner node has been observed via an informer
// the dependent's namespace doesn't match the observed owner's namespace, this is definitely wrong. // the dependent's namespace doesn't match the observed owner's namespace, this is definitely wrong.
// cluster-scoped owners can be referenced as an owner from any namespace or cluster-scoped object. // cluster-scoped owners can be referenced as an owner from any namespace or cluster-scoped object.
klog.V(2).Infof("node %s references an owner %s but does not match namespaces", n.identity, ownerNode.identity) logger.V(2).Info("item references an owner but does not match namespaces", "item", n.identity, "owner", ownerNode.identity)
gb.reportInvalidNamespaceOwnerRef(n, owner.UID) gb.reportInvalidNamespaceOwnerRef(n, owner.UID)
} }
hasPotentiallyInvalidOwnerReference = true hasPotentiallyInvalidOwnerReference = true
@ -393,7 +396,7 @@ func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerRefer
if ownerNode.isObserved() { if ownerNode.isObserved() {
// The owner node has been observed via an informer // The owner node has been observed via an informer
// n's owner reference doesn't match the observed identity, this might be wrong. // n's owner reference doesn't match the observed identity, this might be wrong.
klog.V(2).Infof("node %s references an owner %s with coordinates that do not match the observed identity", n.identity, ownerNode.identity) logger.V(2).Info("item references an owner with coordinates that do not match the observed identity", "item", n.identity, "owner", ownerNode.identity)
} }
hasPotentiallyInvalidOwnerReference = true hasPotentiallyInvalidOwnerReference = true
} else if !ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace && !ownerNode.isObserved() { } else if !ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace && !ownerNode.isObserved() {
@ -447,9 +450,9 @@ func (gb *GraphBuilder) reportInvalidNamespaceOwnerRef(n *node, invalidOwnerUID
// insertNode insert the node to gb.uidToNode; then it finds all owners as listed // insertNode insert the node to gb.uidToNode; then it finds all owners as listed
// in n.owners, and adds the node to their dependents list. // in n.owners, and adds the node to their dependents list.
func (gb *GraphBuilder) insertNode(n *node) { func (gb *GraphBuilder) insertNode(logger klog.Logger, n *node) {
gb.uidToNode.Write(n) gb.uidToNode.Write(n)
gb.addDependentToOwners(n, n.owners) gb.addDependentToOwners(logger, n, n.owners)
} }
// removeDependentFromOwners remove n from owners' dependents list. // removeDependentFromOwners remove n from owners' dependents list.
@ -555,12 +558,12 @@ func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.O
// if an blocking ownerReference points to an object gets removed, or gets set to // if an blocking ownerReference points to an object gets removed, or gets set to
// "BlockOwnerDeletion=false", add the object to the attemptToDelete queue. // "BlockOwnerDeletion=false", add the object to the attemptToDelete queue.
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) { func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(logger klog.Logger, removed []metav1.OwnerReference, changed []ownerRefPair) {
for _, ref := range removed { for _, ref := range removed {
if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion { if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
node, found := gb.uidToNode.Read(ref.UID) node, found := gb.uidToNode.Read(ref.UID)
if !found { if !found {
klog.V(5).Infof("cannot find %s in uidToNode", ref.UID) logger.V(5).Info("cannot find uid in uidToNode", "uid", ref.UID)
continue continue
} }
gb.attemptToDelete.Add(node) gb.attemptToDelete.Add(node)
@@ -572,7 +575,7 @@ func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerRe
if wasBlocked && isUnblocked { if wasBlocked && isUnblocked {
node, found := gb.uidToNode.Read(c.newRef.UID) node, found := gb.uidToNode.Read(c.newRef.UID)
if !found { if !found {
klog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID) logger.V(5).Info("cannot find uid in uidToNode", "uid", c.newRef.UID)
continue continue
} }
gb.attemptToDelete.Add(node) gb.attemptToDelete.Add(node)
@@ -580,14 +583,14 @@ func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerRe
} }
} }
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) { func (gb *GraphBuilder) processTransitions(logger klog.Logger, oldObj interface{}, newAccessor metav1.Object, n *node) {
if startsWaitingForDependentsOrphaned(oldObj, newAccessor) { if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
klog.V(5).Infof("add %s to the attemptToOrphan", n.identity) logger.V(5).Info("add item to attemptToOrphan", "item", n.identity)
gb.attemptToOrphan.Add(n) gb.attemptToOrphan.Add(n)
return return
} }
if startsWaitingForDependentsDeleted(oldObj, newAccessor) { if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
klog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity) logger.V(2).Info("add item to attemptToDelete, because it's waiting for its dependents to be deleted", "item", n.identity)
// if n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here. // if n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
n.markDeletingDependents() n.markDeletingDependents()
for dep := range n.dependents { for dep := range n.dependents {
@@ -597,8 +600,8 @@ func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav
} }
} }
func (gb *GraphBuilder) runProcessGraphChanges() { func (gb *GraphBuilder) runProcessGraphChanges(logger klog.Logger) {
for gb.processGraphChanges() { for gb.processGraphChanges(logger) {
} }
} }
@@ -615,7 +618,7 @@ func identityFromEvent(event *event, accessor metav1.Object) objectReference {
} }
// Dequeueing an event from graphChanges, updating graph, populating dirty_queue. // Dequeueing an event from graphChanges, updating graph, populating dirty_queue.
func (gb *GraphBuilder) processGraphChanges() bool { func (gb *GraphBuilder) processGraphChanges(logger klog.Logger) bool {
item, quit := gb.graphChanges.Get() item, quit := gb.graphChanges.Get()
if quit { if quit {
return false return false
@@ -632,7 +635,16 @@ func (gb *GraphBuilder) processGraphChanges() bool {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true return true
} }
klog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v, virtual=%v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType, event.virtual)
logger.V(5).Info("GraphBuilder process object",
"apiVersion", event.gvk.GroupVersion().String(),
"kind", event.gvk.Kind,
"object", klog.KObj(accessor),
"uid", string(accessor.GetUID()),
"eventType", event.eventType,
"virtual", event.virtual,
)
// Check if the node already exists // Check if the node already exists
existingNode, found := gb.uidToNode.Read(accessor.GetUID()) existingNode, found := gb.uidToNode.Read(accessor.GetUID())
if found && !event.virtual && !existingNode.isObserved() { if found && !event.virtual && !existingNode.isObserved() {
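The rewritten log statement above is the core of the contextual-logging style: a constant message plus alternating key/value pairs instead of a printf format string, so the output stays machine-parseable. A minimal sketch of the same pattern with a hypothetical logProcessedObject helper (the function and parameter names are invented for illustration):

package example

import "k8s.io/klog/v2"

// logProcessedObject mirrors the call above: one fixed message, then
// key/value pairs carrying the variable data.
func logProcessedObject(logger klog.Logger, apiVersion, kind, namespace, name, uid string, virtual bool) {
	logger.V(5).Info("GraphBuilder process object",
		"apiVersion", apiVersion,
		"kind", kind,
		"object", klog.KRef(namespace, name), // rendered as "namespace/name"
		"uid", uid,
		"virtual", virtual,
	)
}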
@@ -650,14 +662,20 @@ func (gb *GraphBuilder) processGraphChanges() bool {
for _, dep := range potentiallyInvalidDependents { for _, dep := range potentiallyInvalidDependents {
if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace { if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace {
// Namespace mismatch, this is definitely wrong // Namespace mismatch, this is definitely wrong
klog.V(2).Infof("node %s references an owner %s but does not match namespaces", dep.identity, observedIdentity) logger.V(2).Info("item references an owner but does not match namespaces",
"item", dep.identity,
"owner", observedIdentity,
)
gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID) gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID)
} }
gb.attemptToDelete.Add(dep) gb.attemptToDelete.Add(dep)
} }
// make a copy (so we don't modify the existing node in place), store the observed identity, and replace the virtual node // make a copy (so we don't modify the existing node in place), store the observed identity, and replace the virtual node
klog.V(2).Infof("replacing virtual node %s with observed node %s", existingNode.identity, observedIdentity) logger.V(2).Info("replacing virtual item with observed item",
"virtual", existingNode.identity,
"observed", observedIdentity,
)
existingNode = existingNode.clone() existingNode = existingNode.clone()
existingNode.identity = observedIdentity existingNode.identity = observedIdentity
gb.uidToNode.Write(existingNode) gb.uidToNode.Write(existingNode)
@@ -673,21 +691,21 @@ func (gb *GraphBuilder) processGraphChanges() bool {
deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor), deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
beingDeleted: beingDeleted(accessor), beingDeleted: beingDeleted(accessor),
} }
gb.insertNode(newNode) gb.insertNode(logger, newNode)
// the underlying delta_fifo may combine a creation and a deletion into // the underlying delta_fifo may combine a creation and a deletion into
// one event, so we need to further process the event. // one event, so we need to further process the event.
gb.processTransitions(event.oldObj, accessor, newNode) gb.processTransitions(logger, event.oldObj, accessor, newNode)
case (event.eventType == addEvent || event.eventType == updateEvent) && found: case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// handle changes in ownerReferences // handle changes in ownerReferences
added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences()) added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
if len(added) != 0 || len(removed) != 0 || len(changed) != 0 { if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
// check if the changed dependency graph unblocks owners that are // check if the changed dependency graph unblocks owners that are
// waiting for the deletion of their dependents. // waiting for the deletion of their dependents.
gb.addUnblockedOwnersToDeleteQueue(removed, changed) gb.addUnblockedOwnersToDeleteQueue(logger, removed, changed)
// update the node itself // update the node itself
existingNode.owners = accessor.GetOwnerReferences() existingNode.owners = accessor.GetOwnerReferences()
// Add the node to its new owners' dependent lists. // Add the node to its new owners' dependent lists.
gb.addDependentToOwners(existingNode, added) gb.addDependentToOwners(logger, existingNode, added)
// remove the node from the dependent list of nodes that are no longer in // remove the node from the dependent list of nodes that are no longer in
// the node's owners list. // the node's owners list.
gb.removeDependentFromOwners(existingNode, removed) gb.removeDependentFromOwners(existingNode, removed)
@@ -696,10 +714,12 @@ func (gb *GraphBuilder) processGraphChanges() bool {
if beingDeleted(accessor) { if beingDeleted(accessor) {
existingNode.markBeingDeleted() existingNode.markBeingDeleted()
} }
gb.processTransitions(event.oldObj, accessor, existingNode) gb.processTransitions(logger, event.oldObj, accessor, existingNode)
case event.eventType == deleteEvent: case event.eventType == deleteEvent:
if !found { if !found {
klog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID()) logger.V(5).Info("item doesn't exist in the graph, this shouldn't happen",
"item", accessor.GetUID(),
)
return true return true
} }

View File

@@ -82,7 +82,7 @@ func (gc *GarbageCollector) patchObject(item objectReference, patch []byte, pt t
return gc.metadataClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(context.TODO(), item.Name, pt, patch, metav1.PatchOptions{}) return gc.metadataClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(context.TODO(), item.Name, pt, patch, metav1.PatchOptions{})
} }
func (gc *GarbageCollector) removeFinalizer(owner *node, targetFinalizer string) error { func (gc *GarbageCollector) removeFinalizer(logger klog.Logger, owner *node, targetFinalizer string) error {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ownerObject, err := gc.getObject(owner.identity) ownerObject, err := gc.getObject(owner.identity)
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
@@ -106,7 +106,7 @@ func (gc *GarbageCollector) removeFinalizer(owner *node, targetFinalizer string)
newFinalizers = append(newFinalizers, f) newFinalizers = append(newFinalizers, f)
} }
if !found { if !found {
klog.V(5).Infof("the %s finalizer is already removed from object %s", targetFinalizer, owner.identity) logger.V(5).Info("finalizer already removed from object", "finalizer", targetFinalizer, "object", owner.identity)
return nil return nil
} }

View File

@@ -46,6 +46,8 @@ import (
"k8s.io/client-go/restmapper" "k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/controller-manager/pkg/informerfactory" "k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration"
@@ -199,6 +201,7 @@ func createRandomCustomResourceDefinition(
} }
type testContext struct { type testContext struct {
logger klog.Logger
tearDown func() tearDown func()
gc *garbagecollector.GarbageCollector gc *garbagecollector.GarbageCollector
clientSet clientset.Interface clientSet clientset.Interface
@@ -258,7 +261,8 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
t.Fatalf("failed to create garbage collector: %v", err) t.Fatalf("failed to create garbage collector: %v", err)
} }
ctx, cancel := context.WithCancel(context.Background()) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
tearDown := func() { tearDown := func() {
cancel() cancel()
result.TearDownFn() result.TearDownFn()
@@ -272,7 +276,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
restMapper.Reset() restMapper.Reset()
}, syncPeriod, ctx.Done()) }, syncPeriod, ctx.Done())
go gc.Run(ctx, workers) go gc.Run(ctx, workers)
go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done()) go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
} }
if workerCount > 0 { if workerCount > 0 {
@@ -280,6 +284,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
} }
return &testContext{ return &testContext{
logger: logger,
tearDown: tearDown, tearDown: tearDown,
gc: gc, gc: gc,
clientSet: clientSet, clientSet: clientSet,
@@ -1025,7 +1030,9 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
ctx.startGC(5) ctx.startGC(5)
timeout := make(chan struct{}) timeout := make(chan struct{})
time.AfterFunc(5*time.Second, func() { close(timeout) }) time.AfterFunc(5*time.Second, func() { close(timeout) })
if !cache.WaitForCacheSync(timeout, gc.IsSynced) { if !cache.WaitForCacheSync(timeout, func() bool {
return gc.IsSynced(ctx.logger)
}) {
t.Fatalf("failed to wait for garbage collector to be synced") t.Fatalf("failed to wait for garbage collector to be synced")
} }
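The test setup above replaces context.Background() with ktesting.NewTestContext, which returns a per-test logger together with a context that carries it. A minimal sketch of that pattern in an ordinary test; the test name and logged values are hypothetical:

package example

import (
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

func TestWithContextualLogging(t *testing.T) {
	// ktesting routes klog output through the test, so structured logs from
	// the code under test appear next to the test that produced them.
	logger, ctx := ktesting.NewTestContext(t)
	logger.Info("starting fixture")

	// Code under test stays unchanged: it pulls the logger back out of ctx.
	klog.FromContext(ctx).V(5).Info("processing item", "item", "default/example")
}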

View File

@@ -2013,7 +2013,7 @@ func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config,
restMapper.Reset() restMapper.Reset()
}, syncPeriod, ctx.Done()) }, syncPeriod, ctx.Done())
go gc.Run(ctx, 1) go gc.Run(ctx, 1)
go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done()) go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
} }
return startGC return startGC
} }
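Both integration helpers end up with the same wiring: because the context now carries cancellation as well as the logger, Sync takes ctx instead of a trailing stop channel. A hedged sketch of a caller under these assumptions; startGC and its parameters are placeholders, not code from this change:

package example

import (
	"context"
	"time"

	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/controller/garbagecollector"
)

// startGC mirrors the calls above: Run and Sync both stop when ctx is
// cancelled, and any contextual logger travels inside ctx as well.
func startGC(ctx context.Context, gc *garbagecollector.GarbageCollector, clientSet clientset.Interface, workers int) {
	go gc.Run(ctx, workers)
	go gc.Sync(ctx, clientSet.Discovery(), 30*time.Second)
}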