Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-09 12:07:47 +00:00)

Preserve resourcequota informers for groups with discovery resolution errors only
Commit 12a874d227 (parent c9a084d59c)

@@ -440,11 +440,13 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
         if err != nil {
             utilruntime.HandleError(err)

-            if discovery.IsGroupDiscoveryFailedError(err) && len(newResources) > 0 {
-                // In partial discovery cases, don't remove any existing informers, just add new ones
+            if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
+                // In partial discovery cases, preserve existing informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
                 for k, v := range oldResources {
+                    if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
                         newResources[k] = v
+                    }
                 }
             } else {
                 // short circuit in non-discovery error cases or if discovery returned zero resources
                 return

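Taken out of the surrounding controller, the new branch is easiest to read in isolation. Below is a minimal, self-contained sketch of the same pattern; the map shape and the concrete resources are assumptions made for illustration, while discovery.ErrGroupDiscoveryFailed and discovery.GroupDiscoveryFailedErrorGroups are the client-go helpers the patch itself uses.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
)

func main() {
	// What the controller is currently monitoring (assumed contents).
	oldResources := map[schema.GroupVersionResource]struct{}{
		{Group: "", Version: "v1", Resource: "pods"}:            {},
		{Group: "apps", Version: "v1", Resource: "deployments"}: {},
	}
	// What a partially failing discovery pass returned: apps/v1 is missing.
	newResources := map[schema.GroupVersionResource]struct{}{
		{Group: "", Version: "v1", Resource: "pods"}:    {},
		{Group: "", Version: "v1", Resource: "secrets"}: {},
	}
	// Aggregated discovery error naming the groups that could not be resolved.
	err := &discovery.ErrGroupDiscoveryFailed{
		Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf("stale discovery cache")},
	}

	// Same pattern as the patched Sync: carry forward old entries only when
	// their group is among the lookup failures, instead of carrying all of them.
	if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
		for k, v := range oldResources {
			if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
				newResources[k] = v
			}
		}
	}

	fmt.Println(len(newResources)) // 3: pods, secrets, and the preserved apps/v1 deployments
}

With the previous discovery.IsGroupDiscoveryFailedError check, every entry of oldResources was carried into newResources on any partial failure, so monitors for resources that discovery successfully reported as removed were never dropped; the new check carries forward only entries whose group actually failed resolution.
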
@@ -474,6 +476,10 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
             utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
             return
         }
+
+        // at this point, we've synced the new resources to our monitors, so record that fact.
+        oldResources = newResources
+
         // wait for caches to fill for a while (our sync period).
         // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
         // informers keep attempting to sync in the background, so retrying doesn't interrupt them.

@@ -488,8 +494,6 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
             return
         }

-        // success, remember newly synced resources
-        oldResources = newResources
         logger.V(2).Info("synced quota controller")
     }, period)
 }

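The other controller change records oldResources as soon as resyncMonitors succeeds instead of only after the cache wait. A small sketch of the flow this gives (hypothetical names, not the controller's real signature):

package sketch

// syncOnce is a toy stand-in for one iteration of the Sync loop. Because
// oldResources is recorded right after the monitors are resynced, a
// cache-sync timeout that returns early no longer forgets that the monitors
// were already pointed at newResources.
func syncOnce(oldResources *map[string]bool, newResources map[string]bool,
	resyncMonitors func(map[string]bool) error, waitForCaches func() bool) {
	if err := resyncMonitors(newResources); err != nil {
		return
	}
	// record the successful resync before waiting for caches (the new behavior)
	*oldResources = newResources
	if !waitForCaches() {
		return // timed out; informers keep syncing in the background
	}
}

The hunks that follow, whose context lines reference TestDiscoverySync, extend the discovery-sync test to cover the new partial-failure behavior.
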
@@ -35,6 +35,7 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     quota "k8s.io/apiserver/pkg/quota/v1"
     "k8s.io/apiserver/pkg/quota/v1/generic"
+    "k8s.io/client-go/discovery"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/fake"

@@ -1016,6 +1017,12 @@ func TestDiscoverySync(t *testing.T) {
                 {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
             },
         },
+        {
+            GroupVersion: "apps/v1",
+            APIResources: []metav1.APIResource{
+                {Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+            },
+        },
     }
     unsyncableServerResources := []*metav1.APIResourceList{
         {

@@ -1026,6 +1033,16 @@ func TestDiscoverySync(t *testing.T) {
             },
         },
     }
+    appsV1Resources := []*metav1.APIResourceList{
+        {
+            GroupVersion: "apps/v1",
+            APIResources: []metav1.APIResource{
+                {Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+            },
+        },
+    }
+    appsV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")}}
+    coreV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "", Version: "v1"}: fmt.Errorf(":-/")}}
     fakeDiscoveryClient := &fakeServerResources{
         PreferredResources: serverResources,
         Error:              nil,

@@ -1043,6 +1060,10 @@ func TestDiscoverySync(t *testing.T) {
             404,
             []byte("{}"),
         },
+        "GET" + "/apis/apps/v1/deployments": {
+            200,
+            []byte("{}"),
+        },
     },
 }

@@ -1056,9 +1077,11 @@ func TestDiscoverySync(t *testing.T) {

     pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
     secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
+    deployments := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
     listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
         pods:    newGenericLister(pods.GroupResource(), []runtime.Object{}),
         secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}),
+        deployments: newGenericLister(deployments.GroupResource(), []runtime.Object{}),
     }
     qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources)
     defer close(qc.stop)

@@ -1088,38 +1111,88 @@ func TestDiscoverySync(t *testing.T) {
     if err != nil {
         t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err)
     }
+    assertMonitors(t, qc, "pods", "deployments")

     // Simulate the discovery client returning an error
-    fakeDiscoveryClient.setPreferredResources(nil)
-    fakeDiscoveryClient.setError(fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+    fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))

     // Wait until sync discovers the change
     time.Sleep(1 * time.Second)
+    // No monitors removed
+    assertMonitors(t, qc, "pods", "deployments")

     // Remove the error from being returned and see if the quota sync is still working
-    fakeDiscoveryClient.setPreferredResources(serverResources)
-    fakeDiscoveryClient.setError(nil)
+    fakeDiscoveryClient.setPreferredResources(serverResources, nil)

     err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
     if err != nil {
         t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
     }
+    assertMonitors(t, qc, "pods", "deployments")

     // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
-    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
-    fakeDiscoveryClient.setError(nil)
+    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)

     // Wait until sync discovers the change
     time.Sleep(1 * time.Second)
+    // deployments removed, secrets added
+    assertMonitors(t, qc, "pods", "secrets")

     // Put the resources back to normal and ensure quota sync recovers
-    fakeDiscoveryClient.setPreferredResources(serverResources)
-    fakeDiscoveryClient.setError(nil)
+    fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+
+    // Wait until sync discovers the change
+    time.Sleep(1 * time.Second)
+    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+    if err != nil {
+        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+    }
+    // secrets removed, deployments readded
+    assertMonitors(t, qc, "pods", "deployments")
+
+    // apps/v1 discovery failure
+    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+    // Wait until sync discovers the change
+    time.Sleep(1 * time.Second)
+    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+    if err != nil {
+        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+    }
+    // deployments remain due to appsv1 error, secrets added
+    assertMonitors(t, qc, "pods", "deployments", "secrets")
+
+    // core/v1 discovery failure
+    fakeDiscoveryClient.setPreferredResources(appsV1Resources, coreV1Error)
+    // Wait until sync discovers the change
+    time.Sleep(1 * time.Second)
+    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+    if err != nil {
+        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+    }
+    // pods and secrets remain due to corev1 error
+    assertMonitors(t, qc, "pods", "deployments", "secrets")
+
+    // Put the resources back to normal and ensure quota sync recovers
+    fakeDiscoveryClient.setPreferredResources(serverResources, nil)

     err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
     if err != nil {
         t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
     }
+    // secrets removed, deployments remain
+    assertMonitors(t, qc, "pods", "deployments")
+}
+
+func assertMonitors(t *testing.T, qc quotaController, resources ...string) {
+    t.Helper()
+    expected := sets.NewString(resources...)
+    actual := sets.NewString()
+    for m := range qc.Controller.quotaMonitor.monitors {
+        actual.Insert(m.Resource)
+    }
+    if !actual.Equal(expected) {
+        t.Fatalf("expected monitors %v, got %v", expected.List(), actual.List())
+    }
 }

 // testServerAndClientConfig returns a server that listens and a config that can reference it

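The two added failure phases are the core of the new coverage. As a worked restatement (plain Go, not part of the test) of why the core/v1 failure phase still expects a deployments monitor:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// In the core/v1 failure phase, discovery returns only appsV1Resources
	// (deployments) together with coreV1Error, so the controller keeps its
	// existing core-group monitors (pods, secrets) and keeps deployments
	// from the freshly returned list.
	fromDiscovery := sets.NewString("deployments")       // what appsV1Resources lists
	preservedCoreV1 := sets.NewString("pods", "secrets") // kept because group ""/v1 failed
	fmt.Println(fromDiscovery.Union(preservedCoreV1).List()) // [deployments pods secrets]
}
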
@@ -1169,15 +1242,10 @@ func (*fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceLis
     return nil, nil
 }

-func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
     f.Lock.Lock()
     defer f.Lock.Unlock()
     f.PreferredResources = resources
-}
-
-func (f *fakeServerResources) setError(err error) {
-    f.Lock.Lock()
-    defer f.Lock.Unlock()
     f.Error = err
 }

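For readers without the rest of the test file, the fake's shape can be inferred from what this hunk touches. The following is a hypothetical reconstruction; the field set and mutex type are assumptions, and the real definition earlier in the test file may carry more fields. Folding setError into setPreferredResources means the resource list and the simulated discovery error now change under a single lock acquisition, presumably so Sync cannot observe a fresh list paired with a stale error (or vice versa).

package resourcequota

import (
	"sync"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Hypothetical shape, inferred only from the fields and methods used above.
type fakeServerResources struct {
	PreferredResources []*metav1.APIResourceList
	Error              error
	Lock               sync.Mutex
}

// The merged setter: resource list and simulated discovery error are updated
// together under one lock acquisition.
func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
	f.Lock.Lock()
	defer f.Lock.Unlock()
	f.PreferredResources = resources
	f.Error = err
}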