Merge pull request #26493 from caesarxuchao/fix-gc-flake

Automatic merge from submit-queue

Fixes 25890 flake. Let GC convert ListOptions to v1 before passing it to the dynamic client

GC's ListWatcher directly passed the api.ListOptions to the dynamic client, but the parameter codec of dynamic client converts the options to queries based on the tags in the struct, which are not present in api.ListOptions, so the queries are not sent to the server. As a result, the Watch request was sent without a resourceVersion, causing missed events. Flake #25890 is caused by the missed deletion events.

This PR converts the api.ListOptions to v1.ListOptions before the GC passes it to the dynamic codec. The flaky test has successfully passed 79 times ([log](https://00e9e64bacd064560a027fbee9c5a373a1614f3a56e652ae40-apidata.googleusercontent.com/download/storage/v1_internal/b/kubernetes-jenkins/o/pr-logs%2Fpull%2F25923%2Fkubernetes-pull-test-unit-integration%2F28364%2Fbuild-log.txt?qk=AD5uMEv72OjSUqDyk5i-ZLurcmM4i7gket1c7WaqR7yuIYz7WhPYT7ewVBafijV0ymnPTYqxRYt1kp6S9YQv7chPwC-3UtrKetKfhYnvAFrPGXAIBxHytTmpFohRAYgsARN1B6j1f9vyK5lM-8jyzRGhCK3sCRsAPnbDBWIWFlbH4b1n3vUET3P71QamHrF5itYyaqRU5pMZV3Cwwr81X8q7h5hCzm3Ip78RpMzfjEqTG0RcM2TLGccUrlkWVBLh4hn0NFpUIkzVFugFA5ooJffo-0AdJnO3mGWEOnXNVFWftJbK8cKnTns0DISrYFOyH_PlOe_YHCxgIXIT-dW8G-nbqoUjn5SBqunr36rcpaYCIwe2va4W_AcLCT43xiEAezRER_U9AuIqi_22KMd6SuHTyljhmWFPvPk8-gpjthLWXhcE7LPO5dV41hnZHnbI4n_9eI1nSVm7q9XdSvX1sWKV1GCwn8oj017AnxVvl9bScultko_0dTC747UqJ6UTFakLuFcHFe-F5Tz7ItDWlBVPoXeC7gTpyuicFKLsdqGlW9F5X6kIwNrBRj9uRsS-QuzSER-fVkQCn4dUTcokttRH_0bYvyfr9oqiDXmywMgOp-L0sKayk8JOVynh2q0Tju9sdkvFr0PxoAjhofomfIC1SZ_JkOzwAT1TUW8dLjPHluMct34xW_-qna1AmkoxM4bZQLhllap96NTC-0IdtzeKDrTul8p7u3WXSJjjEMSijibTNMlnkB0AluT1_RNO94OnzuFv4YlcV24FPhJzchhbyKREkOb_wzgcnSbRwGHjIcfRgkX-IzoXHVBcMYFUrPmsXrnRcfad4XwjkUOgvivkURW2_EwnzgrLDh-IKek51_0FpT1MnFCSG0gQbVSs_iMVPr6UXNAw62LGbKVtl3ZMXyapEpcO8azNbn6Wvd550R704JXxYlU)).

@lavalamp @krousey @smarterclayton
This commit is contained in:
k8s-merge-robot 2016-06-04 01:52:31 -07:00
commit 707cc2bbb8
4 changed files with 102 additions and 31 deletions

View File

@ -42,6 +42,11 @@ type Client struct {
cl *restclient.RESTClient cl *restclient.RESTClient
} }
type ClientWithParameterCodec struct {
client *Client
parameterCodec runtime.ParameterCodec
}
// NewClient returns a new client based on the passed in config. The // NewClient returns a new client based on the passed in config. The
// codec is ignored, as the dynamic client uses it's own codec. // codec is ignored, as the dynamic client uses it's own codec.
func NewClient(conf *restclient.Config) (*Client, error) { func NewClient(conf *restclient.Config) (*Client, error) {
@ -79,9 +84,9 @@ func NewClient(conf *restclient.Config) (*Client, error) {
return &Client{cl: cl}, nil return &Client{cl: cl}, nil
} }
// Resource returns an API interface to the specified resource for // Resource returns an API interface to the specified resource for this client's
// this client's group and version. If resource is not a namespaced // group and version. If resource is not a namespaced resource, then namespace
// resource, then namespace is ignored. // is ignored.
func (c *Client) Resource(resource *unversioned.APIResource, namespace string) *ResourceClient { func (c *Client) Resource(resource *unversioned.APIResource, namespace string) *ResourceClient {
return &ResourceClient{ return &ResourceClient{
cl: c.cl, cl: c.cl,
@ -90,17 +95,42 @@ func (c *Client) Resource(resource *unversioned.APIResource, namespace string) *
} }
} }
// ParameterCodec wraps a parameterCodec around the Client.
func (c *Client) ParameterCodec(parameterCodec runtime.ParameterCodec) *ClientWithParameterCodec {
return &ClientWithParameterCodec{
client: c,
parameterCodec: parameterCodec,
}
}
// Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceClient inherits the parameter codec of c.
func (c *ClientWithParameterCodec) Resource(resource *unversioned.APIResource, namespace string) *ResourceClient {
return &ResourceClient{
cl: c.client.cl,
resource: resource,
ns: namespace,
parameterCodec: c.parameterCodec,
}
}
// ResourceClient is an API interface to a specific resource under a // ResourceClient is an API interface to a specific resource under a
// dynamic client. // dynamic client.
type ResourceClient struct { type ResourceClient struct {
cl *restclient.RESTClient cl *restclient.RESTClient
resource *unversioned.APIResource resource *unversioned.APIResource
ns string ns string
parameterCodec runtime.ParameterCodec
} }
// List returns a list of objects for this resource. // List returns a list of objects for this resource.
func (rc *ResourceClient) List(opts runtime.Object) (*runtime.UnstructuredList, error) { func (rc *ResourceClient) List(opts runtime.Object) (*runtime.UnstructuredList, error) {
result := new(runtime.UnstructuredList) result := new(runtime.UnstructuredList)
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
err := rc.cl.Get(). err := rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced). NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name). Resource(rc.resource.Name).
@ -135,6 +165,10 @@ func (rc *ResourceClient) Delete(name string, opts *v1.DeleteOptions) error {
// DeleteCollection deletes a collection of objects. // DeleteCollection deletes a collection of objects.
func (rc *ResourceClient) DeleteCollection(deleteOptions *v1.DeleteOptions, listOptions runtime.Object) error { func (rc *ResourceClient) DeleteCollection(deleteOptions *v1.DeleteOptions, listOptions runtime.Object) error {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Delete(). return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced). NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name). Resource(rc.resource.Name).
@ -174,6 +208,10 @@ func (rc *ResourceClient) Update(obj *runtime.Unstructured) (*runtime.Unstructur
// Watch returns a watch.Interface that watches the resource. // Watch returns a watch.Interface that watches the resource.
func (rc *ResourceClient) Watch(opts runtime.Object) (watch.Interface, error) { func (rc *ResourceClient) Watch(opts runtime.Object) (watch.Interface, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Get(). return rc.cl.Get().
Prefix("watch"). Prefix("watch").
NamespaceIfScoped(rc.ns, rc.resource.Namespaced). NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
@ -231,4 +269,4 @@ func (parameterCodec) DecodeParameters(parameters url.Values, from unversioned.G
return errors.New("DecodeParameters not implemented on dynamic parameterCodec") return errors.New("DecodeParameters not implemented on dynamic parameterCodec")
} }
var parameterEncoder runtime.ParameterCodec = parameterCodec{} var defaultParameterEncoder runtime.ParameterCodec = parameterCodec{}

View File

@ -433,6 +433,35 @@ type GarbageCollector struct {
propagator *Propagator propagator *Propagator
} }
// TODO: make special List and Watch function that removes fields other than
// ObjectMeta.
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := unversioned.APIResource{Name: resource.Resource}
// The default parameter codec used by the dynamic client cannot
// encode api.ListOptions.
// TODO: api.ParameterCodec doesn't support thirdparty objects.
// We need a generic parameter codec.
return client.ParameterCodec(api.ParameterCodec).Resource(&apiResource, api.NamespaceAll).List(&options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := unversioned.APIResource{Name: resource.Resource}
// The default parameter codec used by the dynamic client cannot
// encode api.ListOptions.
return client.ParameterCodec(api.ParameterCodec).Resource(&apiResource, api.NamespaceAll).Watch(&options)
},
}
}
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) { func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) {
// TODO: consider store in one storage. // TODO: consider store in one storage.
glog.V(6).Infof("create storage for resource %s", resource) glog.V(6).Infof("create storage for resource %s", resource)
@ -442,26 +471,7 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
return monitor, err return monitor, err
} }
monitor.store, monitor.controller = framework.NewInformer( monitor.store, monitor.controller = framework.NewInformer(
// TODO: make special List and Watch function that removes fields other gcListWatcher(client, resource),
// than ObjectMeta.
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := unversioned.APIResource{Name: resource.Resource}
return client.Resource(&apiResource, api.NamespaceAll).List(&options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := unversioned.APIResource{Name: resource.Resource}
return client.Resource(&apiResource, api.NamespaceAll).Watch(&options)
},
},
nil, nil,
ResourceResyncTime, ResourceResyncTime,
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{

View File

@ -52,6 +52,7 @@ func TestNewGarbageCollector(t *testing.T) {
type fakeAction struct { type fakeAction struct {
method string method string
path string path string
query string
} }
// String returns method=path to aid in testing // String returns method=path to aid in testing
@ -78,7 +79,7 @@ func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *htt
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path}) f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
fakeResponse, ok := f.response[request.Method+request.URL.Path] fakeResponse, ok := f.response[request.Method+request.URL.Path]
if !ok { if !ok {
fakeResponse.statusCode = 200 fakeResponse.statusCode = 200
@ -317,3 +318,28 @@ func TestDependentsRace(t *testing.T) {
} }
}() }()
} }
// test the list and watch functions correctly converts the ListOptions
func TestGCListWatcher(t *testing.T) {
testHandler := &fakeActionHandler{}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
podResource := unversioned.GroupVersionResource{Version: "v1", Resource: "pods"}
client, err := clientPool.ClientForGroupVersion(podResource.GroupVersion())
if err != nil {
t.Fatal(err)
}
lw := gcListWatcher(client, podResource)
lw.Watch(api.ListOptions{ResourceVersion: "1"})
lw.List(api.ListOptions{ResourceVersion: "1"})
if e, a := 2, len(testHandler.actions); e != a {
t.Errorf("expect %d requests, got %d", e, a)
}
if e, a := "resourceVersion=1", testHandler.actions[0].query; e != a {
t.Errorf("expect %s, got %s", e, a)
}
if e, a := "resourceVersion=1", testHandler.actions[1].query; e != a {
t.Errorf("expect %s, got %s", e, a)
}
}

View File

@ -149,9 +149,6 @@ func setup(t *testing.T) (*garbagecollector.GarbageCollector, clientset.Interfac
// This test simulates the cascading deletion. // This test simulates the cascading deletion.
func TestCascadingDeletion(t *testing.T) { func TestCascadingDeletion(t *testing.T) {
// TODO: Figure out what's going on with this test!
t.Log("This test is failing too much-- lavalamp removed it to stop the submit queue bleeding")
return
gc, clientSet := setup(t) gc, clientSet := setup(t)
oldEnableGarbageCollector := registry.EnableGarbageCollector oldEnableGarbageCollector := registry.EnableGarbageCollector
registry.EnableGarbageCollector = true registry.EnableGarbageCollector = true