garbagecollector: add initialSyncTimeout for Run

Signed-off-by: haorenfsa <haorenfsa@gmail.com>
Author: haorenfsa
Date: 2024-09-13 23:03:47 +08:00
Parent: d4fdfaf17d
Commit: 87ca404634
5 changed files with 13 additions and 9 deletions
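In short: GarbageCollector.Run now takes an initialSyncTimeout instead of hard-coding 30 seconds for the initial cache sync, and every caller passes the same period it already hands to Sync. A minimal sketch of the new call pattern, assuming the upstream import paths and the Sync signature shown in the diffs below; the surrounding controller setup is elided and startGC is an illustrative helper, not part of this commit:

package gcsetup

import (
	"context"
	"time"

	"k8s.io/client-go/discovery"
	"k8s.io/kubernetes/pkg/controller/garbagecollector"
)

// startGC wires the garbage collector the way the call sites below do.
func startGC(ctx context.Context, gc *garbagecollector.GarbageCollector, dc discovery.ServerResourcesInterface, workers int) {
	const syncPeriod = 30 * time.Second
	// Run now also takes how long to wait for the initial cache sync
	// before proceeding anyway.
	go gc.Run(ctx, workers, syncPeriod)
	// Sync keeps refreshing discovery information on the same period.
	go gc.Sync(ctx, dc, syncPeriod)
}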

@@ -698,11 +698,12 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
 	// Start the garbage collector.
 	workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
-	go garbageCollector.Run(ctx, workers)
+	const syncPeriod = 30 * time.Second
+	go garbageCollector.Run(ctx, workers, syncPeriod)
 	// Periodically refresh the RESTMapper with new discovery information and sync
 	// the garbage collector.
-	go garbageCollector.Sync(ctx, discoveryClient, 30*time.Second)
+	go garbageCollector.Sync(ctx, discoveryClient, syncPeriod)
 	return garbageCollector, true, nil
 }

@@ -129,7 +129,7 @@ func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResource
 }
 // Run starts garbage collector workers.
-func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
+func (gc *GarbageCollector) Run(ctx context.Context, workers int, initialSyncTimeout time.Duration) {
 	defer utilruntime.HandleCrash()
 	defer gc.attemptToDelete.ShutDown()
 	defer gc.attemptToOrphan.ShutDown()
@@ -146,7 +146,7 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 	go gc.dependencyGraphBuilder.Run(ctx)
-	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), 30*time.Second), func() bool {
+	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), initialSyncTimeout), func() bool {
 		return gc.dependencyGraphBuilder.IsSynced(logger)
 	}) {
 		logger.Info("Garbage collector: all resource monitors could not be synced, proceeding anyways")

@@ -124,7 +124,7 @@ func TestGarbageCollectorConstruction(t *testing.T) {
 	}
 	assert.Len(t, gc.dependencyGraphBuilder.monitors, 1)
-	go gc.Run(tCtx, 1)
+	go gc.Run(tCtx, 1, 5*time.Second)
 	err = gc.resyncMonitors(logger, twoResources)
 	if err != nil {
@@ -914,7 +914,8 @@ func TestGarbageCollectorSync(t *testing.T) {
 		t.Fatal(err)
 	}
-	go gc.Run(tCtx, 1)
+	syncPeriod := 200 * time.Millisecond
+	go gc.Run(tCtx, 1, syncPeriod)
 	// The pseudo-code of GarbageCollector.Sync():
 	// GarbageCollector.Sync(client, period, stopCh):
 	//    wait.Until() loops with `period` until the `stopCh` is closed :
@@ -929,7 +930,7 @@ func TestGarbageCollectorSync(t *testing.T) {
 	// The 1s sleep in the test allows GetDeletableResources and
 	// gc.resyncMonitors to run ~5 times to ensure the changes to the
 	// fakeDiscoveryClient are picked up.
-	go gc.Sync(tCtx, fakeDiscoveryClient, 200*time.Millisecond)
+	go gc.Sync(tCtx, fakeDiscoveryClient, syncPeriod)
 	// Wait until the sync discovers the initial resources
 	time.Sleep(1 * time.Second)
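The pseudo-code comment above boils down to a poll-and-compare loop: rediscover the deletable resources every period, and only resync the monitors when the set has changed. A generic sketch of that pattern (hypothetical helper names, not the upstream Sync implementation):

package gcsync

import (
	"context"
	"reflect"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// syncLoop polls discover() every period and calls resync() only when the
// discovered set differs from the previous one.
func syncLoop(ctx context.Context, period time.Duration,
	discover func() map[string]struct{},
	resync func(map[string]struct{}) error) {
	oldResources := map[string]struct{}{}
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		newResources := discover()
		if reflect.DeepEqual(oldResources, newResources) {
			return // nothing changed; skip the resync
		}
		if err := resync(newResources); err != nil {
			return // keep oldResources so the next tick retries
		}
		oldResources = newResources
	}, period)
}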

@@ -301,7 +301,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
 			// mapper, but we'll deal with it for now.
 			restMapper.Reset()
 		}, syncPeriod, tCtx.Done())
-		go gc.Run(tCtx, workers)
+		go gc.Run(tCtx, workers, syncPeriod)
 		go gc.Sync(tCtx, clientSet.Discovery(), syncPeriod)
 	}
@@ -1371,6 +1371,8 @@ func TestCascadingDeleteOnCRDConversionFailure(t *testing.T) {
 	}
 	ctx.startGC(5)
+	// make sure gc.Sync finds the new CRD and starts monitoring it
+	time.Sleep(ctx.syncPeriod + 1*time.Second)
 	rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
 	podClient := clientSet.CoreV1().Pods(ns.Name)

@@ -219,7 +219,7 @@ func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclie
 		go wait.Until(func() {
 			restMapper.Reset()
 		}, syncPeriod, ctx.Done())
-		go gc.Run(ctx, 1)
+		go gc.Run(ctx, 1, syncPeriod)
 		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
 	}
 	return startGC