mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
garbagecollector: use contextual logging
Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
This commit is contained in:
parent
77af0be42f
commit
26e3dab78b
@ -502,6 +502,8 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "garbagecollector"))
|
||||
|
||||
gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
||||
discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
|
||||
|
||||
@ -536,7 +538,7 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
|
||||
|
||||
// Periodically refresh the RESTMapper with new discovery information and sync
|
||||
// the garbage collector.
|
||||
go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Done())
|
||||
go garbageCollector.Sync(ctx, discoveryClient, 30*time.Second)
|
||||
|
||||
return garbageCollector, true, nil
|
||||
}
|
||||
|
@ -131,11 +131,11 @@ func NewGarbageCollector(
|
||||
|
||||
// resyncMonitors starts or stops resource monitors as needed to ensure that all
|
||||
// (and only) those resources present in the map are monitored.
|
||||
func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
|
||||
if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
|
||||
func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResources map[schema.GroupVersionResource]struct{}) error {
|
||||
if err := gc.dependencyGraphBuilder.syncMonitors(logger, deletableResources); err != nil {
|
||||
return err
|
||||
}
|
||||
gc.dependencyGraphBuilder.startMonitors()
|
||||
gc.dependencyGraphBuilder.startMonitors(logger)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -151,21 +151,25 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
|
||||
gc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: gc.kubeClient.CoreV1().Events("")})
|
||||
defer gc.eventBroadcaster.Shutdown()
|
||||
|
||||
klog.Infof("Starting garbage collector controller")
|
||||
defer klog.Infof("Shutting down garbage collector controller")
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Starting controller", "controller", "garbagecollector")
|
||||
defer logger.Info("Shutting down controller", "controller", "garbagecollector")
|
||||
|
||||
go gc.dependencyGraphBuilder.Run(ctx.Done())
|
||||
graphLogger := klog.LoggerWithName(logger, "graphbuilder")
|
||||
go gc.dependencyGraphBuilder.Run(klog.NewContext(ctx, graphLogger))
|
||||
|
||||
if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), gc.dependencyGraphBuilder.IsSynced) {
|
||||
if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
|
||||
return gc.dependencyGraphBuilder.IsSynced(logger)
|
||||
}) {
|
||||
return
|
||||
}
|
||||
|
||||
klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")
|
||||
logger.Info("All resource monitors have synced. Proceeding to collect garbage")
|
||||
|
||||
// gc workers
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
|
||||
go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, ctx.Done())
|
||||
go wait.Until(func() { gc.runAttemptToOrphanWorker(logger) }, 1*time.Second, ctx.Done())
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
@ -178,22 +182,24 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
|
||||
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
|
||||
// the mapper's underlying discovery client will be unnecessarily reset during
|
||||
// the course of detecting new resources.
|
||||
func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
|
||||
func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.ServerResourcesInterface, period time.Duration) {
|
||||
oldResources := make(map[schema.GroupVersionResource]struct{})
|
||||
wait.Until(func() {
|
||||
wait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// Get the current resource list from discovery.
|
||||
newResources := GetDeletableResources(discoveryClient)
|
||||
|
||||
// This can occur if there is an internal error in GetDeletableResources.
|
||||
if len(newResources) == 0 {
|
||||
klog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
|
||||
logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
|
||||
metrics.GarbageCollectorResourcesSyncError.Inc()
|
||||
return
|
||||
}
|
||||
|
||||
// Decide whether discovery has reported a change.
|
||||
if reflect.DeepEqual(oldResources, newResources) {
|
||||
klog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
|
||||
logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
|
||||
return
|
||||
}
|
||||
|
||||
@ -204,26 +210,30 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
|
||||
|
||||
// Once we get here, we should not unpause workers until we've successfully synced
|
||||
attempt := 0
|
||||
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
||||
wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
|
||||
attempt++
|
||||
|
||||
// On a reattempt, check if available resources have changed
|
||||
if attempt > 1 {
|
||||
newResources = GetDeletableResources(discoveryClient)
|
||||
if len(newResources) == 0 {
|
||||
klog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
|
||||
logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
|
||||
metrics.GarbageCollectorResourcesSyncError.Inc()
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
|
||||
logger.V(2).Info(
|
||||
"syncing garbage collector with updated resources from discovery",
|
||||
"attempt", attempt,
|
||||
"diff", printDiff(oldResources, newResources),
|
||||
)
|
||||
|
||||
// Resetting the REST mapper will also invalidate the underlying discovery
|
||||
// client. This is a leaky abstraction and assumes behavior about the REST
|
||||
// mapper, but we'll deal with it for now.
|
||||
gc.restMapper.Reset()
|
||||
klog.V(4).Infof("reset restmapper")
|
||||
logger.V(4).Info("reset restmapper")
|
||||
|
||||
// Perform the monitor resync and wait for controllers to report cache sync.
|
||||
//
|
||||
@ -234,19 +244,21 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
|
||||
// discovery call if the resources appeared in-between the calls. In that
|
||||
// case, the restMapper will fail to map some of newResources until the next
|
||||
// attempt.
|
||||
if err := gc.resyncMonitors(newResources); err != nil {
|
||||
if err := gc.resyncMonitors(logger, newResources); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
|
||||
metrics.GarbageCollectorResourcesSyncError.Inc()
|
||||
return false, nil
|
||||
}
|
||||
klog.V(4).Infof("resynced monitors")
|
||||
logger.V(4).Info("resynced monitors")
|
||||
|
||||
// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
|
||||
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
|
||||
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
|
||||
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
|
||||
// note that workers stay paused until we successfully resync.
|
||||
if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
|
||||
if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
|
||||
return gc.dependencyGraphBuilder.IsSynced(logger)
|
||||
}) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
|
||||
metrics.GarbageCollectorResourcesSyncError.Inc()
|
||||
return false, nil
|
||||
@ -254,14 +266,14 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
|
||||
|
||||
// success, break out of the loop
|
||||
return true, nil
|
||||
}, stopCh)
|
||||
})
|
||||
|
||||
// Finally, keep track of our new state. Do this after all preceding steps
|
||||
// have succeeded to ensure we'll retry on subsequent syncs if an error
|
||||
// occurred.
|
||||
oldResources = newResources
|
||||
klog.V(2).Infof("synced garbage collector")
|
||||
}, period, stopCh)
|
||||
logger.V(2).Info("synced garbage collector")
|
||||
}, period)
|
||||
}
|
||||
|
||||
// printDiff returns a human-readable summary of what resources were added and removed
|
||||
@ -295,8 +307,8 @@ func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan
|
||||
}
|
||||
|
||||
// IsSynced returns true if dependencyGraphBuilder is synced.
|
||||
func (gc *GarbageCollector) IsSynced() bool {
|
||||
return gc.dependencyGraphBuilder.IsSynced()
|
||||
func (gc *GarbageCollector) IsSynced(logger klog.Logger) bool {
|
||||
return gc.dependencyGraphBuilder.IsSynced(logger)
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
|
||||
@ -342,18 +354,20 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
|
||||
return forgetItem
|
||||
}
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
if !n.isObserved() {
|
||||
nodeFromGraph, existsInGraph := gc.dependencyGraphBuilder.uidToNode.Read(n.identity.UID)
|
||||
if !existsInGraph {
|
||||
// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
|
||||
// and in the meantime a deletion of the real object associated with that uid was observed
|
||||
klog.V(5).Infof("item %s no longer in the graph, skipping attemptToDeleteItem", n)
|
||||
logger.V(5).Info("item no longer in the graph, skipping attemptToDeleteItem", "item", n.identity)
|
||||
return forgetItem
|
||||
}
|
||||
if nodeFromGraph.isObserved() {
|
||||
// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
|
||||
// and in the meantime the real object associated with that uid was observed
|
||||
klog.V(5).Infof("item %s no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", n)
|
||||
logger.V(5).Info("item no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", "item", n.identity)
|
||||
return forgetItem
|
||||
}
|
||||
}
|
||||
@ -374,7 +388,7 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
|
||||
// have a way to distinguish this from a valid type we will recognize
|
||||
// after the next discovery sync.
|
||||
// For now, record the error and retry.
|
||||
klog.V(5).Infof("error syncing item %s: %v", n, err)
|
||||
logger.V(5).Error(err, "error syncing item", "item", n.identity)
|
||||
} else {
|
||||
utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
|
||||
}
|
||||
@ -384,7 +398,7 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
|
||||
// requeue if item hasn't been observed via an informer event yet.
|
||||
// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never be removed.
|
||||
// see https://issue.k8s.io/56121
|
||||
klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
|
||||
logger.V(5).Info("item hasn't been observed via informer yet", "item", n.identity)
|
||||
return requeueItem
|
||||
}
|
||||
|
||||
@ -397,16 +411,24 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item inte
|
||||
func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.OwnerReference, item *node) (
|
||||
dangling bool, owner *metav1.PartialObjectMetadata, err error) {
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
// check for recorded absent cluster-scoped parent
|
||||
absentOwnerCacheKey := objectReference{OwnerReference: ownerReferenceCoordinates(reference)}
|
||||
if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
|
||||
klog.V(5).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||
logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist",
|
||||
"item", item.identity,
|
||||
"owner", reference,
|
||||
)
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
// check for recorded absent namespaced parent
|
||||
absentOwnerCacheKey.Namespace = item.identity.Namespace
|
||||
if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
|
||||
klog.V(5).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist in namespace %s", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name, item.identity.Namespace)
|
||||
logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist in namespace",
|
||||
"item", item.identity,
|
||||
"owner", reference,
|
||||
)
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
@ -427,7 +449,10 @@ func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.Own
|
||||
if len(item.identity.Namespace) == 0 && namespaced {
|
||||
// item is a cluster-scoped object referring to a namespace-scoped owner, which is not valid.
|
||||
// return a marker error, rather than retrying on the lookup failure forever.
|
||||
klog.V(2).Infof("object %s is cluster-scoped, but refers to a namespaced owner of type %s/%s", item.identity, reference.APIVersion, reference.Kind)
|
||||
logger.V(2).Info("item is cluster-scoped, but refers to a namespaced owner",
|
||||
"item", item.identity,
|
||||
"owner", reference,
|
||||
)
|
||||
return false, nil, namespacedOwnerOfClusterScopedObjectErr
|
||||
}
|
||||
|
||||
@ -438,14 +463,20 @@ func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.Own
|
||||
switch {
|
||||
case errors.IsNotFound(err):
|
||||
gc.absentOwnerCache.Add(absentOwnerCacheKey)
|
||||
klog.V(5).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||
logger.V(5).Info("item's owner is not found",
|
||||
"item", item.identity,
|
||||
"owner", reference,
|
||||
)
|
||||
return true, nil, nil
|
||||
case err != nil:
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
if owner.GetUID() != reference.UID {
|
||||
klog.V(5).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||
logger.V(5).Info("item's owner is not found, UID mismatch",
|
||||
"item", item.identity,
|
||||
"owner", reference,
|
||||
)
|
||||
gc.absentOwnerCache.Add(absentOwnerCacheKey)
|
||||
return true, nil, nil
|
||||
}
|
||||
@ -498,12 +529,18 @@ func ownerRefsToUIDs(refs []metav1.OwnerReference) []types.UID {
|
||||
// if the API get request returns a NotFound error, or the retrieved item's uid does not match,
|
||||
// a virtual delete event for the node is enqueued and enqueuedVirtualDeleteEventErr is returned.
|
||||
func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node) error {
|
||||
klog.V(2).InfoS("Processing object", "object", klog.KRef(item.identity.Namespace, item.identity.Name),
|
||||
"objectUID", item.identity.UID, "kind", item.identity.Kind, "virtual", !item.isObserved())
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
logger.V(2).Info("Processing item",
|
||||
"item", item.identity,
|
||||
"virtual", !item.isObserved(),
|
||||
)
|
||||
|
||||
// "being deleted" is a one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents.
|
||||
if item.isBeingDeleted() && !item.isDeletingDependents() {
|
||||
klog.V(5).Infof("processing item %s returned at once, because its DeletionTimestamp is non-nil", item.identity)
|
||||
logger.V(5).Info("processing item returned at once, because its DeletionTimestamp is non-nil",
|
||||
"item", item.identity,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
// TODO: It's only necessary to talk to the API server if this is a
|
||||
@ -515,7 +552,9 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
// the GraphBuilder can add "virtual" node for an owner that doesn't
|
||||
// exist yet, so we need to enqueue a virtual Delete event to remove
|
||||
// the virtual node from GraphBuilder.uidToNode.
|
||||
klog.V(5).Infof("item %v not found, generating a virtual delete event", item.identity)
|
||||
logger.V(5).Info("item not found, generating a virtual delete event",
|
||||
"item", item.identity,
|
||||
)
|
||||
gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
|
||||
return enqueuedVirtualDeleteEventErr
|
||||
case err != nil:
|
||||
@ -523,7 +562,9 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
}
|
||||
|
||||
if latest.GetUID() != item.identity.UID {
|
||||
klog.V(5).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
|
||||
logger.V(5).Info("UID doesn't match, item not found, generating a virtual delete event",
|
||||
"item", item.identity,
|
||||
)
|
||||
gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
|
||||
return enqueuedVirtualDeleteEventErr
|
||||
}
|
||||
@ -531,13 +572,15 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
// TODO: attemptToOrphanWorker() routine is similar. Consider merging
|
||||
// attemptToOrphanWorker() into attemptToDeleteItem() as well.
|
||||
if item.isDeletingDependents() {
|
||||
return gc.processDeletingDependentsItem(item)
|
||||
return gc.processDeletingDependentsItem(logger, item)
|
||||
}
|
||||
|
||||
// compute if we should delete the item
|
||||
ownerReferences := latest.GetOwnerReferences()
|
||||
if len(ownerReferences) == 0 {
|
||||
klog.V(2).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
|
||||
logger.V(2).Info("item doesn't have an owner, continue on next item",
|
||||
"item", item.identity,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -545,15 +588,27 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
klog.V(5).Infof("classify references of %s.\nsolid: %#v\ndangling: %#v\nwaitingForDependentsDeletion: %#v\n", item.identity, solid, dangling, waitingForDependentsDeletion)
|
||||
logger.V(5).Info("classify item's references",
|
||||
"item", item.identity,
|
||||
"solid", solid,
|
||||
"dangling", dangling,
|
||||
"waitingForDependentsDeletion", waitingForDependentsDeletion,
|
||||
)
|
||||
|
||||
switch {
|
||||
case len(solid) != 0:
|
||||
klog.V(2).Infof("object %#v has at least one existing owner: %#v, will not garbage collect", item.identity, solid)
|
||||
logger.V(2).Info("item has at least one existing owner, will not garbage collect",
|
||||
"item", item.identity,
|
||||
"owner", solid,
|
||||
)
|
||||
if len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 {
|
||||
return nil
|
||||
}
|
||||
klog.V(2).Infof("remove dangling references %#v and waiting references %#v for object %s", dangling, waitingForDependentsDeletion, item.identity)
|
||||
logger.V(2).Info("remove dangling references and waiting references for item",
|
||||
"item", item.identity,
|
||||
"dangling", dangling,
|
||||
"waitingForDependentsDeletion", waitingForDependentsDeletion,
|
||||
)
|
||||
// waitingForDependentsDeletion needs to be deleted from the
|
||||
// ownerReferences, otherwise the referenced objects will be stuck with
|
||||
// the FinalizerDeletingDependents and never get deleted.
|
||||
@ -575,7 +630,10 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
// problem.
|
||||
// there are multiple workers running attemptToDeleteItem in
|
||||
// parallel, so the cycle detection can fail in a race condition.
|
||||
klog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity)
|
||||
logger.V(2).Info("processing item, some of its owners and its dependent have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the item is going to be deleted with Foreground",
|
||||
"item", item.identity,
|
||||
"dependent", dep.identity,
|
||||
)
|
||||
patch, err := item.unblockOwnerReferencesStrategicMergePatch()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -586,7 +644,9 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
break
|
||||
}
|
||||
}
|
||||
klog.V(2).Infof("at least one owner of object %s has FinalizerDeletingDependents, and the object itself has dependents, so it is going to be deleted in Foreground", item.identity)
|
||||
logger.V(2).Info("at least one owner of item has FinalizerDeletingDependents, and the item itself has dependents, so it is going to be deleted in Foreground",
|
||||
"item", item.identity,
|
||||
)
|
||||
// the deletion event will be observed by the graphBuilder, so the item
|
||||
// will be processed again in processDeletingDependentsItem. If it
|
||||
// doesn't have dependents, the function will remove the
|
||||
@ -610,22 +670,27 @@ func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node)
|
||||
// otherwise, default to background.
|
||||
policy = metav1.DeletePropagationBackground
|
||||
}
|
||||
klog.V(2).InfoS("Deleting object", "object", klog.KRef(item.identity.Namespace, item.identity.Name),
|
||||
"objectUID", item.identity.UID, "kind", item.identity.Kind, "propagationPolicy", policy)
|
||||
logger.V(2).Info("Deleting item",
|
||||
"item", item.identity,
|
||||
"propagationPolicy", policy,
|
||||
)
|
||||
return gc.deleteObject(item.identity, &policy)
|
||||
}
|
||||
}
|
||||
|
||||
// process item that's waiting for its dependents to be deleted
|
||||
func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error {
|
||||
func (gc *GarbageCollector) processDeletingDependentsItem(logger klog.Logger, item *node) error {
|
||||
blockingDependents := item.blockingDependents()
|
||||
if len(blockingDependents) == 0 {
|
||||
klog.V(2).Infof("remove DeleteDependents finalizer for item %s", item.identity)
|
||||
return gc.removeFinalizer(item, metav1.FinalizerDeleteDependents)
|
||||
logger.V(2).Info("remove DeleteDependents finalizer for item", "item", item.identity)
|
||||
return gc.removeFinalizer(logger, item, metav1.FinalizerDeleteDependents)
|
||||
}
|
||||
for _, dep := range blockingDependents {
|
||||
if !dep.isDeletingDependents() {
|
||||
klog.V(2).Infof("adding %s to attemptToDelete, because its owner %s is deletingDependents", dep.identity, item.identity)
|
||||
logger.V(2).Info("adding dependent to attemptToDelete, because its owner is deletingDependents",
|
||||
"item", item.identity,
|
||||
"dependent", dep.identity,
|
||||
)
|
||||
gc.attemptToDelete.Add(dep)
|
||||
}
|
||||
}
|
||||
@ -633,7 +698,7 @@ func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error {
|
||||
}
|
||||
|
||||
// dependents are copies of pointers to the owner's dependents, they don't need to be locked.
|
||||
func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error {
|
||||
func (gc *GarbageCollector) orphanDependents(logger klog.Logger, owner objectReference, dependents []*node) error {
|
||||
errCh := make(chan error, len(dependents))
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(dependents))
|
||||
@ -667,12 +732,12 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
|
||||
if len(errorsSlice) != 0 {
|
||||
return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
|
||||
}
|
||||
klog.V(5).Infof("successfully updated all dependents of owner %s", owner)
|
||||
logger.V(5).Info("successfully updated all dependents", "owner", owner)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) runAttemptToOrphanWorker() {
|
||||
for gc.processAttemptToOrphanWorker() {
|
||||
func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
|
||||
for gc.processAttemptToOrphanWorker(logger) {
|
||||
}
|
||||
}
|
||||
|
||||
@ -681,7 +746,7 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker() {
|
||||
// OwnerReferences of its dependents, and finally updates the owner to remove
|
||||
// the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of
|
||||
// these steps fail.
|
||||
func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
|
||||
func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
|
||||
item, quit := gc.attemptToOrphan.Get()
|
||||
gc.workerLock.RLock()
|
||||
defer gc.workerLock.RUnlock()
|
||||
@ -690,7 +755,7 @@ func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
|
||||
}
|
||||
defer gc.attemptToOrphan.Done(item)
|
||||
|
||||
action := gc.attemptToOrphanWorker(item)
|
||||
action := gc.attemptToOrphanWorker(logger, item)
|
||||
switch action {
|
||||
case forgetItem:
|
||||
gc.attemptToOrphan.Forget(item)
|
||||
@ -701,7 +766,7 @@ func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueItemAction {
|
||||
func (gc *GarbageCollector) attemptToOrphanWorker(logger klog.Logger, item interface{}) workQueueItemAction {
|
||||
owner, ok := item.(*node)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
|
||||
@ -715,13 +780,13 @@ func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueIte
|
||||
}
|
||||
owner.dependentsLock.RUnlock()
|
||||
|
||||
err := gc.orphanDependents(owner.identity, dependents)
|
||||
err := gc.orphanDependents(logger, owner.identity, dependents)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
|
||||
return requeueItem
|
||||
}
|
||||
// update the owner, remove "orphaningFinalizer" from its finalizers list
|
||||
err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
|
||||
err = gc.removeFinalizer(logger, owner, metav1.FinalizerOrphanDependents)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
|
||||
return requeueItem
|
||||
|
@ -28,6 +28,8 @@ import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
|
||||
"github.com/golang/groupcache/lru"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
@ -102,15 +104,17 @@ func TestGarbageCollectorConstruction(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, 0, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
// Make sure resource monitor syncing creates and stops resource monitors.
|
||||
tweakableRM.Add(schema.GroupVersionKind{Group: "tpr.io", Version: "v1", Kind: "unknown"}, nil)
|
||||
err = gc.resyncMonitors(twoResources)
|
||||
err = gc.resyncMonitors(logger, twoResources)
|
||||
if err != nil {
|
||||
t.Errorf("Failed adding a monitor: %v", err)
|
||||
}
|
||||
assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
err = gc.resyncMonitors(podResource)
|
||||
err = gc.resyncMonitors(logger, podResource)
|
||||
if err != nil {
|
||||
t.Errorf("Failed removing a monitor: %v", err)
|
||||
}
|
||||
@ -121,13 +125,13 @@ func TestGarbageCollectorConstruction(t *testing.T) {
|
||||
defer cancel()
|
||||
go gc.Run(ctx, 1)
|
||||
|
||||
err = gc.resyncMonitors(twoResources)
|
||||
err = gc.resyncMonitors(logger, twoResources)
|
||||
if err != nil {
|
||||
t.Errorf("Failed adding a monitor: %v", err)
|
||||
}
|
||||
assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
err = gc.resyncMonitors(podResource)
|
||||
err = gc.resyncMonitors(logger, podResource)
|
||||
if err != nil {
|
||||
t.Errorf("Failed removing a monitor: %v", err)
|
||||
}
|
||||
@ -408,6 +412,8 @@ func TestProcessEvent(t *testing.T) {
|
||||
alwaysStarted := make(chan struct{})
|
||||
close(alwaysStarted)
|
||||
for _, scenario := range testScenarios {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
dependencyGraphBuilder := &GraphBuilder{
|
||||
informersStarted: alwaysStarted,
|
||||
graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||
@ -420,7 +426,7 @@ func TestProcessEvent(t *testing.T) {
|
||||
}
|
||||
for i := 0; i < len(scenario.events); i++ {
|
||||
dependencyGraphBuilder.graphChanges.Add(&scenario.events[i])
|
||||
dependencyGraphBuilder.processGraphChanges()
|
||||
dependencyGraphBuilder.processGraphChanges(logger)
|
||||
verifyGraphInvariants(scenario.name, dependencyGraphBuilder.uidToNode.uidToNode, t)
|
||||
}
|
||||
}
|
||||
@ -439,6 +445,8 @@ func BenchmarkReferencesDiffs(t *testing.B) {
|
||||
// TestDependentsRace relies on golang's data race detector to check if there is
|
||||
// a data race in the dependents field.
|
||||
func TestDependentsRace(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
gc := setupGC(t, &restclient.Config{})
|
||||
defer close(gc.stop)
|
||||
|
||||
@ -452,7 +460,7 @@ func TestDependentsRace(t *testing.T) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < updates; i++ {
|
||||
dependent := &node{}
|
||||
gc.dependencyGraphBuilder.addDependentToOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
|
||||
gc.dependencyGraphBuilder.addDependentToOwners(logger, dependent, []metav1.OwnerReference{{UID: ownerUID}})
|
||||
gc.dependencyGraphBuilder.removeDependentFromOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
|
||||
}
|
||||
}()
|
||||
@ -460,7 +468,7 @@ func TestDependentsRace(t *testing.T) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < updates; i++ {
|
||||
gc.attemptToOrphan.Add(owner)
|
||||
gc.processAttemptToOrphanWorker()
|
||||
gc.processAttemptToOrphanWorker(logger)
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
@ -672,6 +680,8 @@ func TestUnblockOwnerReference(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOrphanDependentsFailure(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
testHandler := &fakeActionHandler{
|
||||
response: map[string]FakeResponse{
|
||||
"PATCH" + "/api/v1/namespaces/ns1/pods/pod": {
|
||||
@ -698,7 +708,7 @@ func TestOrphanDependentsFailure(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
err := gc.orphanDependents(objectReference{}, dependents)
|
||||
err := gc.orphanDependents(logger, objectReference{}, dependents)
|
||||
expected := `the server reported a conflict`
|
||||
if err == nil || !strings.Contains(err.Error(), expected) {
|
||||
if err != nil {
|
||||
@ -862,7 +872,8 @@ func TestGarbageCollectorSync(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
go gc.Run(ctx, 1)
|
||||
// The pseudo-code of GarbageCollector.Sync():
|
||||
@ -879,7 +890,7 @@ func TestGarbageCollectorSync(t *testing.T) {
|
||||
// The 1s sleep in the test allows GetDeletableResources and
|
||||
// gc.resyncMonitors to run ~5 times to ensure the changes to the
|
||||
// fakeDiscoveryClient are picked up.
|
||||
go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, ctx.Done())
|
||||
go gc.Sync(ctx, fakeDiscoveryClient, 200*time.Millisecond)
|
||||
|
||||
// Wait until the sync discovers the initial resources
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -2286,8 +2297,11 @@ func TestConflictingData(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
ctx := stepContext{
|
||||
t: t,
|
||||
logger: logger,
|
||||
gc: gc,
|
||||
eventRecorder: eventRecorder,
|
||||
metadataClient: metadataClient,
|
||||
@ -2396,6 +2410,7 @@ func makeMetadataObj(identity objectReference, owners ...objectReference) *metav
|
||||
|
||||
type stepContext struct {
|
||||
t *testing.T
|
||||
logger klog.Logger
|
||||
gc *GarbageCollector
|
||||
eventRecorder *record.FakeRecorder
|
||||
metadataClient *fakemetadata.FakeMetadataClient
|
||||
@ -2417,7 +2432,7 @@ func processPendingGraphChanges(count int) step {
|
||||
if count <= 0 {
|
||||
// process all
|
||||
for ctx.gc.dependencyGraphBuilder.graphChanges.Len() != 0 {
|
||||
ctx.gc.dependencyGraphBuilder.processGraphChanges()
|
||||
ctx.gc.dependencyGraphBuilder.processGraphChanges(ctx.logger)
|
||||
}
|
||||
} else {
|
||||
for i := 0; i < count; i++ {
|
||||
@ -2425,7 +2440,7 @@ func processPendingGraphChanges(count int) step {
|
||||
ctx.t.Errorf("expected at least %d pending changes, got %d", count, i+1)
|
||||
return
|
||||
}
|
||||
ctx.gc.dependencyGraphBuilder.processGraphChanges()
|
||||
ctx.gc.dependencyGraphBuilder.processGraphChanges(ctx.logger)
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -2488,7 +2503,7 @@ func processEvent(e *event) step {
|
||||
ctx.t.Fatalf("events present in graphChanges, must process pending graphChanges before calling processEvent")
|
||||
}
|
||||
ctx.gc.dependencyGraphBuilder.graphChanges.Add(e)
|
||||
ctx.gc.dependencyGraphBuilder.processGraphChanges()
|
||||
ctx.gc.dependencyGraphBuilder.processGraphChanges(ctx.logger)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package garbagecollector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
@ -133,7 +134,7 @@ func (m *monitor) Run() {
|
||||
|
||||
type monitors map[schema.GroupVersionResource]*monitor
|
||||
|
||||
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
|
||||
func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
|
||||
handlers := cache.ResourceEventHandlerFuncs{
|
||||
// add the event to the dependencyGraphBuilder's graphChanges.
|
||||
AddFunc: func(obj interface{}) {
|
||||
@ -168,12 +169,13 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
}
|
||||
|
||||
shared, err := gb.sharedInformers.ForResource(resource)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
|
||||
logger.V(4).Error(err, "unable to use a shared informer", "resource", resource, "kind", kind)
|
||||
return nil, nil, err
|
||||
}
|
||||
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
|
||||
logger.V(4).Info("using a shared informer", "resource", resource, "kind", kind)
|
||||
// need to clone because it's from a shared cache
|
||||
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
|
||||
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
|
||||
@ -185,7 +187,7 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
|
||||
// instead of immediately exiting on an error. It may be called before or after
|
||||
// Run. Monitors are NOT started as part of the sync. To ensure all existing
|
||||
// monitors are started, call startMonitors.
|
||||
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||
func (gb *GraphBuilder) syncMonitors(logger klog.Logger, resources map[schema.GroupVersionResource]struct{}) error {
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
|
||||
@ -212,7 +214,7 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
|
||||
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
|
||||
continue
|
||||
}
|
||||
c, s, err := gb.controllerFor(resource, kind)
|
||||
c, s, err := gb.controllerFor(logger, resource, kind)
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
|
||||
continue
|
||||
@ -228,7 +230,7 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
|
||||
logger.V(4).Info("synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
|
||||
// NewAggregate returns nil if errs is 0-length
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
@ -238,7 +240,7 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
|
||||
//
|
||||
// If called before Run, startMonitors does nothing (as there is no stop channel
|
||||
// to support monitor/informer execution).
|
||||
func (gb *GraphBuilder) startMonitors() {
|
||||
func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
|
||||
@ -260,25 +262,25 @@ func (gb *GraphBuilder) startMonitors() {
|
||||
started++
|
||||
}
|
||||
}
|
||||
klog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
|
||||
logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
|
||||
}
|
||||
|
||||
// IsSynced returns true if any monitors exist AND all those monitors'
|
||||
// controllers HasSynced functions return true. This means IsSynced could return
|
||||
// true at one time, and then later return false if all monitors were
|
||||
// reconstructed.
|
||||
func (gb *GraphBuilder) IsSynced() bool {
|
||||
func (gb *GraphBuilder) IsSynced(logger klog.Logger) bool {
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
|
||||
if len(gb.monitors) == 0 {
|
||||
klog.V(4).Info("garbage controller monitor not synced: no monitors")
|
||||
logger.V(4).Info("garbage controller monitor not synced: no monitors")
|
||||
return false
|
||||
}
|
||||
|
||||
for resource, monitor := range gb.monitors {
|
||||
if !monitor.controller.HasSynced() {
|
||||
klog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
|
||||
logger.V(4).Info("garbage controller monitor not yet synced", "resource", resource)
|
||||
return false
|
||||
}
|
||||
}
|
||||
@ -287,20 +289,21 @@ func (gb *GraphBuilder) IsSynced() bool {
|
||||
|
||||
// Run sets the stop channel and starts monitor execution until stopCh is
|
||||
// closed. Any running monitors will be stopped before Run returns.
|
||||
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
|
||||
klog.Infof("GraphBuilder running")
|
||||
defer klog.Infof("GraphBuilder stopping")
|
||||
func (gb *GraphBuilder) Run(ctx context.Context) {
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Running", "component", "GraphBuilder")
|
||||
defer logger.Info("Stopping", "component", "GraphBuilder")
|
||||
|
||||
// Set up the stop channel.
|
||||
gb.monitorLock.Lock()
|
||||
gb.stopCh = stopCh
|
||||
gb.stopCh = ctx.Done()
|
||||
gb.running = true
|
||||
gb.monitorLock.Unlock()
|
||||
|
||||
// Start monitors and begin change processing until the stop channel is
|
||||
// closed.
|
||||
gb.startMonitors()
|
||||
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
|
||||
gb.startMonitors(logger)
|
||||
wait.Until(func() { gb.runProcessGraphChanges(logger) }, 1*time.Second, ctx.Done())
|
||||
|
||||
// Stop any running monitors.
|
||||
gb.monitorLock.Lock()
|
||||
@ -316,7 +319,7 @@ func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
|
||||
|
||||
// reset monitors so that the graph builder can be safely re-run/synced.
|
||||
gb.monitors = nil
|
||||
klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
|
||||
logger.Info("stopped monitors", "stopped", stopped, "total", len(monitors))
|
||||
}
|
||||
|
||||
var ignoredResources = map[schema.GroupResource]struct{}{
|
||||
@ -350,7 +353,7 @@ func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
|
||||
// exist in the gb.uidToNode yet, a "virtual" node will be created to represent
|
||||
// the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
|
||||
// attemptToDeleteItem() will verify if the owner exists according to the API server.
|
||||
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
|
||||
func (gb *GraphBuilder) addDependentToOwners(logger klog.Logger, n *node, owners []metav1.OwnerReference) {
|
||||
// track if some of the referenced owners already exist in the graph and have been observed,
|
||||
// and the dependent's ownerRef does not match their observed coordinates
|
||||
hasPotentiallyInvalidOwnerReference := false
|
||||
@ -368,7 +371,7 @@ func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerRefer
|
||||
dependents: make(map[*node]struct{}),
|
||||
virtual: true,
|
||||
}
|
||||
klog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
|
||||
logger.V(5).Info("add virtual item", "identity", ownerNode.identity)
|
||||
gb.uidToNode.Write(ownerNode)
|
||||
}
|
||||
ownerNode.addDependent(n)
|
||||
@ -385,7 +388,7 @@ func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerRefer
|
||||
// The owner node has been observed via an informer
|
||||
// the dependent's namespace doesn't match the observed owner's namespace, this is definitely wrong.
|
||||
// cluster-scoped owners can be referenced as an owner from any namespace or cluster-scoped object.
|
||||
klog.V(2).Infof("node %s references an owner %s but does not match namespaces", n.identity, ownerNode.identity)
|
||||
logger.V(2).Info("item references an owner but does not match namespaces", "item", n.identity, "owner", ownerNode.identity)
|
||||
gb.reportInvalidNamespaceOwnerRef(n, owner.UID)
|
||||
}
|
||||
hasPotentiallyInvalidOwnerReference = true
|
||||
@ -393,7 +396,7 @@ func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerRefer
|
||||
if ownerNode.isObserved() {
|
||||
// The owner node has been observed via an informer
|
||||
// n's owner reference doesn't match the observed identity, this might be wrong.
|
||||
klog.V(2).Infof("node %s references an owner %s with coordinates that do not match the observed identity", n.identity, ownerNode.identity)
|
||||
logger.V(2).Info("item references an owner with coordinates that do not match the observed identity", "item", n.identity, "owner", ownerNode.identity)
|
||||
}
|
||||
hasPotentiallyInvalidOwnerReference = true
|
||||
} else if !ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace && !ownerNode.isObserved() {
|
||||
@ -447,9 +450,9 @@ func (gb *GraphBuilder) reportInvalidNamespaceOwnerRef(n *node, invalidOwnerUID
|
||||
|
||||
// insertNode inserts the node into gb.uidToNode; then it finds all owners as listed
|
||||
// in n.owners, and adds the node to their dependents list.
|
||||
func (gb *GraphBuilder) insertNode(n *node) {
|
||||
func (gb *GraphBuilder) insertNode(logger klog.Logger, n *node) {
|
||||
gb.uidToNode.Write(n)
|
||||
gb.addDependentToOwners(n, n.owners)
|
||||
gb.addDependentToOwners(logger, n, n.owners)
|
||||
}
|
||||
|
||||
// removeDependentFromOwners removes n from the owners' dependents lists.
|
||||
@ -555,12 +558,12 @@ func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.O
|
||||
|
||||
// if a blocking ownerReference pointing to an object gets removed, or gets set to
|
||||
// "BlockOwnerDeletion=false", add the object to the attemptToDelete queue.
|
||||
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
|
||||
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(logger klog.Logger, removed []metav1.OwnerReference, changed []ownerRefPair) {
|
||||
for _, ref := range removed {
|
||||
if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
|
||||
node, found := gb.uidToNode.Read(ref.UID)
|
||||
if !found {
|
||||
klog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
|
||||
logger.V(5).Info("cannot find uid in uidToNode", "uid", ref.UID)
|
||||
continue
|
||||
}
|
||||
gb.attemptToDelete.Add(node)
|
||||
@ -572,7 +575,7 @@ func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerRe
|
||||
if wasBlocked && isUnblocked {
|
||||
node, found := gb.uidToNode.Read(c.newRef.UID)
|
||||
if !found {
|
||||
klog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
|
||||
logger.V(5).Info("cannot find uid in uidToNode", "uid", c.newRef.UID)
|
||||
continue
|
||||
}
|
||||
gb.attemptToDelete.Add(node)
|
||||
@ -580,14 +583,14 @@ func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerRe
|
||||
}
|
||||
}
|
||||
|
||||
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
|
||||
func (gb *GraphBuilder) processTransitions(logger klog.Logger, oldObj interface{}, newAccessor metav1.Object, n *node) {
|
||||
if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
|
||||
klog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
|
||||
logger.V(5).Info("add item to attemptToOrphan", "item", n.identity)
|
||||
gb.attemptToOrphan.Add(n)
|
||||
return
|
||||
}
|
||||
if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
|
||||
klog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
|
||||
logger.V(2).Info("add item to attemptToDelete, because it's waiting for its dependents to be deleted", "item", n.identity)
|
||||
// if the n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
|
||||
n.markDeletingDependents()
|
||||
for dep := range n.dependents {
|
||||
@ -597,8 +600,8 @@ func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav
|
||||
}
|
||||
}
|
||||
|
||||
func (gb *GraphBuilder) runProcessGraphChanges() {
|
||||
for gb.processGraphChanges() {
|
||||
func (gb *GraphBuilder) runProcessGraphChanges(logger klog.Logger) {
|
||||
for gb.processGraphChanges(logger) {
|
||||
}
|
||||
}
|
||||
|
||||
@ -615,7 +618,7 @@ func identityFromEvent(event *event, accessor metav1.Object) objectReference {
|
||||
}
|
||||
|
||||
// processGraphChanges dequeues an event from graphChanges, updates the graph, and populates dirty_queue.
|
||||
func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
func (gb *GraphBuilder) processGraphChanges(logger klog.Logger) bool {
|
||||
item, quit := gb.graphChanges.Get()
|
||||
if quit {
|
||||
return false
|
||||
@ -632,7 +635,16 @@ func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||
return true
|
||||
}
|
||||
klog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v, virtual=%v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType, event.virtual)
|
||||
|
||||
logger.V(5).Info("GraphBuilder process object",
|
||||
"apiVersion", event.gvk.GroupVersion().String(),
|
||||
"kind", event.gvk.Kind,
|
||||
"object", klog.KObj(accessor),
|
||||
"uid", string(accessor.GetUID()),
|
||||
"eventType", event.eventType,
|
||||
"virtual", event.virtual,
|
||||
)
|
||||
|
||||
// Check if the node already exists
|
||||
existingNode, found := gb.uidToNode.Read(accessor.GetUID())
|
||||
if found && !event.virtual && !existingNode.isObserved() {
|
||||
@ -650,14 +662,20 @@ func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
for _, dep := range potentiallyInvalidDependents {
|
||||
if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace {
|
||||
// Namespace mismatch, this is definitely wrong
|
||||
klog.V(2).Infof("node %s references an owner %s but does not match namespaces", dep.identity, observedIdentity)
|
||||
logger.V(2).Info("item references an owner but does not match namespaces",
|
||||
"item", dep.identity,
|
||||
"owner", observedIdentity,
|
||||
)
|
||||
gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID)
|
||||
}
|
||||
gb.attemptToDelete.Add(dep)
|
||||
}
|
||||
|
||||
// make a copy (so we don't modify the existing node in place), store the observed identity, and replace the virtual node
|
||||
klog.V(2).Infof("replacing virtual node %s with observed node %s", existingNode.identity, observedIdentity)
|
||||
logger.V(2).Info("replacing virtual item with observed item",
|
||||
"virtual", existingNode.identity,
|
||||
"observed", observedIdentity,
|
||||
)
|
||||
existingNode = existingNode.clone()
|
||||
existingNode.identity = observedIdentity
|
||||
gb.uidToNode.Write(existingNode)
|
||||
@ -673,21 +691,21 @@ func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
|
||||
beingDeleted: beingDeleted(accessor),
|
||||
}
|
||||
gb.insertNode(newNode)
|
||||
gb.insertNode(logger, newNode)
|
||||
// the underlying delta_fifo may combine a creation and a deletion into
|
||||
// one event, so we need to further process the event.
|
||||
gb.processTransitions(event.oldObj, accessor, newNode)
|
||||
gb.processTransitions(logger, event.oldObj, accessor, newNode)
|
||||
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
|
||||
// handle changes in ownerReferences
|
||||
added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
|
||||
if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
|
||||
// check if the changed dependency graph unblocks owners that are
|
||||
// waiting for the deletion of their dependents.
|
||||
gb.addUnblockedOwnersToDeleteQueue(removed, changed)
|
||||
gb.addUnblockedOwnersToDeleteQueue(logger, removed, changed)
|
||||
// update the node itself
|
||||
existingNode.owners = accessor.GetOwnerReferences()
|
||||
// Add the node to its new owners' dependent lists.
|
||||
gb.addDependentToOwners(existingNode, added)
|
||||
gb.addDependentToOwners(logger, existingNode, added)
|
||||
// remove the node from the dependents lists of nodes that are no longer in
|
||||
// the node's owners list.
|
||||
gb.removeDependentFromOwners(existingNode, removed)
|
||||
@ -696,10 +714,12 @@ func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
if beingDeleted(accessor) {
|
||||
existingNode.markBeingDeleted()
|
||||
}
|
||||
gb.processTransitions(event.oldObj, accessor, existingNode)
|
||||
gb.processTransitions(logger, event.oldObj, accessor, existingNode)
|
||||
case event.eventType == deleteEvent:
|
||||
if !found {
|
||||
klog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
|
||||
logger.V(5).Info("item doesn't exist in the graph, this shouldn't happen",
|
||||
"item", accessor.GetUID(),
|
||||
)
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,7 @@ func (gc *GarbageCollector) patchObject(item objectReference, patch []byte, pt t
|
||||
return gc.metadataClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(context.TODO(), item.Name, pt, patch, metav1.PatchOptions{})
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) removeFinalizer(owner *node, targetFinalizer string) error {
|
||||
func (gc *GarbageCollector) removeFinalizer(logger klog.Logger, owner *node, targetFinalizer string) error {
|
||||
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
ownerObject, err := gc.getObject(owner.identity)
|
||||
if errors.IsNotFound(err) {
|
||||
@ -106,7 +106,7 @@ func (gc *GarbageCollector) removeFinalizer(owner *node, targetFinalizer string)
|
||||
newFinalizers = append(newFinalizers, f)
|
||||
}
|
||||
if !found {
|
||||
klog.V(5).Infof("the %s finalizer is already removed from object %s", targetFinalizer, owner.identity)
|
||||
logger.V(5).Info("finalizer already removed from object", "finalizer", targetFinalizer, "object", owner.identity)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,8 @@ import (
|
||||
"k8s.io/client-go/restmapper"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/controller-manager/pkg/informerfactory"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
"k8s.io/kubernetes/test/integration"
|
||||
@ -199,6 +201,7 @@ func createRandomCustomResourceDefinition(
|
||||
}
|
||||
|
||||
type testContext struct {
|
||||
logger klog.Logger
|
||||
tearDown func()
|
||||
gc *garbagecollector.GarbageCollector
|
||||
clientSet clientset.Interface
|
||||
@ -258,7 +261,8 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
|
||||
t.Fatalf("failed to create garbage collector: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
tearDown := func() {
|
||||
cancel()
|
||||
result.TearDownFn()
|
||||
@ -272,7 +276,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
|
||||
restMapper.Reset()
|
||||
}, syncPeriod, ctx.Done())
|
||||
go gc.Run(ctx, workers)
|
||||
go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done())
|
||||
go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
|
||||
}
|
||||
|
||||
if workerCount > 0 {
|
||||
@ -280,6 +284,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
|
||||
}
|
||||
|
||||
return &testContext{
|
||||
logger: logger,
|
||||
tearDown: tearDown,
|
||||
gc: gc,
|
||||
clientSet: clientSet,
|
||||
@ -1025,7 +1030,9 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
|
||||
ctx.startGC(5)
|
||||
timeout := make(chan struct{})
|
||||
time.AfterFunc(5*time.Second, func() { close(timeout) })
|
||||
if !cache.WaitForCacheSync(timeout, gc.IsSynced) {
|
||||
if !cache.WaitForCacheSync(timeout, func() bool {
|
||||
return gc.IsSynced(ctx.logger)
|
||||
}) {
|
||||
t.Fatalf("failed to wait for garbage collector to be synced")
|
||||
}
|
||||
|
||||
|
@ -1849,7 +1849,7 @@ func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config,
|
||||
restMapper.Reset()
|
||||
}, syncPeriod, ctx.Done())
|
||||
go gc.Run(ctx, 1)
|
||||
go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done())
|
||||
go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
|
||||
}
|
||||
return startGC
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user