Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-20 09:05:26 +00:00)
garbagecollector: controller loop should not be blocking on failed cache sync

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
Co-authored-by: He Xiaoxi <xxhe@alauda.io>

commit e8b1d7dc24
parent e5dd48efd0
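Before the diff, a minimal, self-contained Go sketch (not the controller's actual code) of the startup behavior this commit moves to: Run bounds its wait for the dependency graph builder's caches with a stop-or-timeout channel and proceeds even if the caches never sync, instead of returning early. waitForStopOrTimeout is modeled on the helper of the same name used in the diff; waitForSync is a simplified stand-in for cache.WaitForNamedCacheSync, and the 3-second timeout is illustrative (the real code uses 30 seconds).

package main

import (
	"fmt"
	"time"
)

// waitForStopOrTimeout returns a channel that closes when stopCh closes or
// when the timeout elapses, whichever happens first.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		defer close(out)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return out
}

// waitForSync polls isSynced until it reports true or stopCh closes,
// returning false in the latter case (roughly what cache.WaitForNamedCacheSync does).
func waitForSync(stopCh <-chan struct{}, isSynced func() bool) bool {
	if isSynced() {
		return true
	}
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
			if isSynced() {
				return true
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	monitorsSynced := func() bool { return false } // simulate monitors that never finish syncing

	// Bound the wait instead of blocking forever; on timeout, log and keep going.
	if !waitForSync(waitForStopOrTimeout(stop, 3*time.Second), monitorsSynced) {
		fmt.Println("garbage collector: all resource monitors could not be synced, proceeding anyways")
	} else {
		fmt.Println("garbage collector: all resource monitors have synced")
	}
	fmt.Println("proceeding to collect garbage")
}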
@@ -74,8 +74,6 @@ type GarbageCollector struct {
 
 	kubeClient       clientset.Interface
 	eventBroadcaster record.EventBroadcaster
-
-	workerLock sync.RWMutex
 }
 
 var _ controller.Interface = (*GarbageCollector)(nil)
@@ -148,13 +146,15 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 
 	go gc.dependencyGraphBuilder.Run(ctx)
 
-	if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
+	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), 30*time.Second), func() bool {
 		return gc.dependencyGraphBuilder.IsSynced(logger)
 	}) {
-		return
+		logger.Info("Garbage collector: all resource monitors could not be synced, proceeding anyways")
+	} else {
+		logger.Info("Garbage collector: all resource monitors have synced")
 	}
 
-	logger.Info("All resource monitors have synced. Proceeding to collect garbage")
+	logger.Info("Proceeding to collect garbage")
 
 	// gc workers
 	for i := 0; i < workers; i++ {
@@ -166,8 +166,8 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 }
 
 // Sync periodically resyncs the garbage collector when new resources are
-// observed from discovery. When new resources are detected, Sync will stop all
-// GC workers, reset gc.restMapper, and resync the monitors.
+// observed from discovery. When new resources are detected, it will reset
+// gc.restMapper, and resync the monitors.
 //
 // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
 // the mapper's underlying discovery client will be unnecessarily reset during
@@ -200,38 +200,8 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 			return
 		}
 
-		// Ensure workers are paused to avoid processing events before informers
-		// have resynced.
-		gc.workerLock.Lock()
-		defer gc.workerLock.Unlock()
-
-		// Once we get here, we should not unpause workers until we've successfully synced
-		attempt := 0
-		wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
-			attempt++
-
-			// On a reattempt, check if available resources have changed
-			if attempt > 1 {
-				newResources, err = GetDeletableResources(logger, discoveryClient)
-
-				if len(newResources) == 0 {
-					logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
-					metrics.GarbageCollectorResourcesSyncError.Inc()
-					return false, nil
-				}
-				if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
-					// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
-					for k, v := range oldResources {
-						if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
-							newResources[k] = v
-						}
-					}
-				}
-			}
-
 		logger.V(2).Info(
 			"syncing garbage collector with updated resources from discovery",
-			"attempt", attempt,
 			"diff", printDiff(oldResources, newResources),
 		)
 
@@ -251,9 +221,9 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 		// case, the restMapper will fail to map some of newResources until the next
 		// attempt.
 		if err := gc.resyncMonitors(logger, newResources); err != nil {
-			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
+			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
 			metrics.GarbageCollectorResourcesSyncError.Inc()
-			return false, nil
+			return
 		}
 		logger.V(4).Info("resynced monitors")
 
@@ -265,18 +235,13 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 		if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
 			return gc.dependencyGraphBuilder.IsSynced(logger)
 		}) {
-			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
+			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
 			metrics.GarbageCollectorResourcesSyncError.Inc()
-			return false, nil
 		}
 
-			// success, break out of the loop
-			return true, nil
-		})
-
-		// Finally, keep track of our new state. Do this after all preceding steps
-		// have succeeded to ensure we'll retry on subsequent syncs if an error
-		// occurred.
+		// Finally, keep track of our new resource monitor state.
+		// Monitors where the cache sync times out are still tracked here as
+		// subsequent runs should stop them if their resources were removed.
 		oldResources = newResources
 		logger.V(2).Info("synced garbage collector")
 	}, period)
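The Sync changes above drop the inner wait.PollImmediateUntilWithContext retry loop and the worker lock held around it: a sync attempt that cannot resync monitors or see its caches catch up now simply returns, and the next periodic run of Sync retries with freshly discovered resources. A minimal, self-contained sketch of that retry model, under stated assumptions: discoverResources and resyncOnce are hypothetical stand-ins for GetDeletableResources and gc.resyncMonitors, and a plain time.Ticker stands in for the wait helper that drives the real loop.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// discoverResources is a hypothetical stand-in for GetDeletableResources;
// it fails for the first couple of attempts to simulate flaky discovery.
func discoverResources(attempt int) (map[string]struct{}, error) {
	if attempt < 3 {
		return nil, errors.New("discovery not ready")
	}
	return map[string]struct{}{"pods": {}, "deployments": {}}, nil
}

// resyncOnce performs a single, non-blocking sync attempt. On error it just
// reports failure; it never loops or holds a lock across retries.
func resyncOnce(attempt int, old map[string]struct{}) (map[string]struct{}, error) {
	newResources, err := discoverResources(attempt)
	if err != nil {
		return old, fmt.Errorf("failed to sync resource monitors: %w", err)
	}
	return newResources, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	period := 300 * time.Millisecond
	oldResources := map[string]struct{}{}
	attempt := 0

	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			attempt++
			updated, err := resyncOnce(attempt, oldResources)
			if err != nil {
				// Log and fall through; the next tick retries with fresh discovery results.
				fmt.Println("sync attempt failed, will retry next period:", err)
				continue
			}
			oldResources = updated
			fmt.Println("synced garbage collector:", len(oldResources), "resources")
		}
	}
}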
@@ -328,8 +293,6 @@ var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objec
 
 func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
 	item, quit := gc.attemptToDelete.Get()
-	gc.workerLock.RLock()
-	defer gc.workerLock.RUnlock()
 	if quit {
 		return false
 	}
@@ -754,8 +717,6 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
 // these steps fail.
 func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
 	item, quit := gc.attemptToOrphan.Get()
-	gc.workerLock.RLock()
-	defer gc.workerLock.RUnlock()
 	if quit {
 		return false
 	}
@@ -814,7 +814,8 @@ func TestGetDeletableResources(t *testing.T) {
 }
 
 // TestGarbageCollectorSync ensures that a discovery client error
-// will not cause the garbage collector to block infinitely.
+// or an informer sync error will not cause the garbage collector
+// to block infinitely.
 func TestGarbageCollectorSync(t *testing.T) {
 	serverResources := []*metav1.APIResourceList{
 		{
@@ -912,49 +913,30 @@ func TestGarbageCollectorSync(t *testing.T) {
 
 	// Wait until the sync discovers the initial resources
 	time.Sleep(1 * time.Second)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
-	}
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning an error
 	fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
 
-	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
-	// No monitor changes
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Remove the error from being returned and see if the garbage collector sync is still working
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
-	}
+	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
 
-	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "secrets")
 
 	// Put the resources back to normal and ensure garbage collector sync recovers
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
-	}
+	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Partial discovery failure
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
-	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	// Deployments monitor kept
 	assertMonitors(t, gc, "pods", "deployments", "secrets")
@@ -963,12 +945,35 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
-	}
 	// Unsyncable monitor removed
 	assertMonitors(t, gc, "pods", "deployments")
 
+	// Add fake controller simulate the initial not-synced informer which will be synced at the end.
+	fc := fakeController{}
+	gc.dependencyGraphBuilder.monitors[schema.GroupVersionResource{
+		Version:  "v1",
+		Resource: "secrets",
+	}] = &monitor{controller: &fc}
+	if gc.IsSynced(logger) {
+		t.Fatal("cache from garbage collector should not be synced")
+	}
+
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
+	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "secrets")
+
+	// The informer is synced now.
+	fc.SetSynced(true)
+	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "secrets")
+
+	if !gc.IsSynced(logger) {
+		t.Fatal("cache from garbage collector should be synced")
+	}
+
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "deployments")
 }
 
 func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
@@ -983,29 +988,6 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
 	}
 }
 
-func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
-	before := fakeDiscoveryClient.getInterfaceUsedCount()
-	t := 1 * time.Second
-	time.Sleep(t)
-	after := fakeDiscoveryClient.getInterfaceUsedCount()
-	if before == after {
-		return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
-	}
-
-	workerLockAcquired := make(chan struct{})
-	go func() {
-		workerLock.Lock()
-		defer workerLock.Unlock()
-		close(workerLockAcquired)
-	}()
-	select {
-	case <-workerLockAcquired:
-		return nil
-	case <-time.After(t):
-		return fmt.Errorf("workerLock blocked for at least %v", t)
-	}
-}
-
 type fakeServerResources struct {
 	PreferredResources []*metav1.APIResourceList
 	Error              error
@@ -1035,12 +1017,6 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou
 	f.Error = err
 }
 
-func (f *fakeServerResources) getInterfaceUsedCount() int {
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
-	return f.InterfaceUsedCount
-}
-
 func (*fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
 	return nil, nil
 }
@@ -2778,6 +2754,28 @@ func assertState(s state) step {
 
 }
 
+type fakeController struct {
+	synced bool
+	lock   sync.Mutex
+}
+
+func (f *fakeController) Run(stopCh <-chan struct{}) {
+}
+
+func (f *fakeController) HasSynced() bool {
+	return f.synced
+}
+
+func (f *fakeController) SetSynced(synced bool) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.synced = synced
+}
+
+func (f *fakeController) LastSyncResourceVersion() string {
+	return ""
+}
+
 // trackingWorkqueue implements RateLimitingInterface,
 // allows introspection of the items in the queue,
 // and treats AddAfter and AddRateLimited the same as Add
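The new fakeController above gives the test a monitor whose HasSynced answer can be flipped while the garbage collector is running, so TestGarbageCollectorSync can check that the collector reports "not synced" while one informer is still pending and "synced" after it catches up. A stripped-down sketch of that pattern, with illustrative names rather than the package's real types:

package main

import (
	"fmt"
	"sync"
)

// syncedChecker is the minimal surface the aggregate check needs from a monitor.
type syncedChecker interface {
	HasSynced() bool
}

// fakeMonitor is a togglable stand-in for an informer-backed monitor.
type fakeMonitor struct {
	mu     sync.Mutex
	synced bool
}

func (f *fakeMonitor) HasSynced() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.synced
}

func (f *fakeMonitor) SetSynced(s bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.synced = s
}

// allSynced reports true only when every monitor has synced, similar in spirit
// to the dependency graph builder's IsSynced check.
func allSynced(monitors map[string]syncedChecker) bool {
	for _, m := range monitors {
		if !m.HasSynced() {
			return false
		}
	}
	return true
}

func main() {
	pending := &fakeMonitor{}
	monitors := map[string]syncedChecker{
		"pods":    &fakeMonitor{synced: true},
		"secrets": pending, // simulates the initially not-synced informer
	}

	fmt.Println("before:", allSynced(monitors)) // false: one monitor still pending
	pending.SetSynced(true)
	fmt.Println("after:", allSynced(monitors)) // true: everything has caught up
}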