mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Add unit test TestGarbageCollectorSync
This commit is contained in:
parent
02611149c1
commit
68e2a96016
@ -68,6 +68,7 @@ go_test(
|
|||||||
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||||
|
@ -166,7 +166,7 @@ type resettableRESTMapper interface {
|
|||||||
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
|
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
|
||||||
// the mapper's underlying discovery client will be unnecessarily reset during
|
// the mapper's underlying discovery client will be unnecessarily reset during
|
||||||
// the course of detecting new resources.
|
// the course of detecting new resources.
|
||||||
func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
|
func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
|
||||||
oldResources := make(map[schema.GroupVersionResource]struct{})
|
oldResources := make(map[schema.GroupVersionResource]struct{})
|
||||||
wait.Until(func() {
|
wait.Until(func() {
|
||||||
// Get the current resource list from discovery.
|
// Get the current resource list from discovery.
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
@ -41,6 +42,7 @@ import (
|
|||||||
"k8s.io/client-go/discovery"
|
"k8s.io/client-go/discovery"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
@ -142,18 +144,34 @@ type fakeActionHandler struct {
|
|||||||
|
|
||||||
// ServeHTTP logs the action that occurred and always returns the associated status code
|
// ServeHTTP logs the action that occurred and always returns the associated status code
|
||||||
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
|
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
|
||||||
f.lock.Lock()
|
func() {
|
||||||
defer f.lock.Unlock()
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
|
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
|
||||||
fakeResponse, ok := f.response[request.Method+request.URL.Path]
|
fakeResponse, ok := f.response[request.Method+request.URL.Path]
|
||||||
if !ok {
|
if !ok {
|
||||||
fakeResponse.statusCode = 200
|
fakeResponse.statusCode = 200
|
||||||
fakeResponse.content = []byte("{\"kind\": \"List\"}")
|
fakeResponse.content = []byte("{\"kind\": \"List\"}")
|
||||||
|
}
|
||||||
|
response.Header().Set("Content-Type", "application/json")
|
||||||
|
response.WriteHeader(fakeResponse.statusCode)
|
||||||
|
response.Write(fakeResponse.content)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// This is to allow the fakeActionHandler to simulate a watch being opened
|
||||||
|
if strings.Contains(request.URL.RawQuery, "watch=true") {
|
||||||
|
hijacker, ok := response.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
connection, _, err := hijacker.Hijack()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer connection.Close()
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
}
|
}
|
||||||
response.Header().Set("Content-Type", "application/json")
|
|
||||||
response.WriteHeader(fakeResponse.statusCode)
|
|
||||||
response.Write(fakeResponse.content)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// testServerAndClientConfig returns a server that listens and a config that can reference it
|
// testServerAndClientConfig returns a server that listens and a config that can reference it
|
||||||
@ -766,9 +784,101 @@ func TestGetDeletableResources(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestGarbageCollectorSync ensures that a discovery client error
|
||||||
|
// will not cause the garbage collector to block infinitely.
|
||||||
|
func TestGarbageCollectorSync(t *testing.T) {
|
||||||
|
serverResources := []*metav1.APIResourceList{
|
||||||
|
{
|
||||||
|
GroupVersion: "v1",
|
||||||
|
APIResources: []metav1.APIResource{
|
||||||
|
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
fakeDiscoveryClient := &fakeServerResources{
|
||||||
|
PreferredResources: serverResources,
|
||||||
|
Error: nil,
|
||||||
|
Lock: sync.Mutex{},
|
||||||
|
InterfaceUsedCount: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
testHandler := &fakeActionHandler{
|
||||||
|
response: map[string]FakeResponse{
|
||||||
|
"GET" + "/api/v1/pods": {
|
||||||
|
200,
|
||||||
|
[]byte("{}"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||||
|
defer srv.Close()
|
||||||
|
clientConfig.ContentConfig.NegotiatedSerializer = nil
|
||||||
|
client, err := kubernetes.NewForConfig(clientConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rm := &testRESTMapper{legacyscheme.Registry.RESTMapper()}
|
||||||
|
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
|
||||||
|
clientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
|
||||||
|
podResource := map[schema.GroupVersionResource]struct{}{
|
||||||
|
{Group: "", Version: "v1", Resource: "pods"}: {},
|
||||||
|
}
|
||||||
|
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||||
|
alwaysStarted := make(chan struct{})
|
||||||
|
close(alwaysStarted)
|
||||||
|
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
go gc.Run(1, stopCh)
|
||||||
|
go gc.Sync(fakeDiscoveryClient, 10*time.Millisecond, stopCh)
|
||||||
|
|
||||||
|
// Wait until the sync discovers the initial resources
|
||||||
|
fmt.Printf("Test output")
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
err = expectSyncNotBlocked(fakeDiscoveryClient)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simulate the discovery client returning an error
|
||||||
|
fakeDiscoveryClient.setPreferredResources(nil)
|
||||||
|
fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()"))
|
||||||
|
|
||||||
|
// Wait until sync discovers the change
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
// Remove the error from being returned and see if the garbage collector sync is still working
|
||||||
|
fakeDiscoveryClient.setPreferredResources(serverResources)
|
||||||
|
fakeDiscoveryClient.setError(nil)
|
||||||
|
|
||||||
|
err = expectSyncNotBlocked(fakeDiscoveryClient)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
|
||||||
|
before := fakeDiscoveryClient.getInterfaceUsedCount()
|
||||||
|
t := 1 * time.Second
|
||||||
|
time.Sleep(t)
|
||||||
|
after := fakeDiscoveryClient.getInterfaceUsedCount()
|
||||||
|
if before == after {
|
||||||
|
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type fakeServerResources struct {
|
type fakeServerResources struct {
|
||||||
PreferredResources []*metav1.APIResourceList
|
PreferredResources []*metav1.APIResourceList
|
||||||
Error error
|
Error error
|
||||||
|
Lock sync.Mutex
|
||||||
|
InterfaceUsedCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
|
func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
|
||||||
@ -780,9 +890,30 @@ func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
|
func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
|
||||||
|
f.Lock.Lock()
|
||||||
|
defer f.Lock.Unlock()
|
||||||
|
f.InterfaceUsedCount++
|
||||||
return f.PreferredResources, f.Error
|
return f.PreferredResources, f.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
|
||||||
|
f.Lock.Lock()
|
||||||
|
defer f.Lock.Unlock()
|
||||||
|
f.PreferredResources = resources
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeServerResources) setError(err error) {
|
||||||
|
f.Lock.Lock()
|
||||||
|
defer f.Lock.Unlock()
|
||||||
|
f.Error = err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeServerResources) getInterfaceUsedCount() int {
|
||||||
|
f.Lock.Lock()
|
||||||
|
defer f.Lock.Unlock()
|
||||||
|
return f.InterfaceUsedCount
|
||||||
|
}
|
||||||
|
|
||||||
func (_ *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
|
func (_ *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user