From 68e2a96016b985b3e856fc0b01e9819b616eedec Mon Sep 17 00:00:00 2001 From: jennybuckley Date: Fri, 16 Mar 2018 11:28:58 -0700 Subject: [PATCH 1/2] Add unit test TestGarbageCollectorSync --- pkg/controller/garbagecollector/BUILD | 1 + .../garbagecollector/garbagecollector.go | 2 +- .../garbagecollector/garbagecollector_test.go | 151 ++++++++++++++++-- 3 files changed, 143 insertions(+), 11 deletions(-) diff --git a/pkg/controller/garbagecollector/BUILD b/pkg/controller/garbagecollector/BUILD index 46f90ca0161..1ef247e2447 100644 --- a/pkg/controller/garbagecollector/BUILD +++ b/pkg/controller/garbagecollector/BUILD @@ -68,6 +68,7 @@ go_test( "//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 0f129b7e514..0c606a210db 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -166,7 +166,7 @@ type resettableRESTMapper interface { // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise // the mapper's underlying discovery client will be unnecessarily reset during // the course of detecting new resources. -func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { +func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) { oldResources := make(map[schema.GroupVersionResource]struct{}) wait.Until(func() { // Get the current resource list from discovery. diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 469d7550782..33c6b466536 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -24,6 +24,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/stretchr/testify/assert" @@ -41,6 +42,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/workqueue" @@ -142,18 +144,34 @@ type fakeActionHandler struct { // ServeHTTP logs the action that occurred and always returns the associated status code func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { - f.lock.Lock() - defer f.lock.Unlock() + func() { + f.lock.Lock() + defer f.lock.Unlock() - f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery}) - fakeResponse, ok := f.response[request.Method+request.URL.Path] - if !ok { - fakeResponse.statusCode = 200 - fakeResponse.content = []byte("{\"kind\": \"List\"}") + f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery}) + fakeResponse, ok := f.response[request.Method+request.URL.Path] + if !ok { + fakeResponse.statusCode = 200 + fakeResponse.content = []byte("{\"kind\": \"List\"}") + } + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(fakeResponse.statusCode) + response.Write(fakeResponse.content) + }() + + // This is to allow the fakeActionHandler to simulate a watch being opened + if strings.Contains(request.URL.RawQuery, "watch=true") { + hijacker, ok := response.(http.Hijacker) + if !ok { + return + } + connection, _, err := hijacker.Hijack() + if err != nil { + return + } + defer connection.Close() + time.Sleep(30 * time.Second) } - response.Header().Set("Content-Type", "application/json") - response.WriteHeader(fakeResponse.statusCode) - response.Write(fakeResponse.content) } // testServerAndClientConfig returns a server that listens and a config that can reference it @@ -766,9 +784,101 @@ func TestGetDeletableResources(t *testing.T) { } } +// TestGarbageCollectorSync ensures that a discovery client error +// will not cause the garbage collector to block infinitely. +func TestGarbageCollectorSync(t *testing.T) { + serverResources := []*metav1.APIResourceList{ + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}}, + }, + }, + } + fakeDiscoveryClient := &fakeServerResources{ + PreferredResources: serverResources, + Error: nil, + Lock: sync.Mutex{}, + InterfaceUsedCount: 0, + } + + testHandler := &fakeActionHandler{ + response: map[string]FakeResponse{ + "GET" + "/api/v1/pods": { + 200, + []byte("{}"), + }, + }, + } + srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) + defer srv.Close() + clientConfig.ContentConfig.NegotiatedSerializer = nil + client, err := kubernetes.NewForConfig(clientConfig) + if err != nil { + t.Fatal(err) + } + + rm := &testRESTMapper{legacyscheme.Registry.RESTMapper()} + metaOnlyClientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc) + clientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc) + podResource := map[schema.GroupVersionResource]struct{}{ + {Group: "", Version: "v1", Resource: "pods"}: {}, + } + sharedInformers := informers.NewSharedInformerFactory(client, 0) + alwaysStarted := make(chan struct{}) + close(alwaysStarted) + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted) + if err != nil { + t.Fatal(err) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go gc.Run(1, stopCh) + go gc.Sync(fakeDiscoveryClient, 10*time.Millisecond, stopCh) + + // Wait until the sync discovers the initial resources + fmt.Printf("Test output") + time.Sleep(1 * time.Second) + + err = expectSyncNotBlocked(fakeDiscoveryClient) + if err != nil { + t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err) + } + + // Simulate the discovery client returning an error + fakeDiscoveryClient.setPreferredResources(nil) + fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()")) + + // Wait until sync discovers the change + time.Sleep(1 * time.Second) + + // Remove the error from being returned and see if the garbage collector sync is still working + fakeDiscoveryClient.setPreferredResources(serverResources) + fakeDiscoveryClient.setError(nil) + + err = expectSyncNotBlocked(fakeDiscoveryClient) + if err != nil { + t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) + } +} + +func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error { + before := fakeDiscoveryClient.getInterfaceUsedCount() + t := 1 * time.Second + time.Sleep(t) + after := fakeDiscoveryClient.getInterfaceUsedCount() + if before == after { + return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t) + } + return nil +} + type fakeServerResources struct { PreferredResources []*metav1.APIResourceList Error error + Lock sync.Mutex + InterfaceUsedCount int } func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { @@ -780,9 +890,30 @@ func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, erro } func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) { + f.Lock.Lock() + defer f.Lock.Unlock() + f.InterfaceUsedCount++ return f.PreferredResources, f.Error } +func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) { + f.Lock.Lock() + defer f.Lock.Unlock() + f.PreferredResources = resources +} + +func (f *fakeServerResources) setError(err error) { + f.Lock.Lock() + defer f.Lock.Unlock() + f.Error = err +} + +func (f *fakeServerResources) getInterfaceUsedCount() int { + f.Lock.Lock() + defer f.Lock.Unlock() + return f.InterfaceUsedCount +} + func (_ *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { return nil, nil } From 455c6fb049f9dd8c406b509d8c9c32202d875655 Mon Sep 17 00:00:00 2001 From: jennybuckley Date: Fri, 16 Mar 2018 11:44:09 -0700 Subject: [PATCH 2/2] Prevent garbage collector from attempting to sync with 0 resources --- pkg/controller/garbagecollector/garbagecollector.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 0c606a210db..93a96c20feb 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -172,6 +172,14 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf // Get the current resource list from discovery. newResources := GetDeletableResources(discoveryClient) + // This can occur if there is an internal error in GetDeletableResources. + // If the gc attempts to sync with 0 resources it will block forever. + // TODO: Implement a more complete solution for the garbage collector hanging. + if len(newResources) == 0 { + glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync") + return + } + // Decide whether discovery has reported a change. if reflect.DeepEqual(oldResources, newResources) { glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")