diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 7f93386a86d..b4432cd1276 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
@@ -170,10 +171,8 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 		newResources := GetDeletableResources(discoveryClient)
 
 		// This can occur if there is an internal error in GetDeletableResources.
-		// If the gc attempts to sync with 0 resources it will block forever.
-		// TODO: Implement a more complete solution for the garbage collector hanging.
 		if len(newResources) == 0 {
-			glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync")
+			glog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
 			return
 		}
 
@@ -183,39 +182,61 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 			return
 		}
 
-		// Something has changed, time to sync.
-		glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources)
-
 		// Ensure workers are paused to avoid processing events before informers
 		// have resynced.
 		gc.workerLock.Lock()
 		defer gc.workerLock.Unlock()
 
-		// Resetting the REST mapper will also invalidate the underlying discovery
-		// client. This is a leaky abstraction and assumes behavior about the REST
-		// mapper, but we'll deal with it for now.
-		gc.restMapper.Reset()
+		// Once we get here, we should not unpause workers until we've successfully synced
+		attempt := 0
+		wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
+			attempt++
 
-		// Perform the monitor resync and wait for controllers to report cache sync.
-		//
-		// NOTE: It's possible that newResources will diverge from the resources
-		// discovered by restMapper during the call to Reset, since they are
-		// distinct discovery clients invalidated at different times. For example,
-		// newResources may contain resources not returned in the restMapper's
-		// discovery call if the resources appeared in-between the calls. In that
-		// case, the restMapper will fail to map some of newResources until the next
-		// sync period.
-		if err := gc.resyncMonitors(newResources); err != nil {
-			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
-			return
-		}
-		// TODO: WaitForCacheSync can block forever during normal operation. Could
-		// pass a timeout channel, but we have to consider the implications of
-		// un-pausing the GC with a partially synced graph builder.
-		if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
-			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
-			return
-		}
+			// On a reattempt, check if available resources have changed
+			if attempt > 1 {
+				newResources = GetDeletableResources(discoveryClient)
+				if len(newResources) == 0 {
+					glog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
+					return false, nil
+				}
+			}
+
+			glog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
+
+			// Resetting the REST mapper will also invalidate the underlying discovery
+			// client. This is a leaky abstraction and assumes behavior about the REST
+			// mapper, but we'll deal with it for now.
+			gc.restMapper.Reset()
+			glog.V(4).Infof("reset restmapper")
+
+			// Perform the monitor resync and wait for controllers to report cache sync.
+			//
+			// NOTE: It's possible that newResources will diverge from the resources
+			// discovered by restMapper during the call to Reset, since they are
+			// distinct discovery clients invalidated at different times. For example,
+			// newResources may contain resources not returned in the restMapper's
+			// discovery call if the resources appeared in-between the calls. In that
+			// case, the restMapper will fail to map some of newResources until the next
+			// attempt.
+			if err := gc.resyncMonitors(newResources); err != nil {
+				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
+				return false, nil
+			}
+			glog.V(4).Infof("resynced monitors")
+
+			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
+			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
+			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
+			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
+			// note that workers stay paused until we successfully resync.
+			if !controller.WaitForCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
+				utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
+				return false, nil
+			}
+
+			// success, break out of the loop
+			return true, nil
+		}, stopCh)
 
 		// Finally, keep track of our new state. Do this after all preceding steps
 		// have succeeded to ensure we'll retry on subsequent syncs if an error
@@ -225,6 +246,36 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 	}, period, stopCh)
 }
 
+// printDiff returns a human-readable summary of what resources were added and removed
+func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
+	removed := sets.NewString()
+	for oldResource := range oldResources {
+		if _, ok := newResources[oldResource]; !ok {
+			removed.Insert(fmt.Sprintf("%+v", oldResource))
+		}
+	}
+	added := sets.NewString()
+	for newResource := range newResources {
+		if _, ok := oldResources[newResource]; !ok {
+			added.Insert(fmt.Sprintf("%+v", newResource))
+		}
+	}
+	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
+}
+
+// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
+func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
+	stopChWithTimeout := make(chan struct{})
+	go func() {
+		select {
+		case <-stopCh:
+		case <-time.After(timeout):
+		}
+		close(stopChWithTimeout)
+	}()
+	return stopChWithTimeout
+}
+
 func (gc *GarbageCollector) IsSynced() bool {
 	return gc.dependencyGraphBuilder.IsSynced()
 }
diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index 96682e4c6da..0eb159eb3b9 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -800,6 +800,15 @@ func TestGarbageCollectorSync(t *testing.T) {
 			},
 		},
 	}
+	unsyncableServerResources := []*metav1.APIResourceList{
+		{
+			GroupVersion: "v1",
+			APIResources: []metav1.APIResource{
+				{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+				{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+			},
+		},
+	}
 	fakeDiscoveryClient := &fakeServerResources{
 		PreferredResources: serverResources,
 		Error:              nil,
@@ -813,6 +822,10 @@ func TestGarbageCollectorSync(t *testing.T) {
 				200,
 				[]byte("{}"),
 			},
+			"GET" + "/api/v1/secrets": {
+				404,
+				[]byte("{}"),
+			},
 		},
 	}
 	srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
@@ -849,7 +862,7 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fmt.Printf("Test output")
 	time.Sleep(1 * time.Second)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
 	}
@@ -865,13 +878,29 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fakeDiscoveryClient.setPreferredResources(serverResources)
 	fakeDiscoveryClient.setError(nil)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
+
+	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
+	fakeDiscoveryClient.setError(nil)
+
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+
+	// Put the resources back to normal and ensure garbage collector sync recovers
+	fakeDiscoveryClient.setPreferredResources(serverResources)
+	fakeDiscoveryClient.setError(nil)
+
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
 	}
 }
 
-func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
+func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
 	before := fakeDiscoveryClient.getInterfaceUsedCount()
 	t := 1 * time.Second
 	time.Sleep(t)
@@ -879,7 +908,19 @@
 	if before == after {
 		return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
 	}
-	return nil
+
+	workerLockAcquired := make(chan struct{})
+	go func() {
+		workerLock.Lock()
+		workerLock.Unlock()
+		close(workerLockAcquired)
+	}()
+	select {
+	case <-workerLockAcquired:
+		return nil
+	case <-time.After(t):
+		return fmt.Errorf("workerLock blocked for at least %v", t)
+	}
 }
 
 type fakeServerResources struct {
diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go
index 4e18b1c2678..f7602d3731f 100644
--- a/pkg/controller/garbagecollector/graph_builder.go
+++ b/pkg/controller/garbagecollector/graph_builder.go
@@ -288,11 +288,13 @@ func (gb *GraphBuilder) IsSynced() bool {
 	defer gb.monitorLock.Unlock()
 
 	if len(gb.monitors) == 0 {
+		glog.V(4).Info("garbage controller monitor not synced: no monitors")
 		return false
 	}
 
-	for _, monitor := range gb.monitors {
+	for resource, monitor := range gb.monitors {
 		if !monitor.controller.HasSynced() {
+			glog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
 			return false
 		}
 	}
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
index 0997de8065d..a25e9246541 100644
--- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
+++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
@@ -284,12 +284,32 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro
 // PollUntil tries a condition func until it returns true, an error or stopCh is
 // closed.
 //
-// PolUntil always waits interval before the first run of 'condition'.
+// PollUntil always waits interval before the first run of 'condition'.
 // 'condition' will always be invoked at least once.
 func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
 	return WaitFor(poller(interval, 0), condition, stopCh)
 }
 
+// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
+//
+// PollImmediateUntil runs the 'condition' before waiting for the interval.
+// 'condition' will always be invoked at least once.
+func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
+	done, err := condition()
+	if err != nil {
+		return err
+	}
+	if done {
+		return nil
+	}
+	select {
+	case <-stopCh:
+		return ErrWaitTimeout
+	default:
+		return PollUntil(interval, condition, stopCh)
+	}
+}
+
 // WaitFunc creates a channel that receives an item every time a test
 // should be executed and is closed when the last test should be invoked.
 type WaitFunc func(done <-chan struct{}) <-chan struct{}
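
Reviewer note, not part of the patch: the snippet below is a minimal, standalone sketch of how the new wait.PollImmediateUntil helper is meant to be used. The condition runs once immediately and then on every interval until it reports done, returns an error, or the stop channel closes (in which case wait.ErrWaitTimeout is returned). The interval, attempt threshold, and shutdown timing here are illustrative values, not taken from the patch.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Simulate a controller stop signal firing after two seconds.
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(2 * time.Second)
		close(stopCh)
	}()

	// The condition is invoked immediately and then every 100ms until it
	// returns true, returns an error, or stopCh is closed.
	attempts := 0
	err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
		attempts++
		// Pretend the work (e.g. a cache sync) succeeds on the fifth attempt.
		return attempts >= 5, nil
	}, stopCh)

	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}

This mirrors the GC sync loop in the patch: the condition body performs the resync work and only returns true once WaitForCacheSync succeeds, so workers stay paused across retries.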