From f4d75e0a0aef1ca9be28930e22fd4146a74798cf Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 16 Oct 2015 15:07:14 +0200 Subject: [PATCH] Support timeout in watch requests --- .../mesos/pkg/service/endpoints_controller.go | 6 ++++-- pkg/api/types.go | 2 ++ pkg/api/v1/types.go | 2 ++ pkg/apiserver/apiserver_test.go | 2 ++ pkg/apiserver/resthandler.go | 11 ++++++++++- pkg/apiserver/watch.go | 8 +------- pkg/client/unversioned/daemon_sets.go | 8 +++++--- pkg/client/unversioned/deployment.go | 7 ++++--- pkg/client/unversioned/deployment_test.go | 2 +- pkg/client/unversioned/endpoints.go | 6 +++--- pkg/client/unversioned/events.go | 6 +++--- pkg/client/unversioned/helper.go | 9 +++++++++ .../unversioned/horizontalpodautoscaler.go | 7 ++++--- .../horizontalpodautoscaler_test.go | 2 +- pkg/client/unversioned/ingress.go | 7 ++++--- pkg/client/unversioned/jobs.go | 7 ++++--- pkg/client/unversioned/limit_ranges.go | 6 +++--- pkg/client/unversioned/limit_ranges_test.go | 2 +- pkg/client/unversioned/namespaces.go | 6 +++--- pkg/client/unversioned/namespaces_test.go | 2 +- pkg/client/unversioned/nodes.go | 6 +++--- .../unversioned/persistentvolume_test.go | 2 +- .../unversioned/persistentvolumeclaim.go | 6 +++--- .../unversioned/persistentvolumeclaim_test.go | 2 +- pkg/client/unversioned/persistentvolumes.go | 6 +++--- pkg/client/unversioned/pod_templates.go | 6 +++--- pkg/client/unversioned/pod_templates_test.go | 2 +- pkg/client/unversioned/pods.go | 6 +++--- .../unversioned/replication_controllers.go | 6 +++--- pkg/client/unversioned/request.go | 12 ++++++++++++ pkg/client/unversioned/request_test.go | 19 +++++++++++++++++++ pkg/client/unversioned/resource_quotas.go | 6 +++--- .../unversioned/resource_quotas_test.go | 2 +- pkg/client/unversioned/secrets.go | 6 +++--- pkg/client/unversioned/service_accounts.go | 6 +++--- pkg/client/unversioned/services.go | 6 +++--- pkg/client/unversioned/testclient/actions.go | 9 +++++---- .../testclient/fake_daemon_sets.go | 5 +++-- .../testclient/fake_deployments.go | 4 ++-- .../unversioned/testclient/fake_endpoints.go | 4 ++-- .../unversioned/testclient/fake_events.go | 6 +++--- .../fake_horizontal_pod_autoscalers.go | 4 ++-- .../unversioned/testclient/fake_ingress.go | 4 ++-- .../unversioned/testclient/fake_jobs.go | 4 ++-- .../testclient/fake_limit_ranges.go | 4 ++-- .../unversioned/testclient/fake_namespaces.go | 4 ++-- .../unversioned/testclient/fake_nodes.go | 4 ++-- .../fake_persistent_volume_claims.go | 4 ++-- .../testclient/fake_persistent_volumes.go | 4 ++-- .../testclient/fake_pod_templates.go | 4 ++-- .../unversioned/testclient/fake_pods.go | 4 ++-- .../fake_replication_controllers.go | 4 ++-- .../testclient/fake_resource_quotas.go | 4 ++-- .../unversioned/testclient/fake_secrets.go | 4 ++-- .../testclient/fake_service_accounts.go | 4 ++-- .../unversioned/testclient/fake_services.go | 4 ++-- pkg/controller/daemon/controller.go | 9 ++++++--- .../endpoint/endpoints_controller.go | 6 ++++-- pkg/controller/gc/gc_controller.go | 3 ++- pkg/controller/job/controller.go | 6 ++++-- .../namespace/namespace_controller.go | 3 ++- pkg/controller/node/nodecontroller.go | 6 ++++-- pkg/controller/node/nodecontroller_test.go | 2 +- ...ersistentvolume_claim_binder_controller.go | 6 ++++-- .../persistentvolume_recycler_controller.go | 3 ++- .../replication/replication_controller.go | 6 ++++-- .../serviceaccounts_controller.go | 6 ++++-- .../serviceaccount/tokens_controller.go | 6 ++++-- pkg/kubelet/kubelet.go | 6 ++++-- pkg/volume/util.go | 3 ++- plugin/pkg/admission/limitranger/admission.go | 3 ++- .../namespace/autoprovision/admission.go | 3 ++- .../admission/namespace/exists/admission.go | 3 ++- .../namespace/lifecycle/admission.go | 3 ++- .../pkg/admission/resourcequota/admission.go | 3 ++- .../pkg/admission/serviceaccount/admission.go | 6 ++++-- test/e2e/daemon_restart.go | 3 ++- test/e2e/density.go | 6 ++++-- test/e2e/framework.go | 2 +- test/e2e/latency.go | 3 ++- test/e2e/pods.go | 3 ++- test/e2e/service_latency.go | 3 ++- test/e2e/util.go | 3 ++- test/integration/client_test.go | 2 +- test/integration/persistent_volumes_test.go | 4 ++-- 85 files changed, 256 insertions(+), 160 deletions(-) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index b3f4bc64682..e64d277f46c 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -61,7 +61,8 @@ func NewEndpointController(client *client.Client) *endpointController { return e.client.Services(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Service{}, @@ -81,7 +82,8 @@ func NewEndpointController(client *client.Client) *endpointController { return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Pod{}, diff --git a/pkg/api/types.go b/pkg/api/types.go index ac0bff869e4..1c4f6b6612d 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1645,6 +1645,8 @@ type ListOptions struct { Watch bool // The resource version to watch (no effect on list yet) ResourceVersion string + // Timeout for the list/watch call. + TimeoutSeconds *int64 } // PodLogOptions is the query options for a Pod's logs REST call diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 1c3bc1cd893..20f6d11ebc6 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -2047,6 +2047,8 @@ type ListOptions struct { // When specified with a watch call, shows changes that occur after that particular version of a resource. // Defaults to changes from the beginning of history. ResourceVersion string `json:"resourceVersion,omitempty"` + // Timeout for the list/watch call. + TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } // PodLogOptions is the query options for a Pod's logs REST call. diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index eabe8ebad30..6d0f5f22d15 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -102,6 +102,7 @@ func addTestTypes() { FieldSelector string `json:"fields,omitempty"` Watch bool `json:"watch,omitempty"` ResourceVersion string `json:"resourceVersion,omitempty"` + TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } api.Scheme.AddKnownTypes( testVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &unversioned.Status{}, @@ -117,6 +118,7 @@ func addNewTestTypes() { FieldSelector string `json:"fieldSelector,omitempty"` Watch bool `json:"watch,omitempty"` ResourceVersion string `json:"resourceVersion,omitempty"` + TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } api.Scheme.AddKnownTypes( newVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &unversioned.Status{}, diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index eeec1163840..037adecbf7c 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -19,6 +19,7 @@ package apiserver import ( "encoding/json" "fmt" + "math/rand" "net/http" "net/url" gpath "path" @@ -271,7 +272,15 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch errorJSON(err, scope.Codec, w) return } - serveWatch(watcher, scope, w, req, minRequestTimeout) + // TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=. + timeout := time.Duration(0) + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + if timeout == 0 && minRequestTimeout > 0 { + timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) + } + serveWatch(watcher, scope, w, req, timeout) return } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 2231135543e..32e1989a132 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "math/rand" "net/http" "reflect" "regexp" @@ -66,12 +65,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { } // serveWatch handles serving requests to the server -func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, minRequestTimeout time.Duration) { - var timeout time.Duration - if minRequestTimeout > 0 { - // Each watch gets a random timeout between minRequestTimeout and 2*minRequestTimeout to avoid thundering herds. - timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) - } +func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, timeout time.Duration) { watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) { if err := setSelfLink(obj, req, scope.Namer); err != nil { glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err) diff --git a/pkg/client/unversioned/daemon_sets.go b/pkg/client/unversioned/daemon_sets.go index ebff86e5445..762787d3078 100644 --- a/pkg/client/unversioned/daemon_sets.go +++ b/pkg/client/unversioned/daemon_sets.go @@ -17,6 +17,7 @@ limitations under the License. package unversioned import ( + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" @@ -35,7 +36,7 @@ type DaemonSetInterface interface { Update(ctrl *extensions.DaemonSet) (*extensions.DaemonSet, error) UpdateStatus(ctrl *extensions.DaemonSet) (*extensions.DaemonSet, error) Delete(name string) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // daemonSets implements DaemonsSetsNamespacer interface @@ -91,12 +92,13 @@ func (c *daemonSets) Delete(name string) error { } // Watch returns a watch.Interface that watches the requested daemon sets. -func (c *daemonSets) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *daemonSets) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("daemonsets"). - Param("resourceVersion", resourceVersion). + Param("resourceVersion", opts.ResourceVersion). + TimeoutSeconds(TimeoutFromListOptions(opts)). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/deployment.go b/pkg/client/unversioned/deployment.go index 878f5ee1f21..59897129dba 100644 --- a/pkg/client/unversioned/deployment.go +++ b/pkg/client/unversioned/deployment.go @@ -36,7 +36,7 @@ type DeploymentInterface interface { Delete(name string, options *api.DeleteOptions) error Create(Deployment *extensions.Deployment) (*extensions.Deployment, error) Update(Deployment *extensions.Deployment) (*extensions.Deployment, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // deployments implements DeploymentInterface @@ -94,12 +94,13 @@ func (c *deployments) Update(deployment *extensions.Deployment) (result *extensi } // Watch returns a watch.Interface that watches the requested deployments. -func (c *deployments) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *deployments) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.client.Get(). Prefix("watch"). Namespace(c.ns). Resource("deployments"). - Param("resourceVersion", resourceVersion). + Param("resourceVersion", opts.ResourceVersion). + TimeoutSeconds(TimeoutFromListOptions(opts)). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/deployment_test.go b/pkg/client/unversioned/deployment_test.go index 0a94a5f8c9c..d4d596eebaa 100644 --- a/pkg/client/unversioned/deployment_test.go +++ b/pkg/client/unversioned/deployment_test.go @@ -147,6 +147,6 @@ func TestDeploymentWatch(t *testing.T) { }, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).Deployments(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).Deployments(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/endpoints.go b/pkg/client/unversioned/endpoints.go index 40600832498..fbf43db2fe4 100644 --- a/pkg/client/unversioned/endpoints.go +++ b/pkg/client/unversioned/endpoints.go @@ -37,7 +37,7 @@ type EndpointsInterface interface { Get(name string) (*api.Endpoints, error) Delete(name string) error Update(endpoints *api.Endpoints) (*api.Endpoints, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // endpoints implements EndpointsInterface @@ -83,12 +83,12 @@ func (c *endpoints) Delete(name string) error { } // Watch returns a watch.Interface that watches the requested endpoints for a service. -func (c *endpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *endpoints) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("endpoints"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/events.go b/pkg/client/unversioned/events.go index 6094f9773b1..0cf2bcc4f1e 100644 --- a/pkg/client/unversioned/events.go +++ b/pkg/client/unversioned/events.go @@ -38,7 +38,7 @@ type EventInterface interface { Patch(event *api.Event, data []byte) (*api.Event, error) List(label labels.Selector, field fields.Selector) (*api.EventList, error) Get(name string) (*api.Event, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) // Search finds events about the specified object Search(objOrRef runtime.Object) (*api.EventList, error) Delete(name string) error @@ -141,12 +141,12 @@ func (e *events) Get(name string) (*api.Event, error) { } // Watch starts watching for events matching the given selectors. -func (e *events) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (e *events) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return e.client.Get(). Prefix("watch"). NamespaceIfScoped(e.namespace, len(e.namespace) > 0). Resource("events"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/helper.go b/pkg/client/unversioned/helper.go index bc593e2e893..e45e7d978f0 100644 --- a/pkg/client/unversioned/helper.go +++ b/pkg/client/unversioned/helper.go @@ -631,3 +631,12 @@ func DefaultKubernetesUserAgent() string { version = seg[0] return fmt.Sprintf("%s/%s (%s/%s) kubernetes/%s", path.Base(os.Args[0]), version, gruntime.GOOS, gruntime.GOARCH, commit) } + +// TimeoutFromListOptions returns timeout to be set via TimeoutSeconds() method +// based on given options. +func TimeoutFromListOptions(options api.ListOptions) time.Duration { + if options.TimeoutSeconds != nil { + return time.Duration(*options.TimeoutSeconds) * time.Second + } + return 0 +} diff --git a/pkg/client/unversioned/horizontalpodautoscaler.go b/pkg/client/unversioned/horizontalpodautoscaler.go index af98c99df9b..d3c430912ce 100644 --- a/pkg/client/unversioned/horizontalpodautoscaler.go +++ b/pkg/client/unversioned/horizontalpodautoscaler.go @@ -37,7 +37,7 @@ type HorizontalPodAutoscalerInterface interface { Create(horizontalPodAutoscaler *extensions.HorizontalPodAutoscaler) (*extensions.HorizontalPodAutoscaler, error) Update(horizontalPodAutoscaler *extensions.HorizontalPodAutoscaler) (*extensions.HorizontalPodAutoscaler, error) UpdateStatus(horizontalPodAutoscaler *extensions.HorizontalPodAutoscaler) (*extensions.HorizontalPodAutoscaler, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // horizontalPodAutoscalers implements HorizontalPodAutoscalersNamespacer interface @@ -103,12 +103,13 @@ func (c *horizontalPodAutoscalers) UpdateStatus(horizontalPodAutoscaler *extensi } // Watch returns a watch.Interface that watches the requested horizontalPodAutoscalers. -func (c *horizontalPodAutoscalers) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *horizontalPodAutoscalers) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.client.Get(). Prefix("watch"). Namespace(c.ns). Resource("horizontalPodAutoscalers"). - Param("resourceVersion", resourceVersion). + Param("resourceVersion", opts.ResourceVersion). + TimeoutSeconds(TimeoutFromListOptions(opts)). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/horizontalpodautoscaler_test.go b/pkg/client/unversioned/horizontalpodautoscaler_test.go index 42b8f7639ca..e56b6689629 100644 --- a/pkg/client/unversioned/horizontalpodautoscaler_test.go +++ b/pkg/client/unversioned/horizontalpodautoscaler_test.go @@ -155,6 +155,6 @@ func TestHorizontalPodAutoscalerWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).Extensions().HorizontalPodAutoscalers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).Extensions().HorizontalPodAutoscalers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/ingress.go b/pkg/client/unversioned/ingress.go index 8c8544c5c47..d9ba8edc28e 100644 --- a/pkg/client/unversioned/ingress.go +++ b/pkg/client/unversioned/ingress.go @@ -36,7 +36,7 @@ type IngressInterface interface { Create(ingress *extensions.Ingress) (*extensions.Ingress, error) Update(ingress *extensions.Ingress) (*extensions.Ingress, error) Delete(name string, options *api.DeleteOptions) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) UpdateStatus(ingress *extensions.Ingress) (*extensions.Ingress, error) } @@ -93,12 +93,13 @@ func (c *ingress) Delete(name string, options *api.DeleteOptions) (err error) { } // Watch returns a watch.Interface that watches the requested ingress. -func (c *ingress) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *ingress) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("ingress"). - Param("resourceVersion", resourceVersion). + Param("resourceVersion", opts.ResourceVersion). + TimeoutSeconds(TimeoutFromListOptions(opts)). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/jobs.go b/pkg/client/unversioned/jobs.go index d6a88edb179..e15d0aeffee 100644 --- a/pkg/client/unversioned/jobs.go +++ b/pkg/client/unversioned/jobs.go @@ -37,7 +37,7 @@ type JobInterface interface { Create(job *extensions.Job) (*extensions.Job, error) Update(job *extensions.Job) (*extensions.Job, error) Delete(name string, options *api.DeleteOptions) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) UpdateStatus(job *extensions.Job) (*extensions.Job, error) } @@ -97,12 +97,13 @@ func (c *jobs) Delete(name string, options *api.DeleteOptions) (err error) { } // Watch returns a watch.Interface that watches the requested jobs. -func (c *jobs) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *jobs) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("jobs"). - Param("resourceVersion", resourceVersion). + Param("resourceVersion", opts.ResourceVersion). + TimeoutSeconds(TimeoutFromListOptions(opts)). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/limit_ranges.go b/pkg/client/unversioned/limit_ranges.go index 5e90480ec6b..de11f8620e4 100644 --- a/pkg/client/unversioned/limit_ranges.go +++ b/pkg/client/unversioned/limit_ranges.go @@ -37,7 +37,7 @@ type LimitRangeInterface interface { Delete(name string) error Create(limitRange *api.LimitRange) (*api.LimitRange, error) Update(limitRange *api.LimitRange) (*api.LimitRange, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // limitRanges implements LimitRangesNamespacer interface @@ -92,12 +92,12 @@ func (c *limitRanges) Update(limitRange *api.LimitRange) (result *api.LimitRange } // Watch returns a watch.Interface that watches the requested resource -func (c *limitRanges) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *limitRanges) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("limitRanges"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/limit_ranges_test.go b/pkg/client/unversioned/limit_ranges_test.go index 23e0fbad5c4..9db69636879 100644 --- a/pkg/client/unversioned/limit_ranges_test.go +++ b/pkg/client/unversioned/limit_ranges_test.go @@ -207,6 +207,6 @@ func TestLimitRangeWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).LimitRanges(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).LimitRanges(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/namespaces.go b/pkg/client/unversioned/namespaces.go index d8baf291842..427a958f57a 100644 --- a/pkg/client/unversioned/namespaces.go +++ b/pkg/client/unversioned/namespaces.go @@ -35,7 +35,7 @@ type NamespaceInterface interface { List(label labels.Selector, field fields.Selector) (*api.NamespaceList, error) Delete(name string) error Update(item *api.Namespace) (*api.Namespace, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) Finalize(item *api.Namespace) (*api.Namespace, error) Status(item *api.Namespace) (*api.Namespace, error) } @@ -114,11 +114,11 @@ func (c *namespaces) Delete(name string) error { } // Watch returns a watch.Interface that watches the requested namespaces. -func (c *namespaces) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *namespaces) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Resource("namespaces"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/namespaces_test.go b/pkg/client/unversioned/namespaces_test.go index 7fc36157345..927959f2b23 100644 --- a/pkg/client/unversioned/namespaces_test.go +++ b/pkg/client/unversioned/namespaces_test.go @@ -174,6 +174,6 @@ func TestNamespaceWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).Namespaces().Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).Namespaces().Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/nodes.go b/pkg/client/unversioned/nodes.go index 4b86c04c62f..6b416ccca73 100644 --- a/pkg/client/unversioned/nodes.go +++ b/pkg/client/unversioned/nodes.go @@ -36,7 +36,7 @@ type NodeInterface interface { Delete(name string) error Update(*api.Node) (*api.Node, error) UpdateStatus(*api.Node) (*api.Node, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // nodes implements NodesInterface @@ -102,12 +102,12 @@ func (c *nodes) UpdateStatus(node *api.Node) (*api.Node, error) { } // Watch returns a watch.Interface that watches the requested nodes. -func (c *nodes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *nodes) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(api.NamespaceAll). Resource(c.resourceName()). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/persistentvolume_test.go b/pkg/client/unversioned/persistentvolume_test.go index 6b09f5f518b..4b347d30e67 100644 --- a/pkg/client/unversioned/persistentvolume_test.go +++ b/pkg/client/unversioned/persistentvolume_test.go @@ -180,6 +180,6 @@ func TestPersistentVolumeWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).PersistentVolumes().Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).PersistentVolumes().Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/persistentvolumeclaim.go b/pkg/client/unversioned/persistentvolumeclaim.go index 23453b333b8..094484979d5 100644 --- a/pkg/client/unversioned/persistentvolumeclaim.go +++ b/pkg/client/unversioned/persistentvolumeclaim.go @@ -38,7 +38,7 @@ type PersistentVolumeClaimInterface interface { Update(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) UpdateStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) Delete(name string) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // persistentVolumeClaims implements PersistentVolumeClaimsNamespacer interface @@ -98,12 +98,12 @@ func (c *persistentVolumeClaims) Delete(name string) error { return c.client.Delete().Namespace(c.namespace).Resource("persistentVolumeClaims").Name(name).Do().Error() } -func (c *persistentVolumeClaims) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *persistentVolumeClaims) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.client.Get(). Prefix("watch"). Namespace(c.namespace). Resource("persistentVolumeClaims"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/persistentvolumeclaim_test.go b/pkg/client/unversioned/persistentvolumeclaim_test.go index ae50560aba9..acc8f13aae8 100644 --- a/pkg/client/unversioned/persistentvolumeclaim_test.go +++ b/pkg/client/unversioned/persistentvolumeclaim_test.go @@ -197,6 +197,6 @@ func TestPersistentVolumeClaimWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/persistentvolumes.go b/pkg/client/unversioned/persistentvolumes.go index bf3628645ef..2680c390f51 100644 --- a/pkg/client/unversioned/persistentvolumes.go +++ b/pkg/client/unversioned/persistentvolumes.go @@ -37,7 +37,7 @@ type PersistentVolumeInterface interface { Update(volume *api.PersistentVolume) (*api.PersistentVolume, error) UpdateStatus(persistentVolume *api.PersistentVolume) (*api.PersistentVolume, error) Delete(name string) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // persistentVolumes implements PersistentVolumesInterface @@ -93,11 +93,11 @@ func (c *persistentVolumes) Delete(name string) error { return c.client.Delete().Resource("persistentVolumes").Name(name).Do().Error() } -func (c *persistentVolumes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *persistentVolumes) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.client.Get(). Prefix("watch"). Resource("persistentVolumes"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/pod_templates.go b/pkg/client/unversioned/pod_templates.go index 13d9a6bf853..83fe43c5574 100644 --- a/pkg/client/unversioned/pod_templates.go +++ b/pkg/client/unversioned/pod_templates.go @@ -35,7 +35,7 @@ type PodTemplateInterface interface { Delete(name string, options *api.DeleteOptions) error Create(podTemplate *api.PodTemplate) (*api.PodTemplate, error) Update(podTemplate *api.PodTemplate) (*api.PodTemplate, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // podTemplates implements PodTemplatesNamespacer interface @@ -94,12 +94,12 @@ func (c *podTemplates) Update(podTemplate *api.PodTemplate) (result *api.PodTemp } // Watch returns a watch.Interface that watches the requested podTemplates. -func (c *podTemplates) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *podTemplates) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("podTemplates"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/pod_templates_test.go b/pkg/client/unversioned/pod_templates_test.go index 5f35efa9d4f..f8c7cc2bac3 100644 --- a/pkg/client/unversioned/pod_templates_test.go +++ b/pkg/client/unversioned/pod_templates_test.go @@ -137,6 +137,6 @@ func TestPodTemplateWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).PodTemplates(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).PodTemplates(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/pods.go b/pkg/client/unversioned/pods.go index 2d465b12b1a..cd32566c3c4 100644 --- a/pkg/client/unversioned/pods.go +++ b/pkg/client/unversioned/pods.go @@ -35,7 +35,7 @@ type PodInterface interface { Delete(name string, options *api.DeleteOptions) error Create(pod *api.Pod) (*api.Pod, error) Update(pod *api.Pod) (*api.Pod, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) Bind(binding *api.Binding) error UpdateStatus(pod *api.Pod) (*api.Pod, error) } @@ -96,12 +96,12 @@ func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) { } // Watch returns a watch.Interface that watches the requested pods. -func (c *pods) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *pods) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("pods"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/replication_controllers.go b/pkg/client/unversioned/replication_controllers.go index 1ea08d6cb12..bd545bf6d42 100644 --- a/pkg/client/unversioned/replication_controllers.go +++ b/pkg/client/unversioned/replication_controllers.go @@ -36,7 +36,7 @@ type ReplicationControllerInterface interface { Update(ctrl *api.ReplicationController) (*api.ReplicationController, error) UpdateStatus(ctrl *api.ReplicationController) (*api.ReplicationController, error) Delete(name string) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // replicationControllers implements ReplicationControllersNamespacer interface @@ -91,12 +91,12 @@ func (c *replicationControllers) Delete(name string) error { } // Watch returns a watch.Interface that watches the requested controllers. -func (c *replicationControllers) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *replicationControllers) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("replicationControllers"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/request.go b/pkg/client/unversioned/request.go index 04c71854d63..a572374c827 100644 --- a/pkg/client/unversioned/request.go +++ b/pkg/client/unversioned/request.go @@ -459,6 +459,18 @@ func (r *Request) Timeout(d time.Duration) *Request { return r } +// Timeout makes the request use the given duration as a timeout. Sets the "timeoutSeconds" +// parameter. +func (r *Request) TimeoutSeconds(d time.Duration) *Request { + if r.err != nil { + return r + } + if d != 0 { + r.Param("timeoutSeconds", d.String()) + } + return r +} + // Body makes the request use obj as the body. Optional. // If obj is a string, try to read a file of that name. // If obj is a []byte, send it directly. diff --git a/pkg/client/unversioned/request_test.go b/pkg/client/unversioned/request_test.go index ca9e5e6982e..f17bbe61b6f 100644 --- a/pkg/client/unversioned/request_test.go +++ b/pkg/client/unversioned/request_test.go @@ -173,6 +173,25 @@ func TestRequestVersionedParams(t *testing.T) { } } +func TestRequestVersionedParamsFromListOptions(t *testing.T) { + r := &Request{apiVersion: "v1"} + r.VersionedParams(&api.ListOptions{ResourceVersion: "1"}, api.Scheme) + if !reflect.DeepEqual(r.params, url.Values{ + "resourceVersion": []string{"1"}, + }) { + t.Errorf("should have set a param: %#v", r) + } + + var timeout int64 = 10 + r.VersionedParams(&api.ListOptions{ResourceVersion: "2", TimeoutSeconds: &timeout}, api.Scheme) + if !reflect.DeepEqual(r.params, url.Values{ + "resourceVersion": []string{"1", "2"}, + "timeoutSeconds": []string{"10"}, + }) { + t.Errorf("should have set a param: %#v", r) + } +} + func TestRequestURI(t *testing.T) { r := (&Request{}).Param("foo", "a") r.Prefix("other") diff --git a/pkg/client/unversioned/resource_quotas.go b/pkg/client/unversioned/resource_quotas.go index 46241be294e..7556b9f2648 100644 --- a/pkg/client/unversioned/resource_quotas.go +++ b/pkg/client/unversioned/resource_quotas.go @@ -36,7 +36,7 @@ type ResourceQuotaInterface interface { Create(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error) Update(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error) UpdateStatus(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // resourceQuotas implements ResourceQuotasNamespacer interface @@ -94,12 +94,12 @@ func (c *resourceQuotas) UpdateStatus(resourceQuota *api.ResourceQuota) (result } // Watch returns a watch.Interface that watches the requested resource -func (c *resourceQuotas) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *resourceQuotas) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("resourceQuotas"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/resource_quotas_test.go b/pkg/client/unversioned/resource_quotas_test.go index 27688fb2cd4..f75de8632fc 100644 --- a/pkg/client/unversioned/resource_quotas_test.go +++ b/pkg/client/unversioned/resource_quotas_test.go @@ -193,6 +193,6 @@ func TestResourceQuotaWatch(t *testing.T) { Query: url.Values{"resourceVersion": []string{}}}, Response: Response{StatusCode: 200}, } - _, err := c.Setup(t).ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), "") + _, err := c.Setup(t).ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) c.Validate(t, nil, err) } diff --git a/pkg/client/unversioned/secrets.go b/pkg/client/unversioned/secrets.go index 0b3712301c5..70291cc4eea 100644 --- a/pkg/client/unversioned/secrets.go +++ b/pkg/client/unversioned/secrets.go @@ -33,7 +33,7 @@ type SecretsInterface interface { Delete(name string) error List(label labels.Selector, field fields.Selector) (*api.SecretList, error) Get(name string) (*api.Secret, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // events implements Secrets interface @@ -91,12 +91,12 @@ func (s *secrets) Get(name string) (*api.Secret, error) { } // Watch starts watching for secrets matching the given selectors. -func (s *secrets) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (s *secrets) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return s.client.Get(). Prefix("watch"). Namespace(s.namespace). Resource("secrets"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/service_accounts.go b/pkg/client/unversioned/service_accounts.go index 45ad664c25e..2b5d5e4d0f1 100644 --- a/pkg/client/unversioned/service_accounts.go +++ b/pkg/client/unversioned/service_accounts.go @@ -33,7 +33,7 @@ type ServiceAccountsInterface interface { Delete(name string) error List(label labels.Selector, field fields.Selector) (*api.ServiceAccountList, error) Get(name string) (*api.ServiceAccount, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) } // serviceAccounts implements ServiceAccounts interface @@ -91,12 +91,12 @@ func (s *serviceAccounts) Get(name string) (*api.ServiceAccount, error) { } // Watch starts watching for serviceAccounts matching the given selectors. -func (s *serviceAccounts) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (s *serviceAccounts) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return s.client.Get(). Prefix("watch"). Namespace(s.namespace). Resource("serviceAccounts"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/services.go b/pkg/client/unversioned/services.go index 54b1171e9a5..d933e2b1153 100644 --- a/pkg/client/unversioned/services.go +++ b/pkg/client/unversioned/services.go @@ -35,7 +35,7 @@ type ServiceInterface interface { Create(srv *api.Service) (*api.Service, error) Update(srv *api.Service) (*api.Service, error) Delete(name string) error - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) ProxyGet(name, path string, params map[string]string) ResponseWrapper } @@ -90,12 +90,12 @@ func (c *services) Delete(name string) error { } // Watch returns a watch.Interface that watches the requested services. -func (c *services) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (c *services) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). Resource("services"). - Param("resourceVersion", resourceVersion). + VersionedParams(&opts, api.Scheme). LabelsSelectorParam(label). FieldsSelectorParam(field). Watch() diff --git a/pkg/client/unversioned/testclient/actions.go b/pkg/client/unversioned/testclient/actions.go index 04132dc25ee..6921fcc2b37 100644 --- a/pkg/client/unversioned/testclient/actions.go +++ b/pkg/client/unversioned/testclient/actions.go @@ -19,6 +19,7 @@ package testclient import ( "strings" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" @@ -149,21 +150,21 @@ func NewDeleteAction(resource, namespace, name string) DeleteActionImpl { return action } -func NewRootWatchAction(resource string, label labels.Selector, field fields.Selector, resourceVersion string) WatchActionImpl { +func NewRootWatchAction(resource string, label labels.Selector, field fields.Selector, opts api.ListOptions) WatchActionImpl { action := WatchActionImpl{} action.Verb = "watch" action.Resource = resource - action.WatchRestrictions = WatchRestrictions{label, field, resourceVersion} + action.WatchRestrictions = WatchRestrictions{label, field, opts.ResourceVersion} return action } -func NewWatchAction(resource, namespace string, label labels.Selector, field fields.Selector, resourceVersion string) WatchActionImpl { +func NewWatchAction(resource, namespace string, label labels.Selector, field fields.Selector, opts api.ListOptions) WatchActionImpl { action := WatchActionImpl{} action.Verb = "watch" action.Resource = resource action.Namespace = namespace - action.WatchRestrictions = WatchRestrictions{label, field, resourceVersion} + action.WatchRestrictions = WatchRestrictions{label, field, opts.ResourceVersion} return action } diff --git a/pkg/client/unversioned/testclient/fake_daemon_sets.go b/pkg/client/unversioned/testclient/fake_daemon_sets.go index b0a4c61cabc..f54e1d6d74f 100644 --- a/pkg/client/unversioned/testclient/fake_daemon_sets.go +++ b/pkg/client/unversioned/testclient/fake_daemon_sets.go @@ -17,6 +17,7 @@ limitations under the License. package testclient import ( + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" kclientlib "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" @@ -79,6 +80,6 @@ func (c *FakeDaemonSets) Delete(name string) error { return err } -func (c *FakeDaemonSets) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("daemonsets", c.Namespace, label, field, resourceVersion)) +func (c *FakeDaemonSets) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("daemonsets", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_deployments.go b/pkg/client/unversioned/testclient/fake_deployments.go index 6febc129827..1cf7f8e69e8 100644 --- a/pkg/client/unversioned/testclient/fake_deployments.go +++ b/pkg/client/unversioned/testclient/fake_deployments.go @@ -77,6 +77,6 @@ func (c *FakeDeployments) Delete(name string, options *api.DeleteOptions) error return err } -func (c *FakeDeployments) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("deployments", c.Namespace, label, field, resourceVersion)) +func (c *FakeDeployments) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("deployments", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_endpoints.go b/pkg/client/unversioned/testclient/fake_endpoints.go index 1ba9b5d4234..5fce701c97c 100644 --- a/pkg/client/unversioned/testclient/fake_endpoints.go +++ b/pkg/client/unversioned/testclient/fake_endpoints.go @@ -71,6 +71,6 @@ func (c *FakeEndpoints) Delete(name string) error { return err } -func (c *FakeEndpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("endpoints", c.Namespace, label, field, resourceVersion)) +func (c *FakeEndpoints) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("endpoints", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_events.go b/pkg/client/unversioned/testclient/fake_events.go index f8186e9a8e3..81da46b6d41 100644 --- a/pkg/client/unversioned/testclient/fake_events.go +++ b/pkg/client/unversioned/testclient/fake_events.go @@ -111,10 +111,10 @@ func (c *FakeEvents) Delete(name string) error { } // Watch starts watching for events matching the given selectors. -func (c *FakeEvents) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - action := NewRootWatchAction("events", label, field, resourceVersion) +func (c *FakeEvents) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + action := NewRootWatchAction("events", label, field, opts) if c.Namespace != "" { - action = NewWatchAction("events", c.Namespace, label, field, resourceVersion) + action = NewWatchAction("events", c.Namespace, label, field, opts) } return c.Fake.InvokesWatch(action) } diff --git a/pkg/client/unversioned/testclient/fake_horizontal_pod_autoscalers.go b/pkg/client/unversioned/testclient/fake_horizontal_pod_autoscalers.go index bc901f9d4a0..cc1c4c508e6 100644 --- a/pkg/client/unversioned/testclient/fake_horizontal_pod_autoscalers.go +++ b/pkg/client/unversioned/testclient/fake_horizontal_pod_autoscalers.go @@ -85,6 +85,6 @@ func (c *FakeHorizontalPodAutoscalers) Delete(name string, options *api.DeleteOp return err } -func (c *FakeHorizontalPodAutoscalers) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("horizontalpodautoscalers", c.Namespace, label, field, resourceVersion)) +func (c *FakeHorizontalPodAutoscalers) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("horizontalpodautoscalers", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_ingress.go b/pkg/client/unversioned/testclient/fake_ingress.go index 30be16ffb83..0474848f239 100644 --- a/pkg/client/unversioned/testclient/fake_ingress.go +++ b/pkg/client/unversioned/testclient/fake_ingress.go @@ -72,8 +72,8 @@ func (c *FakeIngress) Delete(name string, options *api.DeleteOptions) error { return err } -func (c *FakeIngress) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("ingress", c.Namespace, label, field, resourceVersion)) +func (c *FakeIngress) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("ingress", c.Namespace, label, field, opts)) } func (c *FakeIngress) UpdateStatus(ingress *extensions.Ingress) (result *extensions.Ingress, err error) { diff --git a/pkg/client/unversioned/testclient/fake_jobs.go b/pkg/client/unversioned/testclient/fake_jobs.go index 6bd1e71802a..e8ba8968576 100644 --- a/pkg/client/unversioned/testclient/fake_jobs.go +++ b/pkg/client/unversioned/testclient/fake_jobs.go @@ -72,8 +72,8 @@ func (c *FakeJobs) Delete(name string, options *api.DeleteOptions) error { return err } -func (c *FakeJobs) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("jobs", c.Namespace, label, field, resourceVersion)) +func (c *FakeJobs) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("jobs", c.Namespace, label, field, opts)) } func (c *FakeJobs) UpdateStatus(job *extensions.Job) (result *extensions.Job, err error) { diff --git a/pkg/client/unversioned/testclient/fake_limit_ranges.go b/pkg/client/unversioned/testclient/fake_limit_ranges.go index 473f3d19dff..26ee13fa98a 100644 --- a/pkg/client/unversioned/testclient/fake_limit_ranges.go +++ b/pkg/client/unversioned/testclient/fake_limit_ranges.go @@ -71,6 +71,6 @@ func (c *FakeLimitRanges) Delete(name string) error { return err } -func (c *FakeLimitRanges) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("limitranges", c.Namespace, label, field, resourceVersion)) +func (c *FakeLimitRanges) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("limitranges", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_namespaces.go b/pkg/client/unversioned/testclient/fake_namespaces.go index 638ee7a92df..51bf7c6f1c7 100644 --- a/pkg/client/unversioned/testclient/fake_namespaces.go +++ b/pkg/client/unversioned/testclient/fake_namespaces.go @@ -70,8 +70,8 @@ func (c *FakeNamespaces) Delete(name string) error { return err } -func (c *FakeNamespaces) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewRootWatchAction("namespaces", label, field, resourceVersion)) +func (c *FakeNamespaces) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewRootWatchAction("namespaces", label, field, opts)) } func (c *FakeNamespaces) Finalize(namespace *api.Namespace) (*api.Namespace, error) { diff --git a/pkg/client/unversioned/testclient/fake_nodes.go b/pkg/client/unversioned/testclient/fake_nodes.go index ff70ce37682..751c0311c8e 100644 --- a/pkg/client/unversioned/testclient/fake_nodes.go +++ b/pkg/client/unversioned/testclient/fake_nodes.go @@ -70,8 +70,8 @@ func (c *FakeNodes) Delete(name string) error { return err } -func (c *FakeNodes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewRootWatchAction("nodes", label, field, resourceVersion)) +func (c *FakeNodes) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewRootWatchAction("nodes", label, field, opts)) } func (c *FakeNodes) UpdateStatus(node *api.Node) (*api.Node, error) { diff --git a/pkg/client/unversioned/testclient/fake_persistent_volume_claims.go b/pkg/client/unversioned/testclient/fake_persistent_volume_claims.go index d5752a1339e..63d3c739ba2 100644 --- a/pkg/client/unversioned/testclient/fake_persistent_volume_claims.go +++ b/pkg/client/unversioned/testclient/fake_persistent_volume_claims.go @@ -69,8 +69,8 @@ func (c *FakePersistentVolumeClaims) Delete(name string) error { return err } -func (c *FakePersistentVolumeClaims) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("persistentvolumeclaims", c.Namespace, label, field, resourceVersion)) +func (c *FakePersistentVolumeClaims) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("persistentvolumeclaims", c.Namespace, label, field, opts)) } func (c *FakePersistentVolumeClaims) UpdateStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { diff --git a/pkg/client/unversioned/testclient/fake_persistent_volumes.go b/pkg/client/unversioned/testclient/fake_persistent_volumes.go index d26ead23ae6..69888521d4a 100644 --- a/pkg/client/unversioned/testclient/fake_persistent_volumes.go +++ b/pkg/client/unversioned/testclient/fake_persistent_volumes.go @@ -68,8 +68,8 @@ func (c *FakePersistentVolumes) Delete(name string) error { return err } -func (c *FakePersistentVolumes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewRootWatchAction("persistentvolumes", label, field, resourceVersion)) +func (c *FakePersistentVolumes) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewRootWatchAction("persistentvolumes", label, field, opts)) } func (c *FakePersistentVolumes) UpdateStatus(pv *api.PersistentVolume) (*api.PersistentVolume, error) { diff --git a/pkg/client/unversioned/testclient/fake_pod_templates.go b/pkg/client/unversioned/testclient/fake_pod_templates.go index 74cb68261ba..3c3c6106e26 100644 --- a/pkg/client/unversioned/testclient/fake_pod_templates.go +++ b/pkg/client/unversioned/testclient/fake_pod_templates.go @@ -71,6 +71,6 @@ func (c *FakePodTemplates) Delete(name string, options *api.DeleteOptions) error return err } -func (c *FakePodTemplates) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("podtemplates", c.Namespace, label, field, resourceVersion)) +func (c *FakePodTemplates) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("podtemplates", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_pods.go b/pkg/client/unversioned/testclient/fake_pods.go index 05cda9f84c8..424595ad174 100644 --- a/pkg/client/unversioned/testclient/fake_pods.go +++ b/pkg/client/unversioned/testclient/fake_pods.go @@ -76,8 +76,8 @@ func (c *FakePods) Delete(name string, options *api.DeleteOptions) error { return err } -func (c *FakePods) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("pods", c.Namespace, label, field, resourceVersion)) +func (c *FakePods) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("pods", c.Namespace, label, field, opts)) } func (c *FakePods) Bind(binding *api.Binding) error { diff --git a/pkg/client/unversioned/testclient/fake_replication_controllers.go b/pkg/client/unversioned/testclient/fake_replication_controllers.go index c9eeddd8849..294b4497d46 100644 --- a/pkg/client/unversioned/testclient/fake_replication_controllers.go +++ b/pkg/client/unversioned/testclient/fake_replication_controllers.go @@ -79,6 +79,6 @@ func (c *FakeReplicationControllers) Delete(name string) error { return err } -func (c *FakeReplicationControllers) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("replicationcontrollers", c.Namespace, label, field, resourceVersion)) +func (c *FakeReplicationControllers) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("replicationcontrollers", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_resource_quotas.go b/pkg/client/unversioned/testclient/fake_resource_quotas.go index 9dd2bd4f4a0..579c813bb01 100644 --- a/pkg/client/unversioned/testclient/fake_resource_quotas.go +++ b/pkg/client/unversioned/testclient/fake_resource_quotas.go @@ -71,8 +71,8 @@ func (c *FakeResourceQuotas) Delete(name string) error { return err } -func (c *FakeResourceQuotas) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("resourcequotas", c.Namespace, label, field, resourceVersion)) +func (c *FakeResourceQuotas) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("resourcequotas", c.Namespace, label, field, opts)) } func (c *FakeResourceQuotas) UpdateStatus(resourceQuota *api.ResourceQuota) (*api.ResourceQuota, error) { diff --git a/pkg/client/unversioned/testclient/fake_secrets.go b/pkg/client/unversioned/testclient/fake_secrets.go index 7db25bc4011..654d105fe32 100644 --- a/pkg/client/unversioned/testclient/fake_secrets.go +++ b/pkg/client/unversioned/testclient/fake_secrets.go @@ -71,6 +71,6 @@ func (c *FakeSecrets) Delete(name string) error { return err } -func (c *FakeSecrets) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("secrets", c.Namespace, label, field, resourceVersion)) +func (c *FakeSecrets) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("secrets", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_service_accounts.go b/pkg/client/unversioned/testclient/fake_service_accounts.go index 4dcd20d68d7..3021a55c6ac 100644 --- a/pkg/client/unversioned/testclient/fake_service_accounts.go +++ b/pkg/client/unversioned/testclient/fake_service_accounts.go @@ -71,6 +71,6 @@ func (c *FakeServiceAccounts) Delete(name string) error { return err } -func (c *FakeServiceAccounts) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("serviceaccounts", c.Namespace, label, field, resourceVersion)) +func (c *FakeServiceAccounts) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("serviceaccounts", c.Namespace, label, field, opts)) } diff --git a/pkg/client/unversioned/testclient/fake_services.go b/pkg/client/unversioned/testclient/fake_services.go index caaadbb8187..52d8c8de2d7 100644 --- a/pkg/client/unversioned/testclient/fake_services.go +++ b/pkg/client/unversioned/testclient/fake_services.go @@ -72,8 +72,8 @@ func (c *FakeServices) Delete(name string) error { return err } -func (c *FakeServices) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return c.Fake.InvokesWatch(NewWatchAction("services", c.Namespace, label, field, resourceVersion)) +func (c *FakeServices) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("services", c.Namespace, label, field, opts)) } func (c *FakeServices) ProxyGet(name, path string, params map[string]string) unversioned.ResponseWrapper { diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 8cd7ca11af0..f928456d75c 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -100,7 +100,8 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &extensions.DaemonSet{}, @@ -132,7 +133,8 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle return dsc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return dsc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return dsc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Pod{}, @@ -150,7 +152,8 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle return dsc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return dsc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return dsc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Node{}, diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 35d7550730f..0fd2b708a94 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -65,7 +65,8 @@ func NewEndpointController(client *client.Client, resyncPeriod controller.Resync return e.client.Services(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Service{}, @@ -86,7 +87,8 @@ func NewEndpointController(client *client.Client, resyncPeriod controller.Resync return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Pod{}, diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index ea15d29279d..11bb0412e8e 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -69,7 +69,8 @@ func New(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, return gcc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), terminatedSelector) }, WatchFunc: func(rv string) (watch.Interface, error) { - return gcc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), terminatedSelector, rv) + options := api.ListOptions{ResourceVersion: rv} + return gcc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), terminatedSelector, options) }, }, &api.Pod{}, diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 1c13cca161c..84500f6c9d7 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -89,7 +89,8 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.Resyn return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &extensions.Job{}, @@ -112,7 +113,8 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.Resyn return jm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return jm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return jm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Pod{}, diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 97f3484cb8c..7ceb244b621 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -51,7 +51,8 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A return kubeClient.Namespaces().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return kubeClient.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Namespace{}, diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 5b2eb46c0e9..7ca602df99e 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -167,7 +167,8 @@ func NewNodeController( return nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return nc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return nc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Pod{}, @@ -183,7 +184,8 @@ func NewNodeController( return nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return nc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return nc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Node{}, diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index f4f9eb03fe1..b9a43f85bb6 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -137,7 +137,7 @@ func (m *FakeNodeHandler) UpdateStatus(node *api.Node) (*api.Node, error) { return node, nil } -func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { +func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error) { return nil, nil } diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index ee1c566d5f3..c149f331592 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -59,7 +59,8 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.PersistentVolume{}, @@ -77,7 +78,8 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.PersistentVolumeClaim{}, diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index 67380be0db8..0c64328e224 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -67,7 +67,8 @@ func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Du return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.PersistentVolume{}, diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index d04079fb3d1..5ba9c35ca37 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -111,7 +111,8 @@ func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller. return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.ReplicationController{}, @@ -152,7 +153,8 @@ func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller. return rm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Pod{}, diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go index 0bb3b5e4884..6a8e76ee352 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go @@ -80,7 +80,8 @@ func NewServiceAccountsController(cl client.Interface, options ServiceAccountsCo return e.client.ServiceAccounts(api.NamespaceAll).List(labels.Everything(), accountSelector) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), accountSelector, rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), accountSelector, options) }, }, &api.ServiceAccount{}, @@ -97,7 +98,8 @@ func NewServiceAccountsController(cl client.Interface, options ServiceAccountsCo return e.client.Namespaces().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.Namespaces().Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Namespace{}, diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index e4b6afaf495..92202e12ff2 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -66,7 +66,8 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) * return e.client.ServiceAccounts(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.ServiceAccount{}, @@ -86,7 +87,8 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) * return e.client.Secrets(api.NamespaceAll).List(labels.Everything(), tokenSelector) }, WatchFunc: func(rv string) (watch.Interface, error) { - return e.client.Secrets(api.NamespaceAll).Watch(labels.Everything(), tokenSelector, rv) + options := api.ListOptions{ResourceVersion: rv} + return e.client.Secrets(api.NamespaceAll).Watch(labels.Everything(), tokenSelector, options) }, }, &api.Secret{}, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index def79f523ae..e7dd447e6d6 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -214,7 +214,8 @@ func NewMainKubelet( return kubeClient.Services(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, } cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run() @@ -231,7 +232,8 @@ func NewMainKubelet( return kubeClient.Nodes().List(labels.Everything(), fieldSelector) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, options) }, } cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() diff --git a/pkg/volume/util.go b/pkg/volume/util.go index c8379b046d7..84db4e32cb9 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -112,7 +112,8 @@ func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, s return c.client.Pods(namespace).List(labels.Everything(), fieldSelector) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, options) }, } queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go index 7df227ed1be..eb74f2f673e 100644 --- a/plugin/pkg/admission/limitranger/admission.go +++ b/plugin/pkg/admission/limitranger/admission.go @@ -103,7 +103,8 @@ func NewLimitRanger(client client.Interface, limitFunc LimitFunc) admission.Inte return client.LimitRanges(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return client.LimitRanges(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return client.LimitRanges(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, } indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0) diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 5224c1c6624..b23c9181e62 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -88,7 +88,8 @@ func NewProvision(c client.Interface) admission.Interface { return c.Namespaces().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return c.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return c.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Namespace{}, diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index ed5d33ceaac..badc431729f 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -95,7 +95,8 @@ func NewExists(c client.Interface) admission.Interface { return c.Namespaces().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return c.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return c.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Namespace{}, diff --git a/plugin/pkg/admission/namespace/lifecycle/admission.go b/plugin/pkg/admission/namespace/lifecycle/admission.go index fddec069748..e094342ff74 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission.go @@ -112,7 +112,8 @@ func NewLifecycle(c client.Interface) admission.Interface { return c.Namespaces().List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return c.Namespaces().Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return c.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Namespace{}, diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index 86167b0814f..c49500bae7e 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -54,7 +54,8 @@ func NewResourceQuota(client client.Interface) admission.Interface { return client.ResourceQuotas(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return client.ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return client.ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, } indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0) diff --git a/plugin/pkg/admission/serviceaccount/admission.go b/plugin/pkg/admission/serviceaccount/admission.go index 26ee52e69ed..b6fb7790ba0 100644 --- a/plugin/pkg/admission/serviceaccount/admission.go +++ b/plugin/pkg/admission/serviceaccount/admission.go @@ -86,7 +86,8 @@ func NewServiceAccount(cl client.Interface) *serviceAccount { return cl.ServiceAccounts(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return cl.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return cl.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.ServiceAccount{}, @@ -100,7 +101,8 @@ func NewServiceAccount(cl client.Interface) *serviceAccount { return cl.Secrets(api.NamespaceAll).List(labels.Everything(), tokenSelector) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return cl.Secrets(api.NamespaceAll).Watch(labels.Everything(), tokenSelector, resourceVersion) + options := api.ListOptions{ResourceVersion: resourceVersion} + return cl.Secrets(api.NamespaceAll).Watch(labels.Everything(), tokenSelector, options) }, }, &api.Secret{}, diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go index 2c986fd8cbf..8a4e60ff803 100644 --- a/test/e2e/daemon_restart.go +++ b/test/e2e/daemon_restart.go @@ -225,7 +225,8 @@ var _ = Describe("DaemonRestart", func() { return framework.Client.Pods(ns).List(labelSelector, fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return framework.Client.Pods(ns).Watch(labelSelector, fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return framework.Client.Pods(ns).Watch(labelSelector, fields.Everything(), options) }, }, &api.Pod{}, diff --git a/test/e2e/density.go b/test/e2e/density.go index 514d5da8087..5733d74baa6 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -203,7 +203,8 @@ var _ = Describe("Density", func() { return c.Events(ns).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return c.Events(ns).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return c.Events(ns).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Event{}, @@ -286,7 +287,8 @@ var _ = Describe("Density", func() { return c.Pods(ns).List(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return c.Pods(ns).Watch(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return c.Pods(ns).Watch(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything(), options) }, }, &api.Pod{}, diff --git a/test/e2e/framework.go b/test/e2e/framework.go index c768a860253..1ae57ff14eb 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -153,7 +153,7 @@ func (f *Framework) WaitForAnEndpoint(serviceName string) error { w, err := f.Client.Endpoints(f.Namespace.Name).Watch( labels.Everything(), fields.Set{"metadata.name": serviceName}.AsSelector(), - rv, + api.ListOptions{ResourceVersion: rv}, ) if err != nil { return err diff --git a/test/e2e/latency.go b/test/e2e/latency.go index ab29d90f78e..57c646f8dca 100644 --- a/test/e2e/latency.go +++ b/test/e2e/latency.go @@ -153,7 +153,8 @@ func runLatencyTest(nodeCount int, c *client.Client, ns string) { return c.Pods(ns).List(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return c.Pods(ns).Watch(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return c.Pods(ns).Watch(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything(), options) }, }, &api.Pod{}, diff --git a/test/e2e/pods.go b/test/e2e/pods.go index 131485d9ca5..e7ff3e71684 100644 --- a/test/e2e/pods.go +++ b/test/e2e/pods.go @@ -304,7 +304,8 @@ var _ = Describe("Pods", func() { } Expect(len(pods.Items)).To(Equal(0)) w, err := podClient.Watch( - labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything(), pods.ListMeta.ResourceVersion) + labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything(), + api.ListOptions{ResourceVersion: pods.ListMeta.ResourceVersion}) if err != nil { Failf("Failed to set up watch: %v", err) } diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go index cc2258ab28a..f734b888e68 100644 --- a/test/e2e/service_latency.go +++ b/test/e2e/service_latency.go @@ -282,7 +282,8 @@ func startEndpointWatcher(f *Framework, q *endpointQueries) { return f.Client.Endpoints(f.Namespace.Name).List(labels.Everything()) }, WatchFunc: func(rv string) (watch.Interface, error) { - return f.Client.Endpoints(f.Namespace.Name).Watch(labels.Everything(), fields.Everything(), rv) + options := api.ListOptions{ResourceVersion: rv} + return f.Client.Endpoints(f.Namespace.Name).Watch(labels.Everything(), fields.Everything(), options) }, }, &api.Endpoints{}, diff --git a/test/e2e/util.go b/test/e2e/util.go index 84148238c4f..e78d3932c24 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -145,7 +145,8 @@ func newPodStore(c *client.Client, namespace string, label labels.Selector, fiel return c.Pods(namespace).List(label, field) }, WatchFunc: func(rv string) (watch.Interface, error) { - return c.Pods(namespace).Watch(label, field, rv) + options := api.ListOptions{ResourceVersion: rv} + return c.Pods(namespace).Watch(label, field, options) }, } store := cache.NewStore(cache.MetaNamespaceKeyFunc) diff --git a/test/integration/client_test.go b/test/integration/client_test.go index 02916c230b9..ebff71726a9 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -247,7 +247,7 @@ func TestMultiWatch(t *testing.T) { w, err := client.Pods(ns).Watch( labels.Set{"watchlabel": name}.AsSelector(), fields.Everything(), - rv, + api.ListOptions{ResourceVersion: rv}, ) if err != nil { panic(fmt.Sprintf("watch error for %v: %v", name, err)) diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index c4eb6250567..44453a6965a 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -74,7 +74,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { }, } - w, _ := testClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), "0") + w, _ := testClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) defer w.Stop() _, _ = testClient.PersistentVolumes().Create(pv) @@ -100,7 +100,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { // change the reclamation policy of the PV for the next test pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete - w, _ = testClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), "0") + w, _ = testClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), api.ListOptions{}) defer w.Stop() _, _ = testClient.PersistentVolumes().Create(pv)