Fix duplicate GC event handlers getting added if discovery flutters
parent 733f63c6c4
commit c9a084d59c
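
The core of the fix: GetDeletableResources now returns the discovery error alongside whatever resources it could resolve, and Sync uses discovery.GroupDiscoveryFailedErrorGroups to tell a partial discovery failure apart from a total one. For resources whose group failed discovery but whose monitors are already synced, the old entries are carried over into the new resource set, so resyncMonitors does not tear down and re-add event handlers every time discovery flutters. Below is a minimal standalone sketch of that pattern, not the patched controller itself; the function name preserveSyncedResources and the isSynced callback are illustrative stand-ins, and it assumes a client-go version that provides discovery.GroupDiscoveryFailedErrorGroups (the helper the patch itself calls).

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
)

// preserveSyncedResources mirrors the new logic in Sync: on a partial discovery
// failure, keep the old entries for already-synced resources in the failed groups.
func preserveSyncedResources(oldResources, newResources map[schema.GroupVersionResource]struct{}, err error, isSynced func(schema.GroupVersionResource) bool) {
	groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err)
	if !isLookupFailure {
		return
	}
	for gvr, v := range oldResources {
		if _, failed := groupLookupFailures[gvr.GroupVersion()]; failed && isSynced(gvr) {
			newResources[gvr] = v
		}
	}
}

func main() {
	deployments := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	oldResources := map[schema.GroupVersionResource]struct{}{deployments: {}}
	newResources := map[schema.GroupVersionResource]struct{}{}

	// Simulate discovery failing only for apps/v1.
	err := &discovery.ErrGroupDiscoveryFailed{
		Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf("flutter")},
	}

	preserveSyncedResources(oldResources, newResources, err, func(schema.GroupVersionResource) bool { return true })
	fmt.Println(len(newResources)) // 1: the deployments entry survived the partial failure, so its monitor is kept
}
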
@@ -187,14 +187,21 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 	logger := klog.FromContext(ctx)
 
 	// Get the current resource list from discovery.
-	newResources := GetDeletableResources(logger, discoveryClient)
+	newResources, err := GetDeletableResources(logger, discoveryClient)
 
-	// This can occur if there is an internal error in GetDeletableResources.
 	if len(newResources) == 0 {
 		logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
 		metrics.GarbageCollectorResourcesSyncError.Inc()
 		return
 	}
+	if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+		// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
+		for k, v := range oldResources {
+			if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
+				newResources[k] = v
+			}
+		}
+	}
 
 	// Decide whether discovery has reported a change.
 	if reflect.DeepEqual(oldResources, newResources) {
@@ -214,12 +221,21 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 
 		// On a reattempt, check if available resources have changed
 		if attempt > 1 {
-			newResources = GetDeletableResources(logger, discoveryClient)
+			newResources, err = GetDeletableResources(logger, discoveryClient)
+
 			if len(newResources) == 0 {
 				logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
 				metrics.GarbageCollectorResourcesSyncError.Inc()
 				return false, nil
 			}
+			if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+				// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
+				for k, v := range oldResources {
+					if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
+						newResources[k] = v
+					}
+				}
+			}
 		}
 
 		logger.V(2).Info(
@@ -806,20 +822,23 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
 // garbage collector should recognize and work with. More specifically, all
 // preferred resources which support the 'delete', 'list', and 'watch' verbs.
 //
+// If an error was encountered fetching resources from the server,
+// it is included as well, along with any resources that were successfully resolved.
+//
 // All discovery errors are considered temporary. Upon encountering any error,
 // GetDeletableResources will log and return any discovered resources it was
 // able to process (which may be none).
-func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
-	preferredResources, err := discoveryClient.ServerPreferredResources()
-	if err != nil {
-		if discovery.IsGroupDiscoveryFailedError(err) {
-			logger.Info("failed to discover some groups", "groups", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
+func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) (map[schema.GroupVersionResource]struct{}, error) {
+	preferredResources, lookupErr := discoveryClient.ServerPreferredResources()
+	if lookupErr != nil {
+		if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(lookupErr); isLookupFailure {
+			logger.Info("failed to discover some groups", "groups", groupLookupFailures)
 		} else {
-			logger.Info("failed to discover preferred resources", "error", err)
+			logger.Info("failed to discover preferred resources", "error", lookupErr)
 		}
 	}
 	if preferredResources == nil {
-		return map[schema.GroupVersionResource]struct{}{}
+		return map[schema.GroupVersionResource]struct{}{}, lookupErr
 	}
 
 	// This is extracted from discovery.GroupVersionResources to allow tolerating
@@ -837,7 +856,7 @@ func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerR
 		}
 	}
 
-	return deletableGroupVersionResources
+	return deletableGroupVersionResources, lookupErr
 }
 
 func (gc *GarbageCollector) Name() string {
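
For context on the new two-value contract documented above: callers now receive whatever resources discovery could resolve plus the lookup error, and can classify the outcome themselves. The helper below is a hypothetical illustration (describeLookup is not part of this commit), reusing the same client-go discovery types the patch relies on.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
)

// describeLookup shows how the (resources, error) pair returned by the patched
// GetDeletableResources can be interpreted by a caller.
func describeLookup(resources map[schema.GroupVersionResource]struct{}, err error) string {
	switch {
	case err == nil:
		return fmt.Sprintf("complete: %d deletable resource(s)", len(resources))
	default:
		if groups, partial := discovery.GroupDiscoveryFailedErrorGroups(err); partial {
			return fmt.Sprintf("partial: %d deletable resource(s), %d group(s) failed discovery", len(resources), len(groups))
		}
		return fmt.Sprintf("failed: %v (returned %d resource(s))", err, len(resources))
	}
}

func main() {
	pods := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	partialErr := &discovery.ErrGroupDiscoveryFailed{
		Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")},
	}
	fmt.Println(describeLookup(map[schema.GroupVersionResource]struct{}{pods: {}}, partialErr))
	// Output: partial: 1 deletable resource(s), 1 group(s) failed discovery
}
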
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"golang.org/x/time/rate"
 
 	"k8s.io/klog/v2"
 	"k8s.io/klog/v2/ktesting"
 
@@ -805,10 +806,13 @@ func TestGetDeletableResources(t *testing.T) {
			PreferredResources: test.serverResources,
			Error:              test.err,
		}
-		actual := GetDeletableResources(logger, client)
+		actual, actualErr := GetDeletableResources(logger, client)
		if !reflect.DeepEqual(test.deletableResources, actual) {
			t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual)
		}
+		if !reflect.DeepEqual(test.err, actualErr) {
+			t.Errorf("expected error:\n%v\ngot:\n%v", test.err, actualErr)
+		}
	}
 }
 
@@ -822,7 +826,15 @@ func TestGarbageCollectorSync(t *testing.T) {
				{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
			},
		},
+		{
+			GroupVersion: "apps/v1",
+			APIResources: []metav1.APIResource{
+				{Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+			},
+		},
	}
+	appsV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")}}
+
	unsyncableServerResources := []*metav1.APIResourceList{
		{
			GroupVersion: "v1",
@@ -845,6 +857,10 @@ func TestGarbageCollectorSync(t *testing.T) {
			200,
			[]byte("{}"),
		},
+		"GET" + "/apis/apps/v1/deployments": {
+			200,
+			[]byte("{}"),
+		},
		"GET" + "/api/v1/secrets": {
			404,
			[]byte("{}"),
@@ -859,7 +875,11 @@ func TestGarbageCollectorSync(t *testing.T) {
		t.Fatal(err)
	}
 
-	rm := &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme)}
+	tweakableRM := meta.NewDefaultRESTMapper(nil)
+	tweakableRM.AddSpecific(schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, schema.GroupVersionResource{Version: "v1", Resource: "pods"}, schema.GroupVersionResource{Version: "v1", Resource: "pod"}, meta.RESTScopeNamespace)
+	tweakableRM.AddSpecific(schema.GroupVersionKind{Version: "v1", Kind: "Secret"}, schema.GroupVersionResource{Version: "v1", Resource: "secrets"}, schema.GroupVersionResource{Version: "v1", Resource: "secret"}, meta.RESTScopeNamespace)
+	tweakableRM.AddSpecific(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployment"}, meta.RESTScopeNamespace)
+	rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme)}}
	metadataClient, err := metadata.NewForConfig(clientConfig)
	if err != nil {
		t.Fatal(err)
@@ -900,38 +920,70 @@ func TestGarbageCollectorSync(t *testing.T) {
	if err != nil {
		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
	}
+	assertMonitors(t, gc, "pods", "deployments")
 
	// Simulate the discovery client returning an error
-	fakeDiscoveryClient.setPreferredResources(nil)
-	fakeDiscoveryClient.setError(fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+	fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
 
	// Wait until sync discovers the change
	time.Sleep(1 * time.Second)
+	// No monitor changes
+	assertMonitors(t, gc, "pods", "deployments")
 
	// Remove the error from being returned and see if the garbage collector sync is still working
-	fakeDiscoveryClient.setPreferredResources(serverResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 
	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
	if err != nil {
		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
	}
+	assertMonitors(t, gc, "pods", "deployments")
 
	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
-	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
 
	// Wait until sync discovers the change
	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "secrets")
 
	// Put the resources back to normal and ensure garbage collector sync recovers
-	fakeDiscoveryClient.setPreferredResources(serverResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 
	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
	if err != nil {
		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
	}
+	assertMonitors(t, gc, "pods", "deployments")
+
+	// Partial discovery failure
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	// Deployments monitor kept
+	assertMonitors(t, gc, "pods", "deployments", "secrets")
+
+	// Put the resources back to normal and ensure garbage collector sync recovers
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
+	// Unsyncable monitor removed
+	assertMonitors(t, gc, "pods", "deployments")
+}
+
+func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
+	t.Helper()
+	expected := sets.NewString(resources...)
+	actual := sets.NewString()
+	for m := range gc.dependencyGraphBuilder.monitors {
+		actual.Insert(m.Resource)
+	}
+	if !actual.Equal(expected) {
+		t.Fatalf("expected monitors %v, got %v", expected.List(), actual.List())
+	}
 }
 
 func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
@@ -979,15 +1031,10 @@ func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceL
	return f.PreferredResources, f.Error
 }
 
-func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
	f.Lock.Lock()
	defer f.Lock.Unlock()
	f.PreferredResources = resources
-}
-
-func (f *fakeServerResources) setError(err error) {
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
	f.Error = err
 }
 
|
@ -265,6 +265,14 @@ func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
|
|||||||
logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
|
logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsResourceSynced returns true if a monitor exists for the given resource and has synced
|
||||||
|
func (gb *GraphBuilder) IsResourceSynced(resource schema.GroupVersionResource) bool {
|
||||||
|
gb.monitorLock.Lock()
|
||||||
|
defer gb.monitorLock.Unlock()
|
||||||
|
monitor, ok := gb.monitors[resource]
|
||||||
|
return ok && monitor.controller.HasSynced()
|
||||||
|
}
|
||||||
|
|
||||||
// IsSynced returns true if any monitors exist AND all those monitors'
|
// IsSynced returns true if any monitors exist AND all those monitors'
|
||||||
// controllers HasSynced functions return true. This means IsSynced could return
|
// controllers HasSynced functions return true. This means IsSynced could return
|
||||||
// true at one time, and then later return false if all monitors were
|
// true at one time, and then later return false if all monitors were
|
||||||
|