From 739df5452a407636a927ac75395784d5bcca9460 Mon Sep 17 00:00:00 2001
From: Jordan Liggitt
Date: Wed, 13 Mar 2019 18:56:39 -0700
Subject: [PATCH] Avoid deadlock in resource quota resync

Bound the wait for quota monitor cache sync to the resync period instead of
waiting only on the stop channel. If the set of available resources changes
and one of the informer caches can never fill, Sync now logs, returns, and
retries on the next period rather than blocking forever; informers keep
syncing in the background, and resyncMonitors on the reattempt is a no-op for
resources that still exist.
---
 pkg/controller/resourcequota/BUILD        |   1 +
 .../resource_quota_controller.go          |  20 +-
 .../resource_quota_controller_test.go     | 263 +++++++++++++++++-
 3 files changed, 279 insertions(+), 5 deletions(-)

diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD
index 08f7bacf890..2728d1cb967 100644
--- a/pkg/controller/resourcequota/BUILD
+++ b/pkg/controller/resourcequota/BUILD
@@ -60,6 +60,7 @@ go_test(
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
     ],
diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go
index 4447cf5de4d..e87e4cf49a2 100644
--- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go
@@ -439,8 +439,13 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
             utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
             return
         }
-        if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
+        // wait for caches to fill for a while (our sync period).
+        // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
+        // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
+        // the call to resyncMonitors on the reattempt will no-op for resources that still exist.
+        if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
             utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
+            return
         }
 
         // success, remember newly synced resources
@@ -466,6 +471,19 @@ func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
     return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
 }
 
+// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
+func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
+    stopChWithTimeout := make(chan struct{})
+    go func() {
+        defer close(stopChWithTimeout)
+        select {
+        case <-stopCh:
+        case <-time.After(timeout):
+        }
+    }()
+    return stopChWithTimeout
+}
+
 // resyncMonitors starts or stops quota monitors as needed to ensure that all
 // (and only) those resources present in the map are monitored.
 func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go
index 78ee509891a..25fae49784d 100644
--- a/pkg/controller/resourcequota/resource_quota_controller_test.go
+++ b/pkg/controller/resourcequota/resource_quota_controller_test.go
@@ -18,8 +18,12 @@ package resourcequota
 
 import (
     "fmt"
+    "net/http"
+    "net/http/httptest"
     "strings"
+    "sync"
     "testing"
+    "time"
 
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
@@ -30,6 +34,7 @@ import (
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/fake"
+    "k8s.io/client-go/rest"
     core "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
     "k8s.io/kubernetes/pkg/controller"
@@ -83,7 +88,7 @@ type quotaController struct {
     stop chan struct{}
 }
 
-func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController {
+func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc, discoveryFunc NamespacedResourcesFunc) quotaController {
     informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
     quotaConfiguration := install.NewQuotaConfigurationForControllers(lister)
     alwaysStarted := make(chan struct{})
@@ -94,9 +99,10 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController {
         ResyncPeriod:              controller.NoResyncPeriodFunc,
         ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
         IgnoredResourcesFunc:      quotaConfiguration.IgnoredResources,
-        DiscoveryFunc:             mockDiscoveryFunc,
+        DiscoveryFunc:             discoveryFunc,
         Registry:                  generic.NewRegistry(quotaConfiguration.Evaluators()),
         InformersStarted:          alwaysStarted,
+        InformerFactory:           informerFactory,
     }
     qc, err := NewResourceQuotaController(resourceQuotaControllerOptions)
     if err != nil {
@@ -700,7 +706,7 @@ func TestSyncResourceQuota(t *testing.T) {
         listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
             testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items),
         }
-        qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
+        qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc)
         defer close(qc.stop)
 
         if err := qc.syncResourceQuota(&testCase.quota); err != nil {
@@ -760,7 +766,7 @@ func TestAddQuota(t *testing.T) {
         gvr: newGenericLister(gvr.GroupResource(), newTestPods()),
     }
 
-    qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
+    qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc)
     defer close(qc.stop)
 
     testCases := []struct {
@@ -918,3 +924,252 @@ func TestAddQuota(t *testing.T) {
         }
     }
 }
+
+// TestDiscoverySync ensures that a discovery client error
+// will not cause the quota controller to block infinitely.
+func TestDiscoverySync(t *testing.T) {
+    serverResources := []*metav1.APIResourceList{
+        {
+            GroupVersion: "v1",
+            APIResources: []metav1.APIResource{
+                {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+            },
+        },
+    }
+    unsyncableServerResources := []*metav1.APIResourceList{
+        {
+            GroupVersion: "v1",
+            APIResources: []metav1.APIResource{
+                {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+                {Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+            },
+        },
+    }
+    fakeDiscoveryClient := &fakeServerResources{
+        PreferredResources: serverResources,
+        Error:              nil,
+        Lock:               sync.Mutex{},
+        InterfaceUsedCount: 0,
+    }
+
+    testHandler := &fakeActionHandler{
+        response: map[string]FakeResponse{
+            "GET" + "/api/v1/pods": {
+                200,
+                []byte("{}"),
+            },
+            "GET" + "/api/v1/secrets": {
+                404,
+                []byte("{}"),
+            },
+        },
+    }
+
+    srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
+    defer srv.Close()
+    clientConfig.ContentConfig.NegotiatedSerializer = nil
+    kubeClient, err := kubernetes.NewForConfig(clientConfig)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
+    secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
+    listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
+        pods:    newGenericLister(pods.GroupResource(), []runtime.Object{}),
+        secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}),
+    }
+    qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources)
+    defer close(qc.stop)
+
+    stopSync := make(chan struct{})
+    defer close(stopSync)
+    // The pseudo-code of Sync():
+    // Sync(client, period, stopCh):
+    //    wait.Until() loops with `period` until the `stopCh` is closed :
+    //        GetQuotableResources()
+    //        resyncMonitors()
+    //        controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
+    //
+    // Setting the period to 200ms allows the WaitForCacheSync() to check
+    // for cache sync ~2 times in every wait.Until() loop.
+    //
+    // The 1s sleep in the test allows GetQuotableResources and
+    // resyncMonitors to run ~5 times to ensure the changes to the
+    // fakeDiscoveryClient are picked up.
+    go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync)
+
+    // Wait until the sync discovers the initial resources
+    time.Sleep(1 * time.Second)
+
+    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+    if err != nil {
+        t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err)
+    }
+
+    // Simulate the discovery client returning an error
+    fakeDiscoveryClient.setPreferredResources(nil)
+    fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()"))
+
+    // Wait until sync discovers the change
+    time.Sleep(1 * time.Second)
+
+    // Remove the error from being returned and see if the quota sync is still working
+    fakeDiscoveryClient.setPreferredResources(serverResources)
+    fakeDiscoveryClient.setError(nil)
+
+    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+    if err != nil {
+        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+    }
+
+    // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
+    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
+    fakeDiscoveryClient.setError(nil)
+
+    // Wait until sync discovers the change
+    time.Sleep(1 * time.Second)
+
+    // Put the resources back to normal and ensure quota sync recovers
+    fakeDiscoveryClient.setPreferredResources(serverResources)
+    fakeDiscoveryClient.setError(nil)
+
+    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+    if err != nil {
+        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+    }
+}
+
+// testServerAndClientConfig returns a server that listens and a config that can reference it
+func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *rest.Config) {
+    srv := httptest.NewServer(http.HandlerFunc(handler))
+    config := &rest.Config{
+        Host: srv.URL,
+    }
+    return srv, config
+}
+
+func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
+    before := fakeDiscoveryClient.getInterfaceUsedCount()
+    t := 1 * time.Second
+    time.Sleep(t)
+    after := fakeDiscoveryClient.getInterfaceUsedCount()
+    if before == after {
+        return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
+    }
+
+    workerLockAcquired := make(chan struct{})
+    go func() {
+        workerLock.Lock()
+        workerLock.Unlock()
+        close(workerLockAcquired)
+    }()
+    select {
+    case <-workerLockAcquired:
+        return nil
+    case <-time.After(t):
+        return fmt.Errorf("workerLock blocked for at least %v", t)
+    }
+}
+
+type fakeServerResources struct {
+    PreferredResources []*metav1.APIResourceList
+    Error              error
+    Lock               sync.Mutex
+    InterfaceUsedCount int
+}
+
+func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
+    return nil, nil
+}
+
+func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, error) {
+    return nil, nil
+}
+
+func (_ *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
+    return nil, nil
+}
+
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+    f.Lock.Lock()
+    defer f.Lock.Unlock()
+    f.PreferredResources = resources
+}
+
+func (f *fakeServerResources) setError(err error) {
+    f.Lock.Lock()
+    defer f.Lock.Unlock()
+    f.Error = err
+}
+
+func (f *fakeServerResources) getInterfaceUsedCount() int {
+    f.Lock.Lock()
+    defer f.Lock.Unlock()
+    return f.InterfaceUsedCount
+}
+
+func (f *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
+    f.Lock.Lock()
+    defer f.Lock.Unlock()
+    f.InterfaceUsedCount++
+    return f.PreferredResources, f.Error
+}
+
+// fakeAction records information about requests to aid in testing.
+type fakeAction struct {
+    method string
+    path   string
+    query  string
+}
+
+// String returns method=path to aid in testing
+func (f *fakeAction) String() string {
+    return strings.Join([]string{f.method, f.path}, "=")
+}
+
+type FakeResponse struct {
+    statusCode int
+    content    []byte
+}
+
+// fakeActionHandler holds a list of fakeActions received
+type fakeActionHandler struct {
+    // statusCode and content returned by this handler for different method + path.
+    response map[string]FakeResponse
+
+    lock    sync.Mutex
+    actions []fakeAction
+}
+
+// ServeHTTP logs the action that occurred and always returns the associated status code
+func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
+    func() {
+        f.lock.Lock()
+        defer f.lock.Unlock()
+
+        f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
+        fakeResponse, ok := f.response[request.Method+request.URL.Path]
+        if !ok {
+            fakeResponse.statusCode = 200
+            fakeResponse.content = []byte("{\"kind\": \"List\"}")
+        }
+        response.Header().Set("Content-Type", "application/json")
+        response.WriteHeader(fakeResponse.statusCode)
+        response.Write(fakeResponse.content)
+    }()
+
+    // This is to allow the fakeActionHandler to simulate a watch being opened
+    if strings.Contains(request.URL.RawQuery, "watch=true") {
+        hijacker, ok := response.(http.Hijacker)
+        if !ok {
+            return
+        }
+        connection, _, err := hijacker.Hijack()
+        if err != nil {
+            return
+        }
+        defer connection.Close()
+        time.Sleep(30 * time.Second)
+    }
+}
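For readers outside the Kubernetes tree, the sketch below is a minimal, self-contained illustration of the deadlock-avoidance pattern this patch introduces; it is not part of the patch itself. waitForStopOrTimeout is copied from the change above, while waitForCacheSync here is only an illustrative stand-in for controller.WaitForCacheSync (which polls caches at a hardcoded 100ms), and main simulates a cache that can never sync, showing that the wait now gives up after one sync period instead of blocking forever.

package main

import (
    "fmt"
    "time"
)

// waitForStopOrTimeout mirrors the helper added in the patch: it returns a
// channel that closes when stopCh closes or when timeout elapses, whichever
// comes first.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
    stopChWithTimeout := make(chan struct{})
    go func() {
        defer close(stopChWithTimeout)
        select {
        case <-stopCh:
        case <-time.After(timeout):
        }
    }()
    return stopChWithTimeout
}

// waitForCacheSync is an illustrative stand-in for controller.WaitForCacheSync:
// it polls isSynced every 100ms until it reports true or stopCh closes.
func waitForCacheSync(stopCh <-chan struct{}, isSynced func() bool) bool {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        if isSynced() {
            return true
        }
        select {
        case <-stopCh:
            return false
        case <-ticker.C:
        }
    }
}

func main() {
    stopCh := make(chan struct{}) // never closed here, like a long-running controller
    neverSynced := func() bool { return false }

    // Bounding the wait with the sync period means a cache that can never
    // sync (e.g. a stale discovery resource) only delays this resync
    // iteration instead of blocking the loop forever.
    period := 500 * time.Millisecond
    synced := waitForCacheSync(waitForStopOrTimeout(stopCh, period), neverSynced)
    fmt.Println("synced:", synced) // prints "synced: false" after roughly one period
}

In the controller, that same bounded channel is what lets the wait.Until loop in Sync() come back around, re-run discovery, and resync monitors on the next period.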