Merge pull request #125796 from haorenfsa/fix-gc-sync-blocked

garbagecollector: controller should not be blocking on failed cache sync
Kubernetes Prow Robot
2024-09-25 04:02:00 +01:00
committed by GitHub
5 changed files with 249 additions and 116 deletions
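The heart of the change: `Run` gains an `initialSyncTimeout` and no longer returns when the dependency graph builder fails to sync; it logs and starts the workers anyway. Below is a minimal standalone sketch of that bounded-wait-then-proceed pattern, assuming a `stopOrTimeout` helper analogous to (but not identical with) the package's `waitForStopOrTimeout`:

```go
package main

import (
	"fmt"
	"time"
)

// stopOrTimeout returns a channel that closes when stopCh closes or after
// timeout elapses, whichever comes first. Illustrative stand-in for the
// garbage collector's waitForStopOrTimeout helper.
func stopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		defer close(out)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return out
}

func main() {
	stopCh := make(chan struct{})
	synced := func() bool { return false } // pretend the cache never syncs

	// Wait at most the initial sync timeout; the real code polls the condition
	// via cache.WaitForNamedCacheSync while this channel stays open.
	<-stopOrTimeout(stopCh, 500*time.Millisecond)

	if synced() {
		fmt.Println("all resource monitors have synced")
	} else {
		// Previously the controller returned here and never started workers.
		fmt.Println("not all resource monitors could be synced, proceeding anyway")
	}
	fmt.Println("proceeding to collect garbage")
}
```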

View File

@@ -74,8 +74,6 @@ type GarbageCollector struct {
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
workerLock sync.RWMutex
}
var _ controller.Interface = (*GarbageCollector)(nil)
@@ -131,7 +129,7 @@ func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResource
}
// Run starts garbage collector workers.
func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
func (gc *GarbageCollector) Run(ctx context.Context, workers int, initialSyncTimeout time.Duration) {
defer utilruntime.HandleCrash()
defer gc.attemptToDelete.ShutDown()
defer gc.attemptToOrphan.ShutDown()
@@ -148,13 +146,15 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
go gc.dependencyGraphBuilder.Run(ctx)
if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), initialSyncTimeout), func() bool {
return gc.dependencyGraphBuilder.IsSynced(logger)
}) {
return
logger.Info("Garbage collector: not all resource monitors could be synced, proceeding anyways")
} else {
logger.Info("Garbage collector: all resource monitors have synced")
}
logger.Info("All resource monitors have synced. Proceeding to collect garbage")
logger.Info("Proceeding to collect garbage")
// gc workers
for i := 0; i < workers; i++ {
@@ -166,8 +166,8 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
}
// Sync periodically resyncs the garbage collector when new resources are
// observed from discovery. When new resources are detected, Sync will stop all
// GC workers, reset gc.restMapper, and resync the monitors.
// observed from discovery. When new resources are detected, it will reset
// gc.restMapper, and resync the monitors.
//
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
// the mapper's underlying discovery client will be unnecessarily reset during
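For context on the note above about not sharing the discovery client with `gc.restMapper`: a hedged sketch of how a caller could wire two independent discovery clients, so that resetting the mapper's cached client does not also invalidate the client used for resource discovery. The kubeconfig path and variable names are assumptions; the real wiring lives in kube-controller-manager, not in this diff.

```go
package main

import (
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed kubeconfig path, for illustration only.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	// Discovery client owned by the REST mapper: mapper.Reset() invalidates
	// this cached client.
	mapperDiscovery, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(mapperDiscovery))

	// A second, independent discovery client for periodic resource discovery
	// (the role Sync's discoveryClient plays). Being separate, it is not
	// needlessly invalidated whenever the mapper is reset.
	syncDiscovery, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	_, _ = mapper, syncDiscovery
}
```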
@@ -200,85 +200,48 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
return
}
// Ensure workers are paused to avoid processing events before informers
// have resynced.
gc.workerLock.Lock()
defer gc.workerLock.Unlock()
logger.V(2).Info(
"syncing garbage collector with updated resources from discovery",
"diff", printDiff(oldResources, newResources),
)
// Once we get here, we should not unpause workers until we've successfully synced
attempt := 0
wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
attempt++
// Resetting the REST mapper will also invalidate the underlying discovery
// client. This is a leaky abstraction and assumes behavior about the REST
// mapper, but we'll deal with it for now.
gc.restMapper.Reset()
logger.V(4).Info("reset restmapper")
// On a reattempt, check if available resources have changed
if attempt > 1 {
newResources, err = GetDeletableResources(logger, discoveryClient)
// Perform the monitor resync and wait for controllers to report cache sync.
//
// NOTE: It's possible that newResources will diverge from the resources
// discovered by restMapper during the call to Reset, since they are
// distinct discovery clients invalidated at different times. For example,
// newResources may contain resources not returned in the restMapper's
// discovery call if the resources appeared in-between the calls. In that
// case, the restMapper will fail to map some of newResources until the next
// attempt.
if err := gc.resyncMonitors(logger, newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
metrics.GarbageCollectorResourcesSyncError.Inc()
return
}
logger.V(4).Info("resynced monitors")
if len(newResources) == 0 {
logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
metrics.GarbageCollectorResourcesSyncError.Inc()
return false, nil
}
if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
for k, v := range oldResources {
if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
newResources[k] = v
}
}
}
}
logger.V(2).Info(
"syncing garbage collector with updated resources from discovery",
"attempt", attempt,
"diff", printDiff(oldResources, newResources),
)
// Resetting the REST mapper will also invalidate the underlying discovery
// client. This is a leaky abstraction and assumes behavior about the REST
// mapper, but we'll deal with it for now.
gc.restMapper.Reset()
logger.V(4).Info("reset restmapper")
// Perform the monitor resync and wait for controllers to report cache sync.
//
// NOTE: It's possible that newResources will diverge from the resources
// discovered by restMapper during the call to Reset, since they are
// distinct discovery clients invalidated at different times. For example,
// newResources may contain resources not returned in the restMapper's
// discovery call if the resources appeared in-between the calls. In that
// case, the restMapper will fail to map some of newResources until the next
// attempt.
if err := gc.resyncMonitors(logger, newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
metrics.GarbageCollectorResourcesSyncError.Inc()
return false, nil
}
logger.V(4).Info("resynced monitors")
// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
// note that workers stay paused until we successfully resync.
if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
return gc.dependencyGraphBuilder.IsSynced(logger)
}) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
metrics.GarbageCollectorResourcesSyncError.Inc()
return false, nil
}
// success, break out of the loop
return true, nil
// gc worker no longer waits for cache to be synced, but we will keep the periodical check to provide logs & metrics
cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
return gc.dependencyGraphBuilder.IsSynced(logger)
})
if cacheSynced {
logger.V(2).Info("synced garbage collector")
} else {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
metrics.GarbageCollectorResourcesSyncError.Inc()
}
// Finally, keep track of our new state. Do this after all preceding steps
// have succeeded to ensure we'll retry on subsequent syncs if an error
// occurred.
// Finally, keep track of our new resource monitor state.
// Monitors where the cache sync times out are still tracked here as
// subsequent runs should stop them if their resources were removed.
oldResources = newResources
logger.V(2).Info("synced garbage collector")
}, period)
}
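To summarize the control-flow change above: the in-place retry loop (`wait.PollImmediateUntilWithContext` with an attempt counter) is gone. Each periodic pass now resets the mapper, resyncs the monitors once, and runs a single bounded cache-sync check whose result only feeds logs and metrics; the new monitor state is recorded even on timeout. A rough standalone sketch of that shape, with placeholder functions standing in for the real collaborators:

```go
package main

import (
	"fmt"
	"time"
)

// Placeholders standing in for the garbage collector's real collaborators.
func getDeletableResources() map[string]struct{}        { return map[string]struct{}{"pods": {}} }
func resetRESTMapper()                                   {}
func resyncMonitors(resources map[string]struct{}) error { return nil }
func dependencyGraphSynced() bool                        { return false }

// syncOnce is one pass of the periodic sync: no retry loop, no worker pause.
func syncOnce(old map[string]struct{}, period time.Duration) map[string]struct{} {
	newResources := getDeletableResources()

	resetRESTMapper()
	if err := resyncMonitors(newResources); err != nil {
		fmt.Println("failed to sync resource monitors:", err)
		return old // keep the old state so the next pass retries
	}

	// Bounded wait: the outcome is only reported; it no longer gates workers.
	deadline := time.Now().Add(period)
	synced := false
	for time.Now().Before(deadline) {
		if synced = dependencyGraphSynced(); synced {
			break
		}
		time.Sleep(50 * time.Millisecond)
	}
	if synced {
		fmt.Println("synced garbage collector")
	} else {
		fmt.Println("timed out waiting for dependency graph builder sync")
	}

	// Track the new monitor state even on timeout, so a later pass can stop
	// monitors whose resources were removed.
	return newResources
}

func main() {
	state := map[string]struct{}{}
	state = syncOnce(state, 200*time.Millisecond)
	_ = state
}
```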
@@ -328,8 +291,6 @@ var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objec
func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
item, quit := gc.attemptToDelete.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
@@ -754,8 +715,6 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
// these steps fail.
func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
item, quit := gc.attemptToOrphan.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
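The two hunks above remove the per-item `workerLock.RLock()` from the delete and orphan workers. That read lock paired with `workerLock.Lock()` in `Sync` to pause every worker while monitors resynced, which is exactly what let a never-syncing informer wedge the controller. For reference, a minimal sketch of the pause idiom that was removed (generic worker and coordinator, not the GC's actual types):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pauseLock demonstrates the reader/writer pause idiom: workers take a read
// lock per item, so a coordinator holding the write lock pauses all of them
// until it releases it.
var pauseLock sync.RWMutex

func worker(id int, items <-chan string, done chan<- struct{}) {
	for item := range items {
		pauseLock.RLock() // blocks while the coordinator holds the write lock
		fmt.Printf("worker %d processing %s\n", id, item)
		pauseLock.RUnlock()
	}
	done <- struct{}{}
}

func main() {
	items := make(chan string, 4)
	done := make(chan struct{})
	go worker(1, items, done)

	items <- "a"
	time.Sleep(50 * time.Millisecond)

	// Coordinator pauses workers, e.g. while informers resync.
	pauseLock.Lock()
	items <- "b" // queued, but not processed until Unlock
	time.Sleep(50 * time.Millisecond)
	pauseLock.Unlock()

	close(items)
	<-done
}
```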

View File

@@ -24,6 +24,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -49,6 +50,7 @@ import (
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -60,9 +62,11 @@ import (
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
metricsutil "k8s.io/component-base/metrics/testutil"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/kubernetes/pkg/api/legacyscheme"
c "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
"k8s.io/kubernetes/test/utils/ktesting"
)
@@ -120,7 +124,7 @@ func TestGarbageCollectorConstruction(t *testing.T) {
}
assert.Len(t, gc.dependencyGraphBuilder.monitors, 1)
go gc.Run(tCtx, 1)
go gc.Run(tCtx, 1, 5*time.Second)
err = gc.resyncMonitors(logger, twoResources)
if err != nil {
@@ -814,7 +818,8 @@ func TestGetDeletableResources(t *testing.T) {
}
// TestGarbageCollectorSync ensures that a discovery client error
// will not cause the garbage collector to block infinitely.
// or an informer sync error will not cause the garbage collector
// to block infinitely.
func TestGarbageCollectorSync(t *testing.T) {
serverResources := []*metav1.APIResourceList{
{
@@ -845,7 +850,6 @@ func TestGarbageCollectorSync(t *testing.T) {
PreferredResources: serverResources,
Error: nil,
Lock: sync.Mutex{},
InterfaceUsedCount: 0,
}
testHandler := &fakeActionHandler{
@@ -864,7 +868,24 @@ func TestGarbageCollectorSync(t *testing.T) {
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
testHandler2 := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/secrets": {
200,
[]byte("{}"),
},
},
}
var secretSyncOK atomic.Bool
var alternativeTestHandler = func(response http.ResponseWriter, request *http.Request) {
if request.URL.Path == "/api/v1/secrets" && secretSyncOK.Load() {
testHandler2.ServeHTTP(response, request)
return
}
testHandler.ServeHTTP(response, request)
}
srv, clientConfig := testServerAndClientConfig(alternativeTestHandler)
defer srv.Close()
clientConfig.ContentConfig.NegotiatedSerializer = nil
client, err := kubernetes.NewForConfig(clientConfig)
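The `alternativeTestHandler` above gates the `/api/v1/secrets` response on an `atomic.Bool`, so the secrets informer cannot list successfully until the test flips the flag. The same toggling technique in isolation, using `net/http/httptest` (illustrative names, not the test's actual fixtures):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

func main() {
	var secretSyncOK atomic.Bool

	// Serve an error for /api/v1/secrets until the flag flips, then succeed.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/api/v1/secrets" && !secretSyncOK.Load() {
			http.Error(w, "not ready", http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "{}")
	}))
	defer srv.Close()

	get := func() int {
		resp, err := http.Get(srv.URL + "/api/v1/secrets")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		io.Copy(io.Discard, resp.Body)
		return resp.StatusCode
	}

	fmt.Println("before toggle:", get()) // 500: the "informer" cannot list
	secretSyncOK.Store(true)
	fmt.Println("after toggle:", get()) // 200: the "informer" can now sync
}
```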
@@ -884,7 +905,7 @@ func TestGarbageCollectorSync(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, 0)
tCtx := ktesting.Init(t)
logger, tCtx := ktesting.NewTestContext(t)
defer tCtx.Cancel("test has completed")
alwaysStarted := make(chan struct{})
close(alwaysStarted)
@@ -893,7 +914,8 @@ func TestGarbageCollectorSync(t *testing.T) {
t.Fatal(err)
}
go gc.Run(tCtx, 1)
syncPeriod := 200 * time.Millisecond
go gc.Run(tCtx, 1, syncPeriod)
// The pseudo-code of GarbageCollector.Sync():
// GarbageCollector.Sync(client, period, stopCh):
// wait.Until() loops with `period` until the `stopCh` is closed :
@@ -908,14 +930,14 @@ func TestGarbageCollectorSync(t *testing.T) {
// The 1s sleep in the test allows GetDeletableResources and
// gc.resyncMonitors to run ~5 times to ensure the changes to the
// fakeDiscoveryClient are picked up.
go gc.Sync(tCtx, fakeDiscoveryClient, 200*time.Millisecond)
go gc.Sync(tCtx, fakeDiscoveryClient, syncPeriod)
// Wait until the sync discovers the initial resources
time.Sleep(1 * time.Second)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
assertMonitors(t, gc, "pods", "deployments")
@@ -930,7 +952,7 @@ func TestGarbageCollectorSync(t *testing.T) {
// Remove the error from being returned and see if the garbage collector sync is still working
fakeDiscoveryClient.setPreferredResources(serverResources, nil)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
@@ -946,7 +968,7 @@ func TestGarbageCollectorSync(t *testing.T) {
// Put the resources back to normal and ensure garbage collector sync recovers
fakeDiscoveryClient.setPreferredResources(serverResources, nil)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
@@ -963,12 +985,33 @@ func TestGarbageCollectorSync(t *testing.T) {
fakeDiscoveryClient.setPreferredResources(serverResources, nil)
// Wait until sync discovers the change
time.Sleep(1 * time.Second)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
// Unsyncable monitor removed
assertMonitors(t, gc, "pods", "deployments")
// Simulate initial not-synced informer which will be synced at the end.
metrics.GarbageCollectorResourcesSyncError.Reset()
fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
time.Sleep(1 * time.Second)
assertMonitors(t, gc, "pods", "secrets")
if gc.IsSynced(logger) {
t.Fatal("cache from garbage collector should not be synced")
}
val, _ := metricsutil.GetCounterMetricValue(metrics.GarbageCollectorResourcesSyncError)
if val < 1 {
t.Fatalf("expect sync error metric > 0")
}
// The informer is synced now.
secretSyncOK.Store(true)
if err := wait.PollUntilContextTimeout(tCtx, time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
return gc.IsSynced(logger), nil
}); err != nil {
t.Fatal(err)
}
}
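The new tail of the test asserts the sync-error counter via `metricsutil.GetCounterMetricValue` and then polls `gc.IsSynced` with `wait.PollUntilContextTimeout` until the late-syncing informer catches up. For reference, a minimal standalone use of that polling helper (the condition here is a trivial timer, purely illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// Poll every 100ms, give up after 2s, check immediately; the condition
	// becomes true after roughly 500ms.
	err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 2*time.Second, true,
		func(ctx context.Context) (bool, error) {
			return time.Since(start) > 500*time.Millisecond, nil
		})
	fmt.Println("poll finished, err =", err)
}
```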
func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
@@ -983,27 +1026,15 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
}
}
func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
before := fakeDiscoveryClient.getInterfaceUsedCount()
t := 1 * time.Second
time.Sleep(t)
after := fakeDiscoveryClient.getInterfaceUsedCount()
if before == after {
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
}
workerLockAcquired := make(chan struct{})
go func() {
workerLock.Lock()
defer workerLock.Unlock()
close(workerLockAcquired)
}()
select {
case <-workerLockAcquired:
return nil
case <-time.After(t):
return fmt.Errorf("workerLock blocked for at least %v", t)
return fmt.Errorf("discoveryClient.ServerPreferredResources() not called over %v", t)
}
return nil
}
type fakeServerResources struct {