Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-06 18:54:06 +00:00
Avoid deadlock in gc resync if available resources change during sync
parent 268dc570e7
commit 7da3d65571
@@ -32,6 +32,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
@@ -170,10 +171,8 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 		newResources := GetDeletableResources(discoveryClient)
 
 		// This can occur if there is an internal error in GetDeletableResources.
-		// If the gc attempts to sync with 0 resources it will block forever.
-		// TODO: Implement a more complete solution for the garbage collector hanging.
 		if len(newResources) == 0 {
-			glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync")
+			glog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
 			return
 		}
 
@@ -183,39 +182,61 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 			return
 		}
 
-		// Something has changed, time to sync.
-		glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources)
-
 		// Ensure workers are paused to avoid processing events before informers
 		// have resynced.
 		gc.workerLock.Lock()
 		defer gc.workerLock.Unlock()
 
-		// Resetting the REST mapper will also invalidate the underlying discovery
-		// client. This is a leaky abstraction and assumes behavior about the REST
-		// mapper, but we'll deal with it for now.
-		gc.restMapper.Reset()
+		// Once we get here, we should not unpause workers until we've successfully synced
+		attempt := 0
+		wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
+			attempt++
 
-		// Perform the monitor resync and wait for controllers to report cache sync.
-		//
-		// NOTE: It's possible that newResources will diverge from the resources
-		// discovered by restMapper during the call to Reset, since they are
-		// distinct discovery clients invalidated at different times. For example,
-		// newResources may contain resources not returned in the restMapper's
-		// discovery call if the resources appeared in-between the calls. In that
-		// case, the restMapper will fail to map some of newResources until the next
-		// sync period.
-		if err := gc.resyncMonitors(newResources); err != nil {
-			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
-			return
-		}
-		// TODO: WaitForCacheSync can block forever during normal operation. Could
-		// pass a timeout channel, but we have to consider the implications of
-		// un-pausing the GC with a partially synced graph builder.
-		if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
-			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
-			return
-		}
+			// On a reattempt, check if available resources have changed
+			if attempt > 1 {
+				newResources = GetDeletableResources(discoveryClient)
+				if len(newResources) == 0 {
+					glog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
+					return false, nil
+				}
+			}
+
+			glog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
+
+			// Resetting the REST mapper will also invalidate the underlying discovery
+			// client. This is a leaky abstraction and assumes behavior about the REST
+			// mapper, but we'll deal with it for now.
+			gc.restMapper.Reset()
+			glog.V(4).Infof("reset restmapper")
+
+			// Perform the monitor resync and wait for controllers to report cache sync.
+			//
+			// NOTE: It's possible that newResources will diverge from the resources
+			// discovered by restMapper during the call to Reset, since they are
+			// distinct discovery clients invalidated at different times. For example,
+			// newResources may contain resources not returned in the restMapper's
+			// discovery call if the resources appeared in-between the calls. In that
+			// case, the restMapper will fail to map some of newResources until the next
+			// attempt.
+			if err := gc.resyncMonitors(newResources); err != nil {
+				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
+				return false, nil
+			}
+			glog.V(4).Infof("resynced monitors")
+
+			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
+			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
+			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
+			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
+			// note that workers stay paused until we successfully resync.
+			if !controller.WaitForCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
+				utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
+				return false, nil
+			}
+
+			// success, break out of the loop
+			return true, nil
+		}, stopCh)
 
 		// Finally, keep track of our new state. Do this after all preceding steps
 		// have succeeded to ensure we'll retry on subsequent syncs if an error
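
The heart of the fix is the loop above: workers stay paused while the sync is retried every 100ms, and each attempt re-reads discovery, resyncs the monitors, and waits at most one sync period for caches to fill. A minimal standalone sketch of that retry shape, assuming only the wait package from k8s.io/apimachinery (the resyncOnce condition and its "succeed on the third attempt" behavior are hypothetical stand-ins for the real resync body):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	attempt := 0

	// Returning (false, nil) asks PollImmediateUntil to run the condition again after
	// the interval; returning (true, nil) ends the loop. In the garbage collector the
	// workers stay paused for the whole duration of the loop.
	resyncOnce := func() (bool, error) {
		attempt++
		fmt.Printf("resync attempt %d\n", attempt)
		return attempt >= 3, nil // pretend the third attempt succeeds
	}

	wait.PollImmediateUntil(100*time.Millisecond, resyncOnce, stopCh)
	fmt.Printf("synced after %d attempts\n", attempt)
}

PollImmediateUntil runs the condition immediately and then once per interval until it returns true, returns an error, or stopCh closes, which is why a condition that keeps reporting (false, nil) no longer wedges the controller permanently.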
@@ -225,6 +246,36 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 	}, period, stopCh)
 }
 
+// printDiff returns a human-readable summary of what resources were added and removed
+func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
+	removed := sets.NewString()
+	for oldResource := range oldResources {
+		if _, ok := newResources[oldResource]; !ok {
+			removed.Insert(fmt.Sprintf("%+v", oldResource))
+		}
+	}
+	added := sets.NewString()
+	for newResource := range newResources {
+		if _, ok := oldResources[newResource]; !ok {
+			added.Insert(fmt.Sprintf("%+v", newResource))
+		}
+	}
+	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
+}
+
+// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
+func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
+	stopChWithTimeout := make(chan struct{})
+	go func() {
+		select {
+		case <-stopCh:
+		case <-time.After(timeout):
+		}
+		close(stopChWithTimeout)
+	}()
+	return stopChWithTimeout
+}
+
 func (gc *GarbageCollector) IsSynced() bool {
 	return gc.dependencyGraphBuilder.IsSynced()
 }
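
waitForStopOrTimeout is what turns the previously unbounded WaitForCacheSync into a bounded one: the channel it returns closes when either the caller's stop channel closes or the timeout fires, whichever comes first, so each attempt waits at most one sync period. A small stdlib-only demonstration (the helper is copied verbatim from the hunk above so the snippet runs on its own; the 200ms timeout is arbitrary):

package main

import (
	"fmt"
	"time"
)

// waitForStopOrTimeout returns a stop channel that closes when the provided stop
// channel closes or when the specified timeout is reached.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
		close(stopChWithTimeout)
	}()
	return stopChWithTimeout
}

func main() {
	neverClosed := make(chan struct{}) // a stop channel that never closes
	start := time.Now()
	<-waitForStopOrTimeout(neverClosed, 200*time.Millisecond)
	fmt.Printf("unblocked after roughly %v\n", time.Since(start).Round(50*time.Millisecond))
}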
@@ -800,6 +800,15 @@ func TestGarbageCollectorSync(t *testing.T) {
 			},
 		},
 	}
+	unsyncableServerResources := []*metav1.APIResourceList{
+		{
+			GroupVersion: "v1",
+			APIResources: []metav1.APIResource{
+				{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+				{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+			},
+		},
+	}
 	fakeDiscoveryClient := &fakeServerResources{
 		PreferredResources: serverResources,
 		Error: nil,
@@ -813,6 +822,10 @@ func TestGarbageCollectorSync(t *testing.T) {
 				200,
 				[]byte("{}"),
 			},
+			"GET" + "/api/v1/secrets": {
+				404,
+				[]byte("{}"),
+			},
 		},
 	}
 	srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
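
The added 404 response for /api/v1/secrets is how the test manufactures an unsyncable resource: discovery still advertises secrets, but every list against the fake server fails, so the secrets informer can never complete its initial sync; before this commit that left Sync blocked in WaitForCacheSync while holding workerLock. A rough stdlib-only sketch of that server behavior (the paths and status codes mirror the fixture above; the handler itself is ours, not the test's fakeActionHandler):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Pod lists succeed, secret lists always fail with 404, so any informer watching
	// secrets against this server would retry forever without ever syncing.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/api/v1/pods":
			w.WriteHeader(200)
			w.Write([]byte("{}"))
		case "/api/v1/secrets":
			w.WriteHeader(404)
			w.Write([]byte("{}"))
		default:
			w.WriteHeader(404)
		}
	}))
	defer srv.Close()

	for _, path := range []string{"/api/v1/pods", "/api/v1/secrets"} {
		resp, err := http.Get(srv.URL + path)
		if err != nil {
			panic(err)
		}
		fmt.Println(path, resp.StatusCode)
		resp.Body.Close()
	}
}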
@@ -849,7 +862,7 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fmt.Printf("Test output")
 	time.Sleep(1 * time.Second)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
 	}
@@ -865,13 +878,29 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fakeDiscoveryClient.setPreferredResources(serverResources)
 	fakeDiscoveryClient.setError(nil)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
+
+	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
+	fakeDiscoveryClient.setError(nil)
+
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+
+	// Put the resources back to normal and ensure garbage collector sync recovers
+	fakeDiscoveryClient.setPreferredResources(serverResources)
+	fakeDiscoveryClient.setError(nil)
+
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
 	}
 }
 
-func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
+func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
 	before := fakeDiscoveryClient.getInterfaceUsedCount()
 	t := 1 * time.Second
 	time.Sleep(t)
@@ -879,7 +908,19 @@ func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
 	if before == after {
 		return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
 	}
-	return nil
+
+	workerLockAcquired := make(chan struct{})
+	go func() {
+		workerLock.Lock()
+		workerLock.Unlock()
+		close(workerLockAcquired)
+	}()
+	select {
+	case <-workerLockAcquired:
+		return nil
+	case <-time.After(t):
+		return fmt.Errorf("workerLock blocked for at least %v", t)
+	}
 }
 
 type fakeServerResources struct {
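
expectSyncNotBlocked now also proves that Sync is not parked on workerLock: it tries to take and release the lock in a goroutine and fails the check if that does not happen within the deadline. The same probe in isolation (tryLockWithin is our name for the pattern, not an identifier from the commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

// tryLockWithin reports whether the mutex can be acquired and released within the
// given timeout. If the lock is never released, the probing goroutine stays blocked,
// exactly as in the test's version of this check.
func tryLockWithin(mu *sync.RWMutex, timeout time.Duration) bool {
	acquired := make(chan struct{})
	go func() {
		mu.Lock()
		mu.Unlock()
		close(acquired)
	}()
	select {
	case <-acquired:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	var mu sync.RWMutex
	fmt.Println("uncontended lock acquired:", tryLockWithin(&mu, time.Second))

	mu.Lock() // simulate a Sync that never unpauses the workers
	fmt.Println("held lock acquired:", tryLockWithin(&mu, 100*time.Millisecond))
}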
@@ -288,11 +288,13 @@ func (gb *GraphBuilder) IsSynced() bool {
 	defer gb.monitorLock.Unlock()
 
 	if len(gb.monitors) == 0 {
+		glog.V(4).Info("garbage controller monitor not synced: no monitors")
 		return false
 	}
 
-	for _, monitor := range gb.monitors {
+	for resource, monitor := range gb.monitors {
 		if !monitor.controller.HasSynced() {
+			glog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
 			return false
 		}
 	}