Merge pull request #61201 from jennybuckley/fix-gc-empty-map

Automatic merge from submit-queue (batch tested with PRs 61284, 61119, 61201). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Prevent garbage collector from attempting to sync with 0 resources

**What this PR does / why we need it**:
As of #55259 we enabled garbagecollector.GetDeletableResources to return partial discovery results (including an empty set of discovery results).
This had the unintended consequence of allowing the garbage collector to enter a blocked state that can only be fixed by restarting.

From [this comment](https://github.com/kubernetes/kubernetes/issues/60037#issuecomment-372801088):

> 1. The Sync function periodically calls GetDeletableResources
>
> 2. According to the comment above GetDeletableResources, All discovery errors are considered temporary. Upon encountering any error, GetDeletableResources will log and return any discovered resources it was able to process (which may be none)., an error in discovery causes the discovery client to no longer discover resources in the cluster, but instead of failing and returning an error, it simply logs the error as garbagecollector.go:601] failed to discover preferred resources: %vthe server was unable to return a response in the time allotted, but may still be processing the request and returns an empty list of resources
>
> 3. The Sync function, upon recieving an empty resource list from discovery, detects that the resources have changed, and calls resyncMonitors, which calls dependencyGraphBuilder.syncMonitors with map[] as the argument as shown in the log as garbagecollector.go:189] syncing garbage collector with updated resources from discovery: map[], which sets the list of monitors to an empty list because it thinks there are no resources to monitor.
>
> 4. Lastly the Sync function calls controller.WaitForCacheSync, which calls cache.WaitForCacheSync, which will continually retry the garbagecollector.IsSynced function until it returns true, but it will always return false because len(gb.monitors) is 0.

This PR prevents that specific race condition from arising.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #60037

**Release note**:
```release-note
Fix bug allowing garbage collector to enter a broken state that could only be fixed by restarting the controller-manager.
```
This commit is contained in:
Kubernetes Submit Queue 2018-03-16 16:56:03 -07:00 committed by GitHub
commit f8f67da082
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 151 additions and 11 deletions

View File

@ -68,6 +68,7 @@ go_test(
"//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library",

View File

@ -166,12 +166,20 @@ type resettableRESTMapper interface {
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
// the mapper's underlying discovery client will be unnecessarily reset during // the mapper's underlying discovery client will be unnecessarily reset during
// the course of detecting new resources. // the course of detecting new resources.
func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
oldResources := make(map[schema.GroupVersionResource]struct{}) oldResources := make(map[schema.GroupVersionResource]struct{})
wait.Until(func() { wait.Until(func() {
// Get the current resource list from discovery. // Get the current resource list from discovery.
newResources := GetDeletableResources(discoveryClient) newResources := GetDeletableResources(discoveryClient)
// This can occur if there is an internal error in GetDeletableResources.
// If the gc attempts to sync with 0 resources it will block forever.
// TODO: Implement a more complete solution for the garbage collector hanging.
if len(newResources) == 0 {
glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync")
return
}
// Decide whether discovery has reported a change. // Decide whether discovery has reported a change.
if reflect.DeepEqual(oldResources, newResources) { if reflect.DeepEqual(oldResources, newResources) {
glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync") glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")

View File

@ -24,6 +24,7 @@ import (
"strings" "strings"
"sync" "sync"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -41,6 +42,7 @@ import (
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -142,18 +144,34 @@ type fakeActionHandler struct {
// ServeHTTP logs the action that occurred and always returns the associated status code // ServeHTTP logs the action that occurred and always returns the associated status code
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
f.lock.Lock() func() {
defer f.lock.Unlock() f.lock.Lock()
defer f.lock.Unlock()
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery}) f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
fakeResponse, ok := f.response[request.Method+request.URL.Path] fakeResponse, ok := f.response[request.Method+request.URL.Path]
if !ok { if !ok {
fakeResponse.statusCode = 200 fakeResponse.statusCode = 200
fakeResponse.content = []byte("{\"kind\": \"List\"}") fakeResponse.content = []byte("{\"kind\": \"List\"}")
}
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(fakeResponse.statusCode)
response.Write(fakeResponse.content)
}()
// This is to allow the fakeActionHandler to simulate a watch being opened
if strings.Contains(request.URL.RawQuery, "watch=true") {
hijacker, ok := response.(http.Hijacker)
if !ok {
return
}
connection, _, err := hijacker.Hijack()
if err != nil {
return
}
defer connection.Close()
time.Sleep(30 * time.Second)
} }
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(fakeResponse.statusCode)
response.Write(fakeResponse.content)
} }
// testServerAndClientConfig returns a server that listens and a config that can reference it // testServerAndClientConfig returns a server that listens and a config that can reference it
@ -766,9 +784,101 @@ func TestGetDeletableResources(t *testing.T) {
} }
} }
// TestGarbageCollectorSync ensures that a discovery client error
// will not cause the garbage collector to block infinitely.
func TestGarbageCollectorSync(t *testing.T) {
serverResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
},
},
}
fakeDiscoveryClient := &fakeServerResources{
PreferredResources: serverResources,
Error: nil,
Lock: sync.Mutex{},
InterfaceUsedCount: 0,
}
testHandler := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/pods": {
200,
[]byte("{}"),
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientConfig.ContentConfig.NegotiatedSerializer = nil
client, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
rm := &testRESTMapper{legacyscheme.Registry.RESTMapper()}
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
podResource := map[schema.GroupVersionResource]struct{}{
{Group: "", Version: "v1", Resource: "pods"}: {},
}
sharedInformers := informers.NewSharedInformerFactory(client, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
go gc.Run(1, stopCh)
go gc.Sync(fakeDiscoveryClient, 10*time.Millisecond, stopCh)
// Wait until the sync discovers the initial resources
fmt.Printf("Test output")
time.Sleep(1 * time.Second)
err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
}
// Simulate the discovery client returning an error
fakeDiscoveryClient.setPreferredResources(nil)
fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()"))
// Wait until sync discovers the change
time.Sleep(1 * time.Second)
// Remove the error from being returned and see if the garbage collector sync is still working
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)
err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
}
func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
before := fakeDiscoveryClient.getInterfaceUsedCount()
t := 1 * time.Second
time.Sleep(t)
after := fakeDiscoveryClient.getInterfaceUsedCount()
if before == after {
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
}
return nil
}
type fakeServerResources struct { type fakeServerResources struct {
PreferredResources []*metav1.APIResourceList PreferredResources []*metav1.APIResourceList
Error error Error error
Lock sync.Mutex
InterfaceUsedCount int
} }
func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
@ -780,9 +890,30 @@ func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, erro
} }
func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) { func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.InterfaceUsedCount++
return f.PreferredResources, f.Error return f.PreferredResources, f.Error
} }
func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.PreferredResources = resources
}
func (f *fakeServerResources) setError(err error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.Error = err
}
func (f *fakeServerResources) getInterfaceUsedCount() int {
f.Lock.Lock()
defer f.Lock.Unlock()
return f.InterfaceUsedCount
}
func (_ *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { func (_ *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
return nil, nil return nil, nil
} }