diff --git a/pkg/api/errors/etcd/etcd.go b/pkg/api/errors/etcd/etcd.go index ac819d4db0a..dee022828f5 100644 --- a/pkg/api/errors/etcd/etcd.go +++ b/pkg/api/errors/etcd/etcd.go @@ -64,10 +64,3 @@ func InterpretDeleteError(err error, kind, name string) error { return err } } - -// InterpretResourceVersionError returns the appropriate api error -// for a failure to convert the resource version of an object sent -// to the API to an etcd uint64 index. -func InterpretResourceVersionError(err error, kind, value string) error { - return errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", value)}) -} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 359866a6570..c190b5b9c5f 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -18,7 +18,6 @@ package etcd import ( "fmt" - "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -97,21 +96,6 @@ func makePodKey(ctx api.Context, id string) (string, error) { return MakeEtcdItemKey(ctx, PodPath, id) } -// ParseWatchResourceVersion takes a resource version argument and converts it to -// the etcd version we should pass to helper.Watch(). Because resourceVersion is -// an opaque value, the default watch behavior for non-zero watch is to watch -// the next value (if you pass "1", you will see updates from "2" onwards). -func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { - if resourceVersion == "" || resourceVersion == "0" { - return 0, nil - } - version, err := strconv.ParseUint(resourceVersion, 10, 64) - if err != nil { - return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion) - } - return version + 1, nil -} - // ListPods obtains a list of pods with labels that match selector. func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { @@ -143,7 +127,7 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool // WatchPods begins watching for new, changed, or deleted pods. func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "pod") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod") if err != nil { return nil, err } @@ -354,7 +338,7 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL // WatchControllers begins watching for new, changed, or deleted controllers. func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "replicationControllers") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers") if err != nil { return nil, err } @@ -516,7 +500,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error { // WatchServices begins watching for new, changed, or deleted service configurations. func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "service") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "service") if err != nil { return nil, err } @@ -561,7 +545,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er // WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "endpoints") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints") if err != nil { return nil, err } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 3836b7e0f6e..a69a9bb5b5f 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -42,41 +42,6 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { return registry } -func TestEtcdParseWatchResourceVersion(t *testing.T) { - testCases := []struct { - Version string - Kind string - ExpectVersion uint64 - Err bool - }{ - {Version: "", ExpectVersion: 0}, - {Version: "a", Err: true}, - {Version: " ", Err: true}, - {Version: "1", ExpectVersion: 2}, - {Version: "10", ExpectVersion: 11}, - } - for _, testCase := range testCases { - version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) - switch { - case testCase.Err: - if err == nil { - t.Errorf("%s: unexpected non-error", testCase.Version) - continue - } - if !errors.IsInvalid(err) { - t.Errorf("%s: unexpected error: %v", testCase.Version, err) - continue - } - case !testCase.Err && err != nil: - t.Errorf("%s: unexpected error: %v", testCase.Version, err) - continue - } - if version != testCase.ExpectVersion { - t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) - } - } -} - // TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash func TestEtcdGetPodDifferentNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go index cb4b530b8ed..26f4948b6bd 100644 --- a/pkg/registry/event/rest.go +++ b/pkg/registry/event/rest.go @@ -109,7 +109,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch returns Events events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) } diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 888cbd0a440..ae8e68e7cbb 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -50,6 +51,8 @@ func TestRESTCreate(t *testing.T) { if e, a := eventA, (<-c).Object; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } + // Ensure we implement the interface + _ = apiserver.ResourceWatcher(rest) } func TestRESTDelete(t *testing.T) { @@ -216,7 +219,7 @@ func TestRESTWatch(t *testing.T) { Reason: "forTesting", } reg, rest := NewTestREST() - wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0) + wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), "0") if err != nil { t.Fatalf("Unexpected error %v", err) } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 90ca035b707..0d27246e685 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -91,8 +91,12 @@ func (e *Etcd) Delete(ctx api.Context, id string) error { // Watch starts a watch for the items that m matches. // TODO: Detect if m references a single object instead of a list. -func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { - return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool { +func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { + version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName) + if err != nil { + return nil, err + } + return e.Helper.WatchList(e.KeyRoot, version, func(obj runtime.Object) bool { matches, err := m.Matches(obj) return err == nil && matches }) diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 6f1ed30ed2f..dd76590263a 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -417,7 +417,7 @@ func TestEtcdWatch(t *testing.T) { } fakeClient, registry := NewTestGenericEtcdRegistry(t) - wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1) + wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, "1") if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/registry/generic/registry.go b/pkg/registry/generic/registry.go index d368905cd8e..2f80ab81029 100644 --- a/pkg/registry/generic/registry.go +++ b/pkg/registry/generic/registry.go @@ -76,7 +76,7 @@ type Registry interface { Update(ctx api.Context, id string, obj runtime.Object) error Get(ctx api.Context, id string) (runtime.Object, error) Delete(ctx api.Context, id string) error - Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) + Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error) } // FilterList filters any list object that conforms to the api conventions, diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 01b5d0293b3..6142f90f7eb 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -52,7 +52,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje return generic.FilterList(r.ObjectList, m) } -func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { +func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.Mux.Watch(), nil } diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index 18c57b012ff..8921b921462 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -17,13 +17,16 @@ limitations under the License. package tools import ( + "strconv" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) @@ -37,6 +40,21 @@ func Everything(runtime.Object) bool { return true } +// ParseWatchResourceVersion takes a resource version argument and converts it to +// the etcd version we should pass to helper.Watch(). Because resourceVersion is +// an opaque value, the default watch behavior for non-zero watch is to watch +// the next value (if you pass "1", you will see updates from "2" onwards). +func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { + if resourceVersion == "" || resourceVersion == "0" { + return 0, nil + } + version, err := strconv.ParseUint(resourceVersion, 10, 64) + if err != nil { + return 0, errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", resourceVersion)}) + } + return version + 1, nil +} + // WatchList begins watching the specified key's items. Items are decoded into // API objects, and any items passing 'filter' are sent down the returned // watch.Interface. resourceVersion may be used to specify what version to begin diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 0274ff6607c..ad5bd071634 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -601,3 +602,38 @@ func TestWatchPurposefulShutdown(t *testing.T) { t.Errorf("An injected error did not cause a graceful shutdown") } } + +func TestEtcdParseWatchResourceVersion(t *testing.T) { + testCases := []struct { + Version string + Kind string + ExpectVersion uint64 + Err bool + }{ + {Version: "", ExpectVersion: 0}, + {Version: "a", Err: true}, + {Version: " ", Err: true}, + {Version: "1", ExpectVersion: 2}, + {Version: "10", ExpectVersion: 11}, + } + for _, testCase := range testCases { + version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) + switch { + case testCase.Err: + if err == nil { + t.Errorf("%s: unexpected non-error", testCase.Version) + continue + } + if !errors.IsInvalid(err) { + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + case !testCase.Err && err != nil: + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + if version != testCase.ExpectVersion { + t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) + } + } +}