Avoid deadlock in resource quota resync

Jordan Liggitt 2019-03-13 18:56:39 -07:00
parent e5f7af7058
commit 739df5452a
3 changed files with 279 additions and 5 deletions

View File

@@ -60,6 +60,7 @@ go_test(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],

View File

@@ -439,8 +439,13 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return
}
if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
// wait for caches to fill for a while (our sync period).
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
return
}
// success, remember newly synced resources
@@ -466,6 +471,19 @@ func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct
return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
}

// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
stopChWithTimeout := make(chan struct{})
go func() {
defer close(stopChWithTimeout)
select {
case <-stopCh:
case <-time.After(timeout):
}
}()
return stopChWithTimeout
}

// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
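
The helper added above is the heart of the fix. Previously, Sync passed its own stop channel straight into controller.WaitForCacheSync, so if one of the discovered resources could never sync its informer cache (for example, because the set of available resources changed mid-sync), the wait blocked until controller shutdown. Bounding the wait by one sync period lets the next wait.Until pass retry discovery and resync. Below is a minimal standalone sketch of the same pattern using only the standard library; pollUntilStopped is a hypothetical stand-in for controller.WaitForCacheSync and is not part of the patch.

package main

import (
	"fmt"
	"time"
)

// waitForStopOrTimeout mirrors the helper in the patch: the returned channel
// closes when stopCh closes or when timeout elapses, whichever comes first.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return stopChWithTimeout
}

// pollUntilStopped polls condition every 100ms until it returns true or
// stopCh closes, returning the last observed answer.
func pollUntilStopped(stopCh <-chan struct{}, condition func() bool) bool {
	for {
		select {
		case <-stopCh:
			return condition()
		case <-time.After(100 * time.Millisecond):
			if condition() {
				return true
			}
		}
	}
}

func main() {
	stopCh := make(chan struct{}) // never closed: a controller that keeps running
	neverSynced := func() bool { return false }

	// With the raw stopCh this call would hang forever; bounded by a one
	// second "period" it gives up and lets the caller retry.
	start := time.Now()
	ok := pollUntilStopped(waitForStopOrTimeout(stopCh, 1*time.Second), neverSynced)
	fmt.Printf("synced=%v after %v\n", ok, time.Since(start).Round(time.Second))
}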

View File

@@ -18,8 +18,12 @@ package resourcequota
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -30,6 +34,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller"
@@ -83,7 +88,7 @@ type quotaController struct {
stop chan struct{}
}

func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController {
func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc, discoveryFunc NamespacedResourcesFunc) quotaController {
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
quotaConfiguration := install.NewQuotaConfigurationForControllers(lister)
alwaysStarted := make(chan struct{})
@@ -94,9 +94,10 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister
ResyncPeriod: controller.NoResyncPeriodFunc,
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
DiscoveryFunc: mockDiscoveryFunc,
DiscoveryFunc: discoveryFunc,
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
InformersStarted: alwaysStarted,
InformerFactory: informerFactory,
}
qc, err := NewResourceQuotaController(resourceQuotaControllerOptions)
if err != nil {
@@ -700,7 +706,7 @@ func TestSyncResourceQuota(t *testing.T) {
listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items),
}
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc)
defer close(qc.stop)
if err := qc.syncResourceQuota(&testCase.quota); err != nil {
@@ -760,7 +766,7 @@ func TestAddQuota(t *testing.T) {
gvr: newGenericLister(gvr.GroupResource(), newTestPods()),
}
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc)
defer close(qc.stop)
testCases := []struct {
@@ -918,3 +924,252 @@ func TestAddQuota(t *testing.T) {
}
}
}

// TestDiscoverySync ensures that a discovery client error
// will not cause the quota controller to block indefinitely.
func TestDiscoverySync(t *testing.T) {
serverResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
},
},
}
unsyncableServerResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
},
},
}
fakeDiscoveryClient := &fakeServerResources{
PreferredResources: serverResources,
Error: nil,
Lock: sync.Mutex{},
InterfaceUsedCount: 0,
}
testHandler := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/pods": {
200,
[]byte("{}"),
},
"GET" + "/api/v1/secrets": {
404,
[]byte("{}"),
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientConfig.ContentConfig.NegotiatedSerializer = nil
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
pods: newGenericLister(pods.GroupResource(), []runtime.Object{}),
secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}),
}
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources)
defer close(qc.stop)
stopSync := make(chan struct{})
defer close(stopSync)
// The pseudo-code of Sync():
// Sync(client, period, stopCh):
// wait.Until() loops with `period` until the `stopCh` is closed:
// GetQuotableResources()
// resyncMonitors()
// controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
//
// Setting the period to 200ms allows the WaitForCacheSync() to check
// for cache sync ~2 times in every wait.Until() loop.
//
// The 1s sleep in the test allows GetQuotableResources and
// resyncMonitors to run ~5 times to ensure the changes to the
// fakeDiscoveryClient are picked up.
go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync)
// Wait until the sync discovers the initial resources
time.Sleep(1 * time.Second)
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
if err != nil {
t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err)
}
// Simulate the discovery client returning an error
fakeDiscoveryClient.setPreferredResources(nil)
fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()"))
// Wait until sync discovers the change
time.Sleep(1 * time.Second)
// Remove the error from being returned and see if the quota sync is still working
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
if err != nil {
t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
}
// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
fakeDiscoveryClient.setError(nil)
// Wait until sync discovers the change
time.Sleep(1 * time.Second)
// Put the resources back to normal and ensure quota sync recovers
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
if err != nil {
t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
}
}
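
The workerLock probed by expectSyncNotBlocked below is the lock Sync holds while it resyncs monitors and waits for caches; the quota workers take its read side before processing, which is exactly why an unbounded cache wait deadlocks the controller. For orientation, here is a sketch of the loop shape the pseudo-code above describes, reusing the patch's names. This is a paraphrase under assumptions, not the verbatim controller code: GetQuotableResources is the package's discovery helper named in the pseudo-code, and the error reporting plus the unchanged-resources fast path are trimmed.

func (rq *ResourceQuotaController) syncSketch(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
	wait.Until(func() {
		newResources, err := GetQuotableResources(discoveryFunc)
		if err != nil {
			return // discovery failed; retry on the next period
		}

		// Workers block on the read side of this lock, so nothing below
		// may wait forever while it is held.
		rq.workerLock.Lock()
		defer rq.workerLock.Unlock()

		if err := rq.resyncMonitors(newResources); err != nil {
			return
		}

		// Bounded wait: give the caches at most one period to fill,
		// then release the lock and let the next pass retry.
		if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
			return
		}
		// On success the real code records newResources as the synced set.
	}, period, stopCh)
}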

// testServerAndClientConfig returns a server that listens and a config that can reference it
func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *rest.Config) {
srv := httptest.NewServer(http.HandlerFunc(handler))
config := &rest.Config{
Host: srv.URL,
}
return srv, config
}

func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
before := fakeDiscoveryClient.getInterfaceUsedCount()
t := 1 * time.Second
time.Sleep(t)
after := fakeDiscoveryClient.getInterfaceUsedCount()
if before == after {
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
}
workerLockAcquired := make(chan struct{})
go func() {
workerLock.Lock()
workerLock.Unlock()
close(workerLockAcquired)
}()
select {
case <-workerLockAcquired:
return nil
case <-time.After(t):
return fmt.Errorf("workerLock blocked for at least %v", t)
}
}
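
expectSyncNotBlocked combines two probes: a counter showing discovery is still being polled, and a goroutine that checks the worker lock can still be acquired. The lock probe is a reusable idiom; a hypothetical helper in the same vein (sync and time are already imported in this file), with the caveat that the probe goroutine stays blocked on Lock if the answer is no:

// lockAcquirableWithin reports whether mu can be write-locked within d.
// The probe goroutine leaks if mu is never released; tolerable in a
// failing test, not in production code.
func lockAcquirableWithin(mu *sync.RWMutex, d time.Duration) bool {
	acquired := make(chan struct{})
	go func() {
		mu.Lock()
		mu.Unlock()
		close(acquired)
	}()
	select {
	case <-acquired:
		return true
	case <-time.After(d):
		return false
	}
}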

type fakeServerResources struct {
PreferredResources []*metav1.APIResourceList
Error error
Lock sync.Mutex
InterfaceUsedCount int
}

func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
return nil, nil
}

func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}

func (_ *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}

func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.PreferredResources = resources
}

func (f *fakeServerResources) setError(err error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.Error = err
}

func (f *fakeServerResources) getInterfaceUsedCount() int {
f.Lock.Lock()
defer f.Lock.Unlock()
return f.InterfaceUsedCount
}

func (f *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.InterfaceUsedCount++
return f.PreferredResources, f.Error
}

// fakeAction records information about requests to aid in testing.
type fakeAction struct {
method string
path string
query string
}

// String returns method=path to aid in testing
func (f *fakeAction) String() string {
return strings.Join([]string{f.method, f.path}, "=")
}

type FakeResponse struct {
statusCode int
content []byte
}

// fakeActionHandler holds a list of fakeActions received
type fakeActionHandler struct {
// statusCode and content returned by this handler for different method + path.
response map[string]FakeResponse
lock sync.Mutex
actions []fakeAction
}

// ServeHTTP logs the action that occurred and always returns the associated status code
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
func() {
f.lock.Lock()
defer f.lock.Unlock()
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
fakeResponse, ok := f.response[request.Method+request.URL.Path]
if !ok {
fakeResponse.statusCode = 200
fakeResponse.content = []byte("{\"kind\": \"List\"}")
}
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(fakeResponse.statusCode)
response.Write(fakeResponse.content)
}()
// This is to allow the fakeActionHandler to simulate a watch being opened
if strings.Contains(request.URL.RawQuery, "watch=true") {
hijacker, ok := response.(http.Hijacker)
if !ok {
return
}
connection, _, err := hijacker.Hijack()
if err != nil {
return
}
defer connection.Close()
time.Sleep(30 * time.Second)
}
}
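
Taken together, testServerAndClientConfig and fakeActionHandler form a small scriptable API server: unknown paths fall back to an empty List, known paths replay their canned FakeResponse, and watch requests are hijacked and held open for 30 seconds so informers behave as if a watch is established. A hypothetical usage from a test in this package (t is the *testing.T in scope), keeping the request side to the standard library:

handler := &fakeActionHandler{response: map[string]FakeResponse{
	"GET" + "/api/v1/pods": {statusCode: 200, content: []byte(`{"kind":"PodList"}`)},
}}
srv, _ := testServerAndClientConfig(handler.ServeHTTP)
defer srv.Close()

resp, err := http.Get(srv.URL + "/api/v1/pods")
if err != nil {
	t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != 200 {
	t.Fatalf("got status %d, want 200", resp.StatusCode)
}
if got, want := handler.actions[0].String(), "GET=/api/v1/pods"; got != want {
	t.Fatalf("recorded action %q, want %q", got, want)
}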