Fix watch for events; add test for interface implementation so it won't break again.

This commit is contained in:
Daniel Smith 2014-11-12 13:30:13 -08:00
parent 02a0593df0
commit 178d0af795
11 changed files with 72 additions and 69 deletions

View File

@ -64,10 +64,3 @@ func InterpretDeleteError(err error, kind, name string) error {
return err return err
} }
} }
// InterpretResourceVersionError returns the appropriate api error
// for a failure to convert the resource version of an object sent
// to the API to an etcd uint64 index.
func InterpretResourceVersionError(err error, kind, value string) error {
return errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", value)})
}

View File

@ -18,7 +18,6 @@ package etcd
import ( import (
"fmt" "fmt"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -97,21 +96,6 @@ func makePodKey(ctx api.Context, id string) (string, error) {
return MakeEtcdItemKey(ctx, PodPath, id) return MakeEtcdItemKey(ctx, PodPath, id)
} }
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion)
}
return version + 1, nil
}
// ListPods obtains a list of pods with labels that match selector. // ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
@ -143,7 +127,7 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool
// WatchPods begins watching for new, changed, or deleted pods. // WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "pod") version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -354,7 +338,7 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL
// WatchControllers begins watching for new, changed, or deleted controllers. // WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "replicationControllers") version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -516,7 +500,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
// WatchServices begins watching for new, changed, or deleted service configurations. // WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "service") version, err := tools.ParseWatchResourceVersion(resourceVersion, "service")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -561,7 +545,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. // WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "endpoints") version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -42,41 +42,6 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
return registry return registry
} }
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}
// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash // TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash
func TestEtcdGetPodDifferentNamespace(t *testing.T) { func TestEtcdGetPodDifferentNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)

View File

@ -109,7 +109,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch returns Events events via a watch.Interface. // Watch returns Events events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -50,6 +51,8 @@ func TestRESTCreate(t *testing.T) {
if e, a := eventA, (<-c).Object; !reflect.DeepEqual(e, a) { if e, a := eventA, (<-c).Object; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a)) t.Errorf("diff: %s", util.ObjectDiff(e, a))
} }
// Ensure we implement the interface
_ = apiserver.ResourceWatcher(rest)
} }
func TestRESTDelete(t *testing.T) { func TestRESTDelete(t *testing.T) {
@ -216,7 +219,7 @@ func TestRESTWatch(t *testing.T) {
Reason: "forTesting", Reason: "forTesting",
} }
reg, rest := NewTestREST() reg, rest := NewTestREST()
wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0) wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), "0")
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
} }

View File

@ -91,8 +91,12 @@ func (e *Etcd) Delete(ctx api.Context, id string) error {
// Watch starts a watch for the items that m matches. // Watch starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list. // TODO: Detect if m references a single object instead of a list.
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool { version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
return e.Helper.WatchList(e.KeyRoot, version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj) matches, err := m.Matches(obj)
return err == nil && matches return err == nil && matches
}) })

View File

@ -417,7 +417,7 @@ func TestEtcdWatch(t *testing.T) {
} }
fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient, registry := NewTestGenericEtcdRegistry(t)
wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1) wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, "1")
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -76,7 +76,7 @@ type Registry interface {
Update(ctx api.Context, id string, obj runtime.Object) error Update(ctx api.Context, id string, obj runtime.Object) error
Get(ctx api.Context, id string) (runtime.Object, error) Get(ctx api.Context, id string) (runtime.Object, error)
Delete(ctx api.Context, id string) error Delete(ctx api.Context, id string) error
Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error)
} }
// FilterList filters any list object that conforms to the api conventions, // FilterList filters any list object that conforms to the api conventions,

View File

@ -52,7 +52,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje
return generic.FilterList(r.ObjectList, m) return generic.FilterList(r.ObjectList, m)
} }
func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :( // TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.Mux.Watch(), nil return r.Mux.Watch(), nil
} }

View File

@ -17,13 +17,16 @@ limitations under the License.
package tools package tools
import ( import (
"strconv"
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -37,6 +40,21 @@ func Everything(runtime.Object) bool {
return true return true
} }
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", resourceVersion)})
}
return version + 1, nil
}
// WatchList begins watching the specified key's items. Items are decoded into // WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned // API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin // watch.Interface. resourceVersion may be used to specify what version to begin

View File

@ -23,6 +23,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -601,3 +602,38 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Errorf("An injected error did not cause a graceful shutdown") t.Errorf("An injected error did not cause a graceful shutdown")
} }
} }
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}