From 82bcdd3b3bef623a6a158481d4d1958f5b1c96d3 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 7 Oct 2014 16:51:28 -0400 Subject: [PATCH 1/3] Make ResourceVersion a string internally instead of uint64 Allows us to define different watch versioning regimes in the future as well as to encode information with the resource version. This changes /watch/resources?resourceVersion=3 to start the watch at 4 instead of 3, which means clients can read a resource version and then send it back to the server. Clients should no longer do math on resource versions. --- cmd/kubecfg/kubecfg.go | 4 +- pkg/api/errors/etcd/etcd.go | 7 ++ pkg/api/latest/latest_test.go | 20 +++++- pkg/api/ref_test.go | 10 +-- pkg/api/serialization_test.go | 18 ++++- pkg/api/types.go | 4 +- pkg/api/v1beta1/conversion.go | 66 +++++++++++++++++++ pkg/api/v1beta2/conversion.go | 66 +++++++++++++++++++ pkg/apiserver/apiserver_test.go | 4 +- pkg/apiserver/interfaces.go | 2 +- pkg/apiserver/watch.go | 9 +-- pkg/apiserver/watch_test.go | 12 ++-- pkg/client/cache/reflector.go | 8 +-- pkg/client/cache/reflector_test.go | 65 +++++++++--------- pkg/client/client.go | 26 ++++---- pkg/client/client_test.go | 6 +- pkg/client/fake.go | 6 +- pkg/client/request.go | 8 +++ pkg/controller/replication_controller.go | 6 +- pkg/controller/replication_controller_test.go | 4 +- pkg/kubelet/config/etcd.go | 2 +- pkg/master/master.go | 2 +- pkg/proxy/config/api.go | 24 +++---- pkg/proxy/config/api_test.go | 56 ++++++++-------- pkg/registry/controller/registry.go | 2 +- pkg/registry/controller/rest.go | 2 +- pkg/registry/controller/rest_test.go | 4 +- pkg/registry/endpoint/registry.go | 2 +- pkg/registry/endpoint/rest.go | 2 +- pkg/registry/endpoint/rest_test.go | 4 +- pkg/registry/etcd/etcd.go | 62 ++++++++++++----- pkg/registry/etcd/etcd_test.go | 58 ++++++++++++---- pkg/registry/pod/registry.go | 2 +- pkg/registry/pod/rest.go | 2 +- pkg/registry/pod/rest_test.go | 4 +- pkg/registry/registrytest/controller.go | 2 +- pkg/registry/registrytest/pod.go | 2 +- pkg/registry/registrytest/service.go | 4 +- pkg/registry/service/registry.go | 2 +- pkg/registry/service/rest.go | 2 +- pkg/registry/service/rest_test.go | 4 +- pkg/runtime/interfaces.go | 4 +- pkg/runtime/jsonbase.go | 16 ++--- pkg/runtime/jsonbase_test.go | 32 ++++----- pkg/runtime/types.go | 2 +- pkg/service/endpoints_controller.go | 2 +- pkg/service/endpoints_controller_test.go | 8 +-- pkg/tools/etcd_tools.go | 34 +++++++++- pkg/tools/etcd_tools_test.go | 18 ++--- pkg/tools/etcd_tools_watch.go | 4 +- pkg/tools/etcd_tools_watch_test.go | 10 +-- plugin/pkg/scheduler/factory/factory.go | 4 +- plugin/pkg/scheduler/factory/factory_test.go | 22 ++++--- test/integration/etcd_tools_test.go | 7 +- 54 files changed, 518 insertions(+), 240 deletions(-) diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index 0e6c01d4c2f..7260249f608 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -311,7 +311,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool { validStorage := checkStorage(storage) verb := "" setBody := false - var version uint64 + var version string printer := getPrinter() @@ -369,7 +369,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool { r.ParseSelectorParam("labels", *selector) } if setBody { - if version != 0 { + if len(version) > 0 { data := readConfig(storage, c.RESTClient.Codec) obj, err := latest.Codec.Decode(data) if err != nil { diff --git a/pkg/api/errors/etcd/etcd.go b/pkg/api/errors/etcd/etcd.go index dee022828f5..af4a3c8b796 100644 --- a/pkg/api/errors/etcd/etcd.go +++ b/pkg/api/errors/etcd/etcd.go @@ -64,3 +64,10 @@ func InterpretDeleteError(err error, kind, name string) error { return err } } + +// InterpretResourceVersionError returns the appropriate api error +// for a failure to convert the resource version of an object sent +// to the API to an etcd uint64 index. +func InterpretResourceVersionError(err error, kind, value string) error { + return errors.NewInvalid(kind, "", errors.ErrorList{errors.NewFieldInvalid("resourceVersion", value)}) +} diff --git a/pkg/api/latest/latest_test.go b/pkg/api/latest/latest_test.go index 9ea3a0f9d9a..aca3a4b16ad 100644 --- a/pkg/api/latest/latest_test.go +++ b/pkg/api/latest/latest_test.go @@ -19,6 +19,7 @@ package latest import ( "encoding/json" "reflect" + "strconv" "testing" internal "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -41,7 +42,7 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs( // TODO: Fix JSON/YAML packages and/or write custom encoding // for uint64's. Somehow the LS *byte* of this is lost, but // only when all 8 bytes are set. - j.ResourceVersion = c.RandUint64() >> 8 + j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10) j.SelfLink = c.RandString() var sec, nsec int64 @@ -49,6 +50,19 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs( c.Fuzz(&nsec) j.CreationTimestamp = util.Unix(sec, nsec).Rfc3339Copy() }, + func(j *internal.ObjectReference, c fuzz.Continue) { + // We have to customize the randomization of TypeMetas because their + // APIVersion and Kind must remain blank in memory. + j.APIVersion = c.RandString() + j.Kind = c.RandString() + j.Namespace = c.RandString() + j.Name = c.RandString() + // TODO: Fix JSON/YAML packages and/or write custom encoding + // for uint64's. Somehow the LS *byte* of this is lost, but + // only when all 8 bytes are set. + j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10) + j.FieldPath = c.RandString() + }, func(intstr *util.IntOrString, c fuzz.Continue) { // util.IntOrString will panic if its kind is set wrong. if c.RandBool() { @@ -120,12 +134,12 @@ func TestInternalRoundTrip(t *testing.T) { } func TestResourceVersioner(t *testing.T) { - pod := internal.Pod{TypeMeta: internal.TypeMeta{ResourceVersion: 10}} + pod := internal.Pod{TypeMeta: internal.TypeMeta{ResourceVersion: "10"}} version, err := ResourceVersioner.ResourceVersion(&pod) if err != nil { t.Fatalf("unexpected error: %v", err) } - if version != 10 { + if version != "10" { t.Errorf("unexpected version %d", version) } } diff --git a/pkg/api/ref_test.go b/pkg/api/ref_test.go index 1f408ed0e81..32cb5fb7d36 100644 --- a/pkg/api/ref_test.go +++ b/pkg/api/ref_test.go @@ -37,7 +37,7 @@ func TestGetReference(t *testing.T) { obj: &Pod{ TypeMeta: TypeMeta{ ID: "foo", - ResourceVersion: 42, + ResourceVersion: "42", SelfLink: "/api/v1beta1/pods/foo", }, }, @@ -46,14 +46,14 @@ func TestGetReference(t *testing.T) { APIVersion: "v1beta1", Name: "foo", UID: "foo", - ResourceVersion: 42, + ResourceVersion: "42", }, }, "serviceList": { obj: &ServiceList{ TypeMeta: TypeMeta{ ID: "foo", - ResourceVersion: 42, + ResourceVersion: "42", SelfLink: "/api/v1beta2/services", }, }, @@ -62,14 +62,14 @@ func TestGetReference(t *testing.T) { APIVersion: "v1beta2", Name: "foo", UID: "foo", - ResourceVersion: 42, + ResourceVersion: "42", }, }, "badSelfLink": { obj: &ServiceList{ TypeMeta: TypeMeta{ ID: "foo", - ResourceVersion: 42, + ResourceVersion: "42", SelfLink: "v1beta2/services", }, }, diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index afe4b28eb49..a6612e93809 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -21,6 +21,7 @@ import ( "flag" "math/rand" "reflect" + "strconv" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -49,7 +50,7 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs( // TODO: Fix JSON/YAML packages and/or write custom encoding // for uint64's. Somehow the LS *byte* of this is lost, but // only when all 8 bytes are set. - j.ResourceVersion = c.RandUint64() >> 8 + j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10) j.SelfLink = c.RandString() var sec, nsec int64 @@ -66,7 +67,7 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs( // TODO: Fix JSON/YAML packages and/or write custom encoding // for uint64's. Somehow the LS *byte* of this is lost, but // only when all 8 bytes are set. - j.ResourceVersion = c.RandUint64() >> 8 + j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10) j.SelfLink = c.RandString() var sec, nsec int64 @@ -74,6 +75,19 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs( c.Fuzz(&nsec) j.CreationTimestamp = util.Unix(sec, nsec).Rfc3339Copy() }, + func(j *api.ObjectReference, c fuzz.Continue) { + // We have to customize the randomization of TypeMetas because their + // APIVersion and Kind must remain blank in memory. + j.APIVersion = c.RandString() + j.Kind = c.RandString() + j.Namespace = c.RandString() + j.Name = c.RandString() + // TODO: Fix JSON/YAML packages and/or write custom encoding + // for uint64's. Somehow the LS *byte* of this is lost, but + // only when all 8 bytes are set. + j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10) + j.FieldPath = c.RandString() + }, func(intstr *util.IntOrString, c fuzz.Continue) { // util.IntOrString will panic if its kind is set wrong. if c.RandBool() { diff --git a/pkg/api/types.go b/pkg/api/types.go index a4491e3fb9b..aa92998bb4e 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -250,7 +250,7 @@ type TypeMeta struct { ID string `json:"id,omitempty" yaml:"id,omitempty"` CreationTimestamp util.Time `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"` - ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"` Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"` UID string `json:"uid,omitempty" yaml:"uid,omitempty"` @@ -675,7 +675,7 @@ type ObjectReference struct { Name string `json:"name,omitempty" yaml:"name,omitempty"` UID string `json:"uid,omitempty" yaml:"uid,omitempty"` APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"` - ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` // Optional. If referring to a piece of an object instead of an entire object, this string // should contain a valid field access statement. For example, diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index ed6d70cb59e..d7284483a3c 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -17,12 +17,78 @@ limitations under the License. package v1beta1 import ( + "strconv" + newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" ) func init() { newer.Scheme.AddConversionFuncs( + // TypeMeta has changed type of ResourceVersion internally + func(in *newer.TypeMeta, out *TypeMeta, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.ID = in.ID + out.CreationTimestamp = in.CreationTimestamp + out.SelfLink = in.SelfLink + out.Annotations = in.Annotations + + if len(in.ResourceVersion) > 0 { + v, err := strconv.ParseUint(in.ResourceVersion, 10, 64) + if err != nil { + return err + } + out.ResourceVersion = v + } + return nil + }, + func(in *TypeMeta, out *newer.TypeMeta, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.ID = in.ID + out.CreationTimestamp = in.CreationTimestamp + out.SelfLink = in.SelfLink + out.Annotations = in.Annotations + + if in.ResourceVersion != 0 { + out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10) + } + return nil + }, + + // ObjectReference has changed type of ResourceVersion internally + func(in *newer.ObjectReference, out *ObjectReference, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.Name = in.Name + out.FieldPath = in.FieldPath + + if len(in.ResourceVersion) > 0 { + v, err := strconv.ParseUint(in.ResourceVersion, 10, 64) + if err != nil { + return err + } + out.ResourceVersion = v + } + return nil + }, + func(in *ObjectReference, out *newer.ObjectReference, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.Name = in.Name + out.FieldPath = in.FieldPath + + if in.ResourceVersion != 0 { + out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10) + } + return nil + }, + // EnvVar's Key is deprecated in favor of Name. func(in *newer.EnvVar, out *EnvVar, s conversion.Scope) error { out.Value = in.Value diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 65cae8aadc9..abe3857ebca 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -17,12 +17,78 @@ limitations under the License. package v1beta2 import ( + "strconv" + newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" ) func init() { newer.Scheme.AddConversionFuncs( + // TypeMeta has changed type of ResourceVersion internally + func(in *newer.TypeMeta, out *TypeMeta, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.ID = in.ID + out.CreationTimestamp = in.CreationTimestamp + out.SelfLink = in.SelfLink + out.Annotations = in.Annotations + + if len(in.ResourceVersion) > 0 { + v, err := strconv.ParseUint(in.ResourceVersion, 10, 64) + if err != nil { + return err + } + out.ResourceVersion = v + } + return nil + }, + func(in *TypeMeta, out *newer.TypeMeta, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.ID = in.ID + out.CreationTimestamp = in.CreationTimestamp + out.SelfLink = in.SelfLink + out.Annotations = in.Annotations + + if in.ResourceVersion != 0 { + out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10) + } + return nil + }, + + // ObjectReference has changed type of ResourceVersion internally + func(in *newer.ObjectReference, out *ObjectReference, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.Name = in.Name + out.FieldPath = in.FieldPath + + if len(in.ResourceVersion) > 0 { + v, err := strconv.ParseUint(in.ResourceVersion, 10, 64) + if err != nil { + return err + } + out.ResourceVersion = v + } + return nil + }, + func(in *ObjectReference, out *newer.ObjectReference, s conversion.Scope) error { + out.APIVersion = in.APIVersion + out.Kind = in.Kind + out.Namespace = in.Namespace + out.Name = in.Name + out.FieldPath = in.FieldPath + + if in.ResourceVersion != 0 { + out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10) + } + return nil + }, + // EnvVar's Key is deprecated in favor of Name. func(in *newer.EnvVar, out *EnvVar, s conversion.Scope) error { out.Value = in.Value diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index f418486f0a1..6141fc4fa55 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -78,7 +78,7 @@ type SimpleRESTStorage struct { fakeWatch *watch.FakeWatcher requestedLabelSelector labels.Selector requestedFieldSelector labels.Selector - requestedResourceVersion uint64 + requestedResourceVersion string // The id requested, and location to return for ResourceLocation requestedResourceLocationID string @@ -144,7 +144,7 @@ func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (< } // Implement ResourceWatcher. -func (storage *SimpleRESTStorage) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (storage *SimpleRESTStorage) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { storage.requestedLabelSelector = label storage.requestedFieldSelector = field storage.requestedResourceVersion = resourceVersion diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 78adc146fcd..e700765fa0f 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -54,7 +54,7 @@ type ResourceWatcher interface { // are supported; an error should be returned if 'field' tries to select on a field that // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a // particular version. - Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // Redirector know how to return a remote resource's location. diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 13023971588..9d4c54e052d 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -20,7 +20,6 @@ import ( "net/http" "net/url" "regexp" - "strconv" "strings" "code.google.com/p/go.net/websocket" @@ -37,7 +36,7 @@ type WatchHandler struct { codec runtime.Codec } -func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) { +func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) { if s, err := labels.ParseSelector(query.Get("labels")); err != nil { label = labels.Everything() } else { @@ -48,10 +47,8 @@ func getWatchParams(query url.Values) (label, field labels.Selector, resourceVer } else { field = s } - if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { - resourceVersion = rv - } - return label, field, resourceVersion + resourceVersion = query.Get("resourceVersion") + return } var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)") diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 3b734a888e1..56d03b1c7e8 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -156,28 +156,28 @@ func TestWatchParamParsing(t *testing.T) { table := []struct { rawQuery string - resourceVersion uint64 + resourceVersion string labelSelector string fieldSelector string }{ { rawQuery: "resourceVersion=1234", - resourceVersion: 1234, + resourceVersion: "1234", labelSelector: "", fieldSelector: "", }, { rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", - resourceVersion: 314159, + resourceVersion: "314159", labelSelector: "name=foo", fieldSelector: "Host=", }, { rawQuery: "fields=ID%3dfoo&resourceVersion=1492", - resourceVersion: 1492, + resourceVersion: "1492", labelSelector: "", fieldSelector: "ID=foo", }, { rawQuery: "", - resourceVersion: 0, + resourceVersion: "", labelSelector: "", fieldSelector: "", }, @@ -186,7 +186,7 @@ func TestWatchParamParsing(t *testing.T) { for _, item := range table { simpleStorage.requestedLabelSelector = nil simpleStorage.requestedFieldSelector = nil - simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases + simpleStorage.requestedResourceVersion = "5" // Prove this is set in all cases dest.RawQuery = item.rawQuery resp, err := http.Get(dest.String()) if err != nil { diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 748eb5b96e6..20298657260 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -35,7 +35,7 @@ type ListerWatcher interface { // ResourceVersion field will be used to start the watch in the right place. List() (runtime.Object, error) // Watch should begin a watch at the specified version. - Watch(resourceVersion uint64) (watch.Interface, error) + Watch(resourceVersion string) (watch.Interface, error) } // Reflector watches a specified resource and causes all changes to be reflected in the given store. @@ -71,7 +71,7 @@ func (r *Reflector) Run() { } func (r *Reflector) listAndWatch() { - var resourceVersion uint64 + var resourceVersion string list, err := r.listerWatcher.List() if err != nil { @@ -124,7 +124,7 @@ func (r *Reflector) syncWith(items []runtime.Object) error { } // watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error { +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) error { start := time.Now() eventCount := 0 for { @@ -157,7 +157,7 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err default: glog.Errorf("unable to understand watch event %#v", event) } - *resourceVersion = jsonBase.ResourceVersion() + 1 + *resourceVersion = jsonBase.ResourceVersion() eventCount++ } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 4134a19d450..6162e8ea843 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -18,6 +18,7 @@ package cache import ( "fmt" + "strconv" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -27,11 +28,11 @@ import ( type testLW struct { ListFunc func() (runtime.Object, error) - WatchFunc func(resourceVersion uint64) (watch.Interface, error) + WatchFunc func(resourceVersion string) (watch.Interface, error) } func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() } -func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) { +func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { return t.WatchFunc(resourceVersion) } @@ -42,7 +43,7 @@ func TestReflector_watchHandlerError(t *testing.T) { go func() { fw.Stop() }() - var resumeRV uint64 + var resumeRV string err := g.watchHandler(fw, &resumeRV) if err == nil { t.Errorf("unexpected non-error") @@ -58,11 +59,11 @@ func TestReflector_watchHandler(t *testing.T) { go func() { fw.Add(&api.Service{TypeMeta: api.TypeMeta{ID: "rejected"}}) fw.Delete(&api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}) - fw.Modify(&api.Pod{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: 55}}) - fw.Add(&api.Pod{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: 32}}) + fw.Modify(&api.Pod{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "55"}}) + fw.Add(&api.Pod{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: "32"}}) fw.Stop() }() - var resumeRV uint64 + var resumeRV string err := g.watchHandler(fw, &resumeRV) if err != nil { t.Errorf("unexpected error %v", err) @@ -70,13 +71,13 @@ func TestReflector_watchHandler(t *testing.T) { table := []struct { ID string - RV uint64 + RV string exists bool }{ - {"foo", 0, false}, - {"rejected", 0, false}, - {"bar", 55, true}, - {"baz", 32, true}, + {"foo", "", false}, + {"rejected", "", false}, + {"bar", "55", true}, + {"baz", "32", true}, } for _, item := range table { obj, exists := s.Get(item.ID) @@ -92,7 +93,7 @@ func TestReflector_watchHandler(t *testing.T) { } // RV should stay 1 higher than the last id we see. - if e, a := uint64(33), resumeRV; e != a { + if e, a := "33", resumeRV; e != a { t.Errorf("expected %v, got %v", e, a) } } @@ -103,9 +104,9 @@ func TestReflector_listAndWatch(t *testing.T) { // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc // to get called at the beginning of the watch with 1, and again with 4 when we // inject an error at 3. - expectedRVs := []uint64{1, 4} + expectedRVs := []string{"1", "4"} lw := &testLW{ - WatchFunc: func(rv uint64) (watch.Interface, error) { + WatchFunc: func(rv string) (watch.Interface, error) { fw := watch.NewFake() if e, a := expectedRVs[0], rv; e != a { t.Errorf("Expected rv %v, but got %v", e, a) @@ -117,7 +118,7 @@ func TestReflector_listAndWatch(t *testing.T) { return fw, nil }, ListFunc: func() (runtime.Object, error) { - return &api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: 1}}, nil + return &api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: "1"}}, nil }, } s := NewFIFO() @@ -130,9 +131,9 @@ func TestReflector_listAndWatch(t *testing.T) { if fw == nil { fw = <-createdFakes } - sendingRV := uint64(i + 2) + sendingRV := strconv.FormatUint(uint64(i+2), 10) fw.Add(&api.Pod{TypeMeta: api.TypeMeta{ID: id, ResourceVersion: sendingRV}}) - if sendingRV == 3 { + if sendingRV == "3" { // Inject a failure. fw.Stop() fw = nil @@ -145,7 +146,7 @@ func TestReflector_listAndWatch(t *testing.T) { if e, a := id, pod.ID; e != a { t.Errorf("%v: Expected %v, got %v", i, e, a) } - if e, a := uint64(i+2), pod.ResourceVersion; e != a { + if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a { t.Errorf("%v: Expected %v, got %v", i, e, a) } } @@ -156,10 +157,10 @@ func TestReflector_listAndWatch(t *testing.T) { } func TestReflector_listAndWatchWithErrors(t *testing.T) { - mkPod := func(id string, rv uint64) *api.Pod { + mkPod := func(id string, rv string) *api.Pod { return &api.Pod{TypeMeta: api.TypeMeta{ID: id, ResourceVersion: rv}} } - mkList := func(rv uint64, pods ...*api.Pod) *api.PodList { + mkList := func(rv string, pods ...*api.Pod) *api.PodList { list := &api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: rv}} for _, pod := range pods { list.Items = append(list.Items, *pod) @@ -173,29 +174,29 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { watchErr error }{ { - list: mkList(1), + list: mkList("1"), events: []watch.Event{ - {watch.Added, mkPod("foo", 2)}, - {watch.Added, mkPod("bar", 3)}, + {watch.Added, mkPod("foo", "2")}, + {watch.Added, mkPod("bar", "3")}, }, }, { - list: mkList(3, mkPod("foo", 2), mkPod("bar", 3)), + list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")), events: []watch.Event{ - {watch.Deleted, mkPod("foo", 4)}, - {watch.Added, mkPod("qux", 5)}, + {watch.Deleted, mkPod("foo", "4")}, + {watch.Added, mkPod("qux", "5")}, }, }, { listErr: fmt.Errorf("a list error"), }, { - list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)), + list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")), watchErr: fmt.Errorf("a watch error"), }, { - list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)), + list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")), events: []watch.Event{ - {watch.Added, mkPod("baz", 6)}, + {watch.Added, mkPod("baz", "6")}, }, }, { - list: mkList(6, mkPod("bar", 3), mkPod("qux", 5), mkPod("baz", 6)), + list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")), }, } @@ -204,7 +205,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { if item.list != nil { // Test that the list is what currently exists in the store. current := s.List() - checkMap := map[string]uint64{} + checkMap := map[string]string{} for _, item := range current { pod := item.(*api.Pod) checkMap[pod.ID] = pod.ResourceVersion @@ -220,7 +221,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { } watchRet, watchErr := item.events, item.watchErr lw := &testLW{ - WatchFunc: func(rv uint64) (watch.Interface, error) { + WatchFunc: func(rv string) (watch.Interface, error) { if watchErr != nil { return nil, watchErr } diff --git a/pkg/client/client.go b/pkg/client/client.go index c24d115ea34..80ecdfa0a38 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -53,7 +53,7 @@ type ReplicationControllerInterface interface { CreateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error) UpdateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error) DeleteReplicationController(ctx api.Context, id string) error - WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // ServiceInterface has methods to work with Service resources. @@ -63,14 +63,14 @@ type ServiceInterface interface { CreateService(ctx api.Context, srv *api.Service) (*api.Service, error) UpdateService(ctx api.Context, srv *api.Service) (*api.Service, error) DeleteService(ctx api.Context, id string) error - WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // EndpointsInterface has methods to work with Endpoints resources type EndpointsInterface interface { ListEndpoints(ctx api.Context, selector labels.Selector) (*api.EndpointsList, error) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) - WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // VersionInterface has a method to retrieve the server version. @@ -122,7 +122,7 @@ func (c *Client) CreatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err // UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs. func (c *Client) UpdatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} - if pod.ResourceVersion == 0 { + if len(pod.ResourceVersion) == 0 { err = fmt.Errorf("invalid update object, missing resource version: %v", pod) return } @@ -154,7 +154,7 @@ func (c *Client) CreateReplicationController(ctx api.Context, controller *api.Re // UpdateReplicationController updates an existing replication controller. func (c *Client) UpdateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) { result = &api.ReplicationController{} - if controller.ResourceVersion == 0 { + if len(controller.ResourceVersion) == 0 { err = fmt.Errorf("invalid update object, missing resource version: %v", controller) return } @@ -168,11 +168,11 @@ func (c *Client) DeleteReplicationController(ctx api.Context, id string) error { } // WatchReplicationControllers returns a watch.Interface that watches the requested controllers. -func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return c.Get(). Path("watch"). Path("replicationControllers"). - UintParam("resourceVersion", resourceVersion). + Param("resourceVersion", resourceVersion). SelectorParam("labels", label). SelectorParam("fields", field). Watch() @@ -202,7 +202,7 @@ func (c *Client) CreateService(ctx api.Context, svc *api.Service) (result *api.S // UpdateService updates an existing service. func (c *Client) UpdateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) { result = &api.Service{} - if svc.ResourceVersion == 0 { + if len(svc.ResourceVersion) == 0 { err = fmt.Errorf("invalid update object, missing resource version: %v", svc) return } @@ -216,11 +216,11 @@ func (c *Client) DeleteService(ctx api.Context, id string) error { } // WatchServices returns a watch.Interface that watches the requested services. -func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return c.Get(). Path("watch"). Path("services"). - UintParam("resourceVersion", resourceVersion). + Param("resourceVersion", resourceVersion). SelectorParam("labels", label). SelectorParam("fields", field). Watch() @@ -241,11 +241,11 @@ func (c *Client) GetEndpoints(ctx api.Context, id string) (result *api.Endpoints } // WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. -func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return c.Get(). Path("watch"). Path("endpoints"). - UintParam("resourceVersion", resourceVersion). + Param("resourceVersion", resourceVersion). SelectorParam("labels", label). SelectorParam("fields", field). Watch() @@ -259,7 +259,7 @@ func (c *Client) CreateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*ap func (c *Client) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) { result := &api.Endpoints{} - if endpoints.ResourceVersion == 0 { + if len(endpoints.ResourceVersion) == 0 { return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints) } err := c.Put(). diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 0e450211cfa..b77e725a5a4 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -265,7 +265,7 @@ func TestCreatePod(t *testing.T) { func TestUpdatePod(t *testing.T) { requestPod := &api.Pod{ - TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1}, + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, CurrentState: api.PodState{ Status: "Foobar", }, @@ -330,7 +330,7 @@ func TestGetController(t *testing.T) { func TestUpdateController(t *testing.T) { requestController := &api.ReplicationController{ - TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1}, + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, } c := &testClient{ Request: testRequest{Method: "PUT", Path: "/replicationControllers/foo"}, @@ -464,7 +464,7 @@ func TestCreateService(t *testing.T) { } func TestUpdateService(t *testing.T) { - svc := &api.Service{TypeMeta: api.TypeMeta{ID: "service-1", ResourceVersion: 1}} + svc := &api.Service{TypeMeta: api.TypeMeta{ID: "service-1", ResourceVersion: "1"}} c := &testClient{ Request: testRequest{Method: "PUT", Path: "/services/service-1", Body: svc}, Response: Response{StatusCode: 200, Body: svc}, diff --git a/pkg/client/fake.go b/pkg/client/fake.go index a5538fef43c..41eb9cd9c33 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -92,7 +92,7 @@ func (c *Fake) DeleteReplicationController(ctx api.Context, controller string) e return nil } -func (c *Fake) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Fake) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion}) return c.Watch, nil } @@ -122,7 +122,7 @@ func (c *Fake) DeleteService(ctx api.Context, service string) error { return nil } -func (c *Fake) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Fake) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion}) return c.Watch, c.Err } @@ -137,7 +137,7 @@ func (c *Fake) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error return &api.Endpoints{}, nil } -func (c *Fake) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Fake) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) return c.Watch, c.Err } diff --git a/pkg/client/request.go b/pkg/client/request.go index 17b84ad0436..0a3fba29a74 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -114,6 +114,14 @@ func (r *Request) UintParam(paramName string, u uint64) *Request { return r.setParam(paramName, strconv.FormatUint(u, 10)) } +// Param creates a query parameter with the given string value. +func (r *Request) Param(paramName, s string) *Request { + if r.err != nil { + return r + } + return r.setParam(paramName, s) +} + func (r *Request) setParam(paramName, value string) *Request { if specialParams.Has(paramName) { r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index f8ff2d7e7c4..43a7369e1b6 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -87,12 +87,12 @@ func NewReplicationManager(kubeClient client.Interface) *ReplicationManager { // Run begins watching and syncing. func (rm *ReplicationManager) Run(period time.Duration) { rm.syncTime = time.Tick(period) - resourceVersion := uint64(0) + resourceVersion := "" go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period) } // resourceVersion is a pointer to the resource version to use/update. -func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) { +func (rm *ReplicationManager) watchControllers(resourceVersion *string) { ctx := api.NewContext() watching, err := rm.kubeClient.WatchReplicationControllers( ctx, @@ -124,7 +124,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) { continue } // If we get disconnected, start where we left off. - *resourceVersion = rc.ResourceVersion + 1 + *resourceVersion = rc.ResourceVersion // Sync even if this is a deletion event, to ensure that we leave // it in the desired state. glog.V(4).Infof("About to sync from watch: %v", rc.ID) diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index d884a7aa134..fa3c3890d91 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -324,7 +324,7 @@ type FakeWatcher struct { *client.Fake } -func (fw FakeWatcher) WatchReplicationControllers(ctx api.Context, l, f labels.Selector, rv uint64) (watch.Interface, error) { +func (fw FakeWatcher) WatchReplicationControllers(ctx api.Context, l, f labels.Selector, rv string) (watch.Interface, error) { return fw.w, nil } @@ -341,7 +341,7 @@ func TestWatchControllers(t *testing.T) { return nil } - resourceVersion := uint64(0) + resourceVersion := "" go manager.watchControllers(&resourceVersion) // Test normal case diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 008c710ef54..db6b4de458e 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -47,7 +47,7 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface helper := tools.EtcdHelper{ client, latest.Codec, - latest.ResourceVersioner, + tools.RuntimeVersionAdapter{latest.ResourceVersioner}, } source := &SourceEtcd{ key: key, diff --git a/pkg/master/master.go b/pkg/master/master.go index 895454df8e5..b7d9272fd28 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -78,7 +78,7 @@ func NewEtcdHelper(etcdServers []string, version string) (helper tools.EtcdHelpe if err != nil { return helper, err } - return tools.EtcdHelper{client, versionInterfaces.Codec, versionInterfaces.ResourceVersioner}, nil + return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.ResourceVersioner}}, nil } // New returns a new instance of Master connected to the given etcd server. diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index e1e28c55cee..e62c122e3f1 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -31,8 +31,8 @@ import ( type Watcher interface { ListServices(ctx api.Context, label labels.Selector) (*api.ServiceList, error) ListEndpoints(ctx api.Context, label labels.Selector) (*api.EndpointsList, error) - WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) - WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) + WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } // SourceAPI implements a configuration source for services and endpoints that @@ -57,12 +57,12 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU // prevent hot loops if the server starts to misbehave reconnectDuration: time.Second * 1, } - serviceVersion := uint64(0) + serviceVersion := "" go util.Forever(func() { config.runServices(&serviceVersion) time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) }, period) - endpointVersion := uint64(0) + endpointVersion := "" go util.Forever(func() { config.runEndpoints(&endpointVersion) time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) @@ -71,9 +71,9 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU } // runServices loops forever looking for changes to services. -func (s *SourceAPI) runServices(resourceVersion *uint64) { +func (s *SourceAPI) runServices(resourceVersion *string) { ctx := api.NewContext() - if *resourceVersion == 0 { + if len(*resourceVersion) == 0 { services, err := s.client.ListServices(ctx, labels.Everything()) if err != nil { glog.Errorf("Unable to load services: %v", err) @@ -97,7 +97,7 @@ func (s *SourceAPI) runServices(resourceVersion *uint64) { } // handleServicesWatch loops over an event channel and delivers config changes to an update channel. -func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- ServiceUpdate) { +func handleServicesWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) { for { select { case event, ok := <-ch: @@ -107,7 +107,7 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates } service := event.Object.(*api.Service) - *resourceVersion = service.ResourceVersion + 1 + *resourceVersion = service.ResourceVersion switch event.Type { case watch.Added, watch.Modified: @@ -121,9 +121,9 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates } // runEndpoints loops forever looking for changes to endpoints. -func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { +func (s *SourceAPI) runEndpoints(resourceVersion *string) { ctx := api.NewContext() - if *resourceVersion == 0 { + if len(*resourceVersion) == 0 { endpoints, err := s.client.ListEndpoints(ctx, labels.Everything()) if err != nil { glog.Errorf("Unable to load endpoints: %v", err) @@ -147,7 +147,7 @@ func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { } // handleEndpointsWatch loops over an event channel and delivers config changes to an update channel. -func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { +func handleEndpointsWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { for { select { case event, ok := <-ch: @@ -157,7 +157,7 @@ func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, update } endpoints := event.Object.(*api.Endpoints) - *resourceVersion = endpoints.ResourceVersion + 1 + *resourceVersion = endpoints.ResourceVersion switch event.Type { case watch.Added, watch.Modified: diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index f1cb92ef4ba..91b8e2167b7 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -27,13 +27,13 @@ import ( ) func TestServices(t *testing.T) { - service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}} + service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}} fakeWatch := watch.NewFake() fakeClient := &client.Fake{Watch: fakeWatch} services := make(chan ServiceUpdate) source := SourceAPI{client: fakeClient, services: services} - resourceVersion := uint64(1) + resourceVersion := "1" go func() { // called twice source.runServices(&resourceVersion) @@ -42,7 +42,7 @@ func TestServices(t *testing.T) { // test adding a service to the watch fakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) { t.Errorf("expected call to watch-services, got %#v", fakeClient) } @@ -66,26 +66,26 @@ func TestServices(t *testing.T) { fakeWatch.Stop() newFakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}, {"watch-services", uint64(3)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}, {"watch-services", "3"}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } } func TestServicesFromZero(t *testing.T) { - service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}} + service := api.Service{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}} fakeWatch := watch.NewFake() fakeWatch.Stop() fakeClient := &client.Fake{Watch: fakeWatch} fakeClient.ServiceList = api.ServiceList{ - TypeMeta: api.TypeMeta{ResourceVersion: 2}, + TypeMeta: api.TypeMeta{ResourceVersion: "2"}, Items: []api.Service{ service, }, } services := make(chan ServiceUpdate) source := SourceAPI{client: fakeClient, services: services} - resourceVersion := uint64(0) + resourceVersion := "" ch := make(chan struct{}) go func() { source.runServices(&resourceVersion) @@ -101,10 +101,10 @@ func TestServicesFromZero(t *testing.T) { // should have listed, then watched <-ch - if resourceVersion != 2 { + if resourceVersion != "2" { t.Errorf("unexpected resource version, got %#v", resourceVersion) } - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}, {"watch-services", uint64(2)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}, {"watch-services", "2"}}) { t.Errorf("unexpected actions, got %#v", fakeClient) } } @@ -113,7 +113,7 @@ func TestServicesError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} services := make(chan ServiceUpdate) source := SourceAPI{client: fakeClient, services: services} - resourceVersion := uint64(1) + resourceVersion := "1" ch := make(chan struct{}) go func() { source.runServices(&resourceVersion) @@ -122,10 +122,10 @@ func TestServicesError(t *testing.T) { // should have listed only <-ch - if resourceVersion != 1 { + if resourceVersion != "1" { t.Errorf("unexpected resource version, got %#v", resourceVersion) } - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) { t.Errorf("unexpected actions, got %#v", fakeClient) } } @@ -134,7 +134,7 @@ func TestServicesFromZeroError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} services := make(chan ServiceUpdate) source := SourceAPI{client: fakeClient, services: services} - resourceVersion := uint64(0) + resourceVersion := "" ch := make(chan struct{}) go func() { source.runServices(&resourceVersion) @@ -143,7 +143,7 @@ func TestServicesFromZeroError(t *testing.T) { // should have listed only <-ch - if resourceVersion != 0 { + if resourceVersion != "" { t.Errorf("unexpected resource version, got %#v", resourceVersion) } if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}}) { @@ -152,13 +152,13 @@ func TestServicesFromZeroError(t *testing.T) { } func TestEndpoints(t *testing.T) { - endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} + endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}} fakeWatch := watch.NewFake() fakeClient := &client.Fake{Watch: fakeWatch} endpoints := make(chan EndpointsUpdate) source := SourceAPI{client: fakeClient, endpoints: endpoints} - resourceVersion := uint64(1) + resourceVersion := "1" go func() { // called twice source.runEndpoints(&resourceVersion) @@ -167,7 +167,7 @@ func TestEndpoints(t *testing.T) { // test adding an endpoint to the watch fakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } @@ -191,26 +191,26 @@ func TestEndpoints(t *testing.T) { fakeWatch.Stop() newFakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}, {"watch-endpoints", uint64(3)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}, {"watch-endpoints", "3"}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } } func TestEndpointsFromZero(t *testing.T) { - endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} + endpoint := api.Endpoints{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}} fakeWatch := watch.NewFake() fakeWatch.Stop() fakeClient := &client.Fake{Watch: fakeWatch} fakeClient.EndpointsList = api.EndpointsList{ - TypeMeta: api.TypeMeta{ResourceVersion: 2}, + TypeMeta: api.TypeMeta{ResourceVersion: "2"}, Items: []api.Endpoints{ endpoint, }, } endpoints := make(chan EndpointsUpdate) source := SourceAPI{client: fakeClient, endpoints: endpoints} - resourceVersion := uint64(0) + resourceVersion := "" ch := make(chan struct{}) go func() { source.runEndpoints(&resourceVersion) @@ -226,10 +226,10 @@ func TestEndpointsFromZero(t *testing.T) { // should have listed, then watched <-ch - if resourceVersion != 2 { + if resourceVersion != "2" { t.Errorf("unexpected resource version, got %#v", resourceVersion) } - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", uint64(2)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", "2"}}) { t.Errorf("unexpected actions, got %#v", fakeClient) } } @@ -238,7 +238,7 @@ func TestEndpointsError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} endpoints := make(chan EndpointsUpdate) source := SourceAPI{client: fakeClient, endpoints: endpoints} - resourceVersion := uint64(1) + resourceVersion := "1" ch := make(chan struct{}) go func() { source.runEndpoints(&resourceVersion) @@ -247,10 +247,10 @@ func TestEndpointsError(t *testing.T) { // should have listed only <-ch - if resourceVersion != 1 { + if resourceVersion != "1" { t.Errorf("unexpected resource version, got %#v", resourceVersion) } - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) { t.Errorf("unexpected actions, got %#v", fakeClient) } } @@ -259,7 +259,7 @@ func TestEndpointsFromZeroError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} endpoints := make(chan EndpointsUpdate) source := SourceAPI{client: fakeClient, endpoints: endpoints} - resourceVersion := uint64(0) + resourceVersion := "" ch := make(chan struct{}) go func() { source.runEndpoints(&resourceVersion) @@ -268,7 +268,7 @@ func TestEndpointsFromZeroError(t *testing.T) { // should have listed only <-ch - if resourceVersion != 0 { + if resourceVersion != "" { t.Errorf("unexpected resource version, got %#v", resourceVersion) } if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}}) { diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index 8afa3ee1563..b432ecbc1cc 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -24,7 +24,7 @@ import ( // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) - WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) + WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) CreateController(ctx api.Context, controller *api.ReplicationController) error UpdateController(ctx api.Context, controller *api.ReplicationController) error diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index d604a482747..98aa2cac522 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -149,7 +149,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje // Watch returns ReplicationController events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 72617edf85e..32e4fe6953a 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -50,7 +50,7 @@ func TestListControllersError(t *testing.T) { } func TestListEmptyControllerList(t *testing.T) { - mockRegistry := registrytest.ControllerRegistry{nil, &api.ReplicationControllerList{TypeMeta: api.TypeMeta{ResourceVersion: 1}}} + mockRegistry := registrytest.ControllerRegistry{nil, &api.ReplicationControllerList{TypeMeta: api.TypeMeta{ResourceVersion: "1"}}} storage := REST{ registry: &mockRegistry, } @@ -63,7 +63,7 @@ func TestListEmptyControllerList(t *testing.T) { if len(controllers.(*api.ReplicationControllerList).Items) != 0 { t.Errorf("Unexpected non-zero ctrl list: %#v", controllers) } - if controllers.(*api.ReplicationControllerList).ResourceVersion != 1 { + if controllers.(*api.ReplicationControllerList).ResourceVersion != "1" { t.Errorf("Unexpected resource version: %#v", controllers) } } diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go index cb88fd3fe9e..f93d6bed686 100644 --- a/pkg/registry/endpoint/registry.go +++ b/pkg/registry/endpoint/registry.go @@ -26,6 +26,6 @@ import ( type Registry interface { ListEndpoints(ctx api.Context) (*api.EndpointsList, error) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) - WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion string) (watch.Interface, error) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error } diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index ad8068b5d62..51c945bcf8e 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -55,7 +55,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch returns Endpoint events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion) } diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go index f895d8c0fa5..282cd80622e 100644 --- a/pkg/registry/endpoint/rest_test.go +++ b/pkg/registry/endpoint/rest_test.go @@ -74,7 +74,7 @@ func TestEndpointsRegistryList(t *testing.T) { registry := registrytest.NewServiceRegistry() storage := NewREST(registry) registry.EndpointsList = api.EndpointsList{ - TypeMeta: api.TypeMeta{ResourceVersion: 1}, + TypeMeta: api.TypeMeta{ResourceVersion: "1"}, Items: []api.Endpoints{ {TypeMeta: api.TypeMeta{ID: "foo"}}, {TypeMeta: api.TypeMeta{ID: "bar"}}, @@ -92,7 +92,7 @@ func TestEndpointsRegistryList(t *testing.T) { if e, a := "bar", sl.Items[1].ID; e != a { t.Errorf("Expected %v, but got %v", e, a) } - if sl.ResourceVersion != 1 { + if sl.ResourceVersion != "1" { t.Errorf("Unexpected resource version: %#v", sl) } } diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 9f131e85c93..7c8b5815ee5 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -18,6 +18,7 @@ package etcd import ( "fmt" + "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" @@ -53,6 +54,21 @@ func makePodKey(podID string) string { return "/registry/pods/" + podID } +// parseWatchResourceVersion takes a resource version argument and converts it to +// the etcd version we should pass to helper.Watch(). Because resourceVersion is +// an opaque value, the default watch behavior for non-zero watch is to watch +// the next value (if you pass "1", you will see updates from "2" onwards). +func parseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { + if resourceVersion == "" || resourceVersion == "0" { + return 0, nil + } + version, err := strconv.ParseUint(resourceVersion, 10, 64) + if err != nil { + return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion) + } + return version + 1, nil +} + // ListPods obtains a list of pods with labels that match selector. func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { @@ -63,7 +79,7 @@ func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.Pod // ListPodsPredicate obtains a list of pods that match filter. func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) { allPods := api.PodList{} - err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion) + err := r.ExtractToList("/registry/pods", &allPods) if err != nil { return nil, err } @@ -82,8 +98,12 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { - return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool { +func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { + version, err := parseWatchResourceVersion(resourceVersion, "pod") + if err != nil { + return nil, err + } + return r.WatchList("/registry/pods", version, func(obj runtime.Object) bool { switch t := obj.(type) { case *api.Pod: return filter(t) @@ -227,13 +247,17 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { // ListControllers obtains a list of ReplicationControllers. func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) { controllers := &api.ReplicationControllerList{} - err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion) + err := r.ExtractToList("/registry/controllers", controllers) return controllers, err } // WatchControllers begins watching for new, changed, or deleted controllers. -func (r *Registry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) { - return r.WatchList("/registry/controllers", resourceVersion, tools.Everything) +func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { + version, err := parseWatchResourceVersion(resourceVersion, "replicationControllers") + if err != nil { + return nil, err + } + return r.WatchList("/registry/controllers", version, tools.Everything) } func makeControllerKey(id string) string { @@ -277,7 +301,7 @@ func makeServiceKey(name string) string { // ListServices obtains a list of Services. func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) { list := &api.ServiceList{} - err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion) + err := r.ExtractToList("/registry/services/specs", list) return list, err } @@ -337,15 +361,19 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error { } // WatchServices begins watching for new, changed, or deleted service configurations. -func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + version, err := parseWatchResourceVersion(resourceVersion, "service") + if err != nil { + return nil, err + } if !label.Empty() { return nil, fmt.Errorf("label selectors are not supported on services") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceKey(value), resourceVersion), nil + return r.Watch(makeServiceKey(value), version), nil } if field.Empty() { - return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything) + return r.WatchList("/registry/services/specs", version, tools.Everything) } return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } @@ -353,7 +381,7 @@ func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, // ListEndpoints obtains a list of Services. func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { list := &api.EndpointsList{} - err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion) + err := r.ExtractToList("/registry/services/endpoints", list) return list, err } @@ -369,15 +397,19 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { } // WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. -func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + version, err := parseWatchResourceVersion(resourceVersion, "endpoints") + if err != nil { + return nil, err + } if !label.Empty() { return nil, fmt.Errorf("label selectors are not supported on endpoints") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil + return r.Watch(makeServiceEndpointsKey(value), version), nil } if field.Empty() { - return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything) + return r.WatchList("/registry/services/endpoints", version, tools.Everything) } return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } @@ -388,7 +420,7 @@ func makeMinionKey(minionID string) string { func (r *Registry) ListMinions(ctx api.Context) (*api.MinionList, error) { minions := &api.MinionList{} - err := r.ExtractList("/registry/minions", &minions.Items, &minions.ResourceVersion) + err := r.ExtractToList("/registry/minions", minions) return minions, err } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index afdcaedfaa3..648c8d7d1b2 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -18,6 +18,7 @@ package etcd import ( "reflect" + "strconv" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -33,13 +34,48 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { - registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, latest.ResourceVersioner}, + registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, &pod.BasicManifestFactory{ ServiceRegistry: ®istrytest.ServiceRegistry{}, }) return registry } +func TestEtcdParseWatchResourceVersion(t *testing.T) { + testCases := []struct { + Version string + Kind string + ExpectVersion uint64 + Err bool + }{ + {Version: "", ExpectVersion: 0}, + {Version: "a", Err: true}, + {Version: " ", Err: true}, + {Version: "1", ExpectVersion: 2}, + {Version: "10", ExpectVersion: 11}, + } + for _, testCase := range testCases { + version, err := parseWatchResourceVersion(testCase.Version, testCase.Kind) + switch { + case testCase.Err: + if err == nil { + t.Errorf("%s: unexpected non-error", testCase.Version) + continue + } + if !errors.IsInvalid(err) { + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + case !testCase.Err && err != nil: + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + if version != testCase.ExpectVersion { + t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) + } + } +} + func TestEtcdGetPod(t *testing.T) { ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) @@ -661,7 +697,7 @@ func TestEtcdUpdateController(t *testing.T) { resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.UpdateController(ctx, &api.ReplicationController{ - TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)}, DesiredState: api.ReplicationControllerState{ Replicas: 2, }, @@ -807,7 +843,7 @@ func TestEtcdUpdateService(t *testing.T) { resp, _ := fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) testService := api.Service{ - TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)}, Labels: map[string]string{ "baz": "bar", }, @@ -826,8 +862,8 @@ func TestEtcdUpdateService(t *testing.T) { } // Clear modified indices before the equality test. - svc.ResourceVersion = 0 - testService.ResourceVersion = 0 + svc.ResourceVersion = "" + testService.ResourceVersion = "" if !reflect.DeepEqual(*svc, testService) { t.Errorf("Unexpected service: got\n %#v\n, wanted\n %#v", svc, testService) } @@ -919,7 +955,7 @@ func TestEtcdWatchServices(t *testing.T) { watching, err := registry.WatchServices(ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"ID": "foo"}), - 1, + "1", ) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -948,7 +984,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), - 0, + "", ) if err == nil { t.Errorf("unexpected non-error: %v", err) @@ -958,7 +994,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { ctx, labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), labels.Everything(), - 0, + "", ) if err == nil { t.Errorf("unexpected non-error: %v", err) @@ -973,7 +1009,7 @@ func TestEtcdWatchEndpoints(t *testing.T) { ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"ID": "foo"}), - 1, + "1", ) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -1002,7 +1038,7 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) { ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), - 0, + "", ) if err == nil { t.Errorf("unexpected non-error: %v", err) @@ -1012,7 +1048,7 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) { ctx, labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), labels.Everything(), - 0, + "", ) if err == nil { t.Errorf("unexpected non-error: %v", err) diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 9b535ad82b2..ae630fed1bc 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -29,7 +29,7 @@ type Registry interface { // ListPodsPredicate obtains a list of pods for which filter returns true. ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) // Watch for new/changed/deleted pods - WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) + WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) // Get a specific pod GetPod(ctx api.Context, podID string) (*api.Pod, error) // Create a pod based on a specification. diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 597ff3df007..e70e28a45c9 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -176,7 +176,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj } // Watch begins watching for new, changed, or deleted pods. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field)) } diff --git a/pkg/registry/pod/rest_test.go b/pkg/registry/pod/rest_test.go index c7050b4dd3e..3b538062cc3 100644 --- a/pkg/registry/pod/rest_test.go +++ b/pkg/registry/pod/rest_test.go @@ -143,7 +143,7 @@ func TestListPodsError(t *testing.T) { } func TestListEmptyPodList(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(&api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: 1}}) + podRegistry := registrytest.NewPodRegistry(&api.PodList{TypeMeta: api.TypeMeta{ResourceVersion: "1"}}) storage := REST{ registry: podRegistry, } @@ -156,7 +156,7 @@ func TestListEmptyPodList(t *testing.T) { if len(pods.(*api.PodList).Items) != 0 { t.Errorf("Unexpected non-zero pod list: %#v", pods) } - if pods.(*api.PodList).ResourceVersion != 1 { + if pods.(*api.PodList).ResourceVersion != "1" { t.Errorf("Unexpected resource version: %#v", pods) } } diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go index e0a3912f518..041834b20f8 100644 --- a/pkg/registry/registrytest/controller.go +++ b/pkg/registry/registrytest/controller.go @@ -47,6 +47,6 @@ func (r *ControllerRegistry) DeleteController(ctx api.Context, ID string) error return r.Err } -func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) { +func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index f77014d7ade..1ee58294fc7 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -63,7 +63,7 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api. }) } -func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { +func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.mux.Watch(), nil } diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 5d0b3b71379..8578d738a20 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -63,7 +63,7 @@ func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error return r.Err } -func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion string) (watch.Interface, error) { return nil, r.Err } @@ -81,6 +81,6 @@ func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) err return r.Err } -func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index ce4512213ee..dc6e95eef42 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -30,7 +30,7 @@ type Registry interface { GetService(ctx api.Context, name string) (*api.Service, error) DeleteService(ctx api.Context, name string) error UpdateService(ctx api.Context, svc *api.Service) error - WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion string) (watch.Interface, error) // TODO: endpoints and their implementation should be separated, setting endpoints should be // supported via the API, and the endpoints-controller should use the API to update endpoints. diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 0b607e02cf3..3742c31dff0 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -142,7 +142,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch returns Services events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return rs.registry.WatchServices(ctx, label, field, resourceVersion) } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 9aecceff5ea..71a9fb85bef 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -371,7 +371,7 @@ func TestServiceRegistryList(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo2"}, Selector: map[string]string{"bar2": "baz2"}, }) - registry.List.ResourceVersion = 1 + registry.List.ResourceVersion = "1" s, _ := storage.List(ctx, labels.Everything(), labels.Everything()) sl := s.(*api.ServiceList) if len(fakeCloud.Calls) != 0 { @@ -386,7 +386,7 @@ func TestServiceRegistryList(t *testing.T) { if e, a := "foo2", sl.Items[1].ID; e != a { t.Errorf("Expected %v, but got %v", e, a) } - if sl.ResourceVersion != 1 { + if sl.ResourceVersion != "1" { t.Errorf("Unexpected resource version: %#v", sl) } } diff --git a/pkg/runtime/interfaces.go b/pkg/runtime/interfaces.go index 517802e7699..ea914d4b5d7 100644 --- a/pkg/runtime/interfaces.go +++ b/pkg/runtime/interfaces.go @@ -36,8 +36,8 @@ type Codec interface { // ResourceVersioner provides methods for setting and retrieving // the resource version from an API object. type ResourceVersioner interface { - SetResourceVersion(obj Object, version uint64) error - ResourceVersion(obj Object) (uint64, error) + SetResourceVersion(obj Object, version string) error + ResourceVersion(obj Object) (string, error) } // SelfLinker provides methods for setting and retrieving the SelfLink field of an API object. diff --git a/pkg/runtime/jsonbase.go b/pkg/runtime/jsonbase.go index c0806d24ca9..f6f2af174b6 100644 --- a/pkg/runtime/jsonbase.go +++ b/pkg/runtime/jsonbase.go @@ -30,15 +30,15 @@ func NewTypeMetaResourceVersioner() ResourceVersioner { // jsonBaseModifier implements ResourceVersioner and SelfLinker. type jsonBaseModifier struct{} -func (v jsonBaseModifier) ResourceVersion(obj Object) (uint64, error) { +func (v jsonBaseModifier) ResourceVersion(obj Object) (string, error) { json, err := FindTypeMeta(obj) if err != nil { - return 0, err + return "", err } return json.ResourceVersion(), nil } -func (v jsonBaseModifier) SetResourceVersion(obj Object, version uint64) error { +func (v jsonBaseModifier) SetResourceVersion(obj Object, version string) error { json, err := FindTypeMeta(obj) if err != nil { return err @@ -86,8 +86,8 @@ type TypeMetaInterface interface { SetAPIVersion(version string) Kind() string SetKind(kind string) - ResourceVersion() uint64 - SetResourceVersion(version uint64) + ResourceVersion() string + SetResourceVersion(version string) SelfLink() string SetSelfLink(selfLink string) } @@ -96,7 +96,7 @@ type genericTypeMeta struct { id *string apiVersion *string kind *string - resourceVersion *uint64 + resourceVersion *string selfLink *string } @@ -124,11 +124,11 @@ func (g genericTypeMeta) SetKind(kind string) { *g.kind = kind } -func (g genericTypeMeta) ResourceVersion() uint64 { +func (g genericTypeMeta) ResourceVersion() string { return *g.resourceVersion } -func (g genericTypeMeta) SetResourceVersion(version uint64) { +func (g genericTypeMeta) SetResourceVersion(version string) { *g.resourceVersion = version } diff --git a/pkg/runtime/jsonbase_test.go b/pkg/runtime/jsonbase_test.go index 557ce132600..a70e0924c28 100644 --- a/pkg/runtime/jsonbase_test.go +++ b/pkg/runtime/jsonbase_test.go @@ -29,14 +29,14 @@ func TestGenericTypeMeta(t *testing.T) { ID string `json:"id,omitempty" yaml:"id,omitempty"` CreationTimestamp util.Time `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"` - ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"` } j := TypeMeta{ ID: "foo", APIVersion: "a", Kind: "b", - ResourceVersion: 1, + ResourceVersion: "1", SelfLink: "some/place/only/we/know", } g, err := newGenericTypeMeta(reflect.ValueOf(&j).Elem()) @@ -54,7 +54,7 @@ func TestGenericTypeMeta(t *testing.T) { if e, a := "b", jbi.Kind(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := uint64(1), jbi.ResourceVersion(); e != a { + if e, a := "1", jbi.ResourceVersion(); e != a { t.Errorf("expected %v, got %v", e, a) } if e, a := "some/place/only/we/know", jbi.SelfLink(); e != a { @@ -64,7 +64,7 @@ func TestGenericTypeMeta(t *testing.T) { jbi.SetID("bar") jbi.SetAPIVersion("c") jbi.SetKind("d") - jbi.SetResourceVersion(2) + jbi.SetResourceVersion("2") jbi.SetSelfLink("google.com") // Prove that jbi changes the original object. @@ -77,7 +77,7 @@ func TestGenericTypeMeta(t *testing.T) { if e, a := "d", j.Kind; e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := uint64(2), j.ResourceVersion; e != a { + if e, a := "2", j.ResourceVersion; e != a { t.Errorf("expected %v, got %v", e, a) } if e, a := "google.com", j.SelfLink; e != a { @@ -99,12 +99,12 @@ func (*MyIncorrectlyMarkedAsAPIObject) IsAnAPIObject() {} func TestResourceVersionerOfAPI(t *testing.T) { type T struct { Object - Expected uint64 + Expected string } testCases := map[string]T{ - "empty api object": {&MyAPIObject{}, 0}, - "api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: 1}}, 1}, - "pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: 1}}, 1}, + "empty api object": {&MyAPIObject{}, ""}, + "api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: "1"}}, "1"}, + "pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: "1"}}, "1"}, } versioning := NewTypeMetaResourceVersioner() for key, testCase := range testCases { @@ -119,9 +119,9 @@ func TestResourceVersionerOfAPI(t *testing.T) { failingCases := map[string]struct { Object - Expected uint64 + Expected string }{ - "not a valid object to try": {&MyIncorrectlyMarkedAsAPIObject{}, 1}, + "not a valid object to try": {&MyIncorrectlyMarkedAsAPIObject{}, "1"}, } for key, testCase := range failingCases { _, err := versioning.ResourceVersion(testCase.Object) @@ -132,20 +132,20 @@ func TestResourceVersionerOfAPI(t *testing.T) { setCases := map[string]struct { Object - Expected uint64 + Expected string }{ - "pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: 1}}, 1}, + "pointer to api object with version": {&MyAPIObject{TypeMeta: TypeMeta{ResourceVersion: "1"}}, "1"}, } for key, testCase := range setCases { - if err := versioning.SetResourceVersion(testCase.Object, 5); err != nil { + if err := versioning.SetResourceVersion(testCase.Object, "5"); err != nil { t.Errorf("%s: unexpected error %#v", key, err) } actual, err := versioning.ResourceVersion(testCase.Object) if err != nil { t.Errorf("%s: unexpected error %#v", key, err) } - if actual != 5 { - t.Errorf("%s: expected %d, got %d", key, 5, actual) + if actual != "5" { + t.Errorf("%s: expected %d, got %d", key, "5", actual) } } } diff --git a/pkg/runtime/types.go b/pkg/runtime/types.go index 94805f159d4..717550041f1 100644 --- a/pkg/runtime/types.go +++ b/pkg/runtime/types.go @@ -39,7 +39,7 @@ type TypeMeta struct { ID string `json:"id,omitempty" yaml:"id,omitempty"` CreationTimestamp util.Time `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"` - ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"` } diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 5acfbc75cdf..1c408d10f37 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -90,7 +90,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { *newEndpoints = *currentEndpoints newEndpoints.Endpoints = endpoints - if currentEndpoints.ResourceVersion == 0 { + if len(currentEndpoints.ResourceVersion) == 0 { // No previous endpoints, create them _, err = e.client.CreateEndpoints(nsCtx, newEndpoints) } else { diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index f59d9fdf549..e85f1d77f3e 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -194,7 +194,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { serverResponse{http.StatusOK, api.Endpoints{ TypeMeta: api.TypeMeta{ ID: "foo", - ResourceVersion: 1, + ResourceVersion: "1", }, Endpoints: []string{"6.7.8.9:1000"}, }}) @@ -206,7 +206,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { data := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Endpoints{ TypeMeta: api.TypeMeta{ ID: "foo", - ResourceVersion: 1, + ResourceVersion: "1", }, Endpoints: []string{"1.2.3.4:8080"}, }) @@ -229,7 +229,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { serverResponse{http.StatusOK, serviceList}, serverResponse{http.StatusOK, api.Endpoints{ TypeMeta: api.TypeMeta{ - ResourceVersion: 1, + ResourceVersion: "1", }, Endpoints: []string{"1.2.3.4:8080"}, }}) @@ -263,7 +263,7 @@ func TestSyncEndpointsItems(t *testing.T) { } data := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Endpoints{ TypeMeta: api.TypeMeta{ - ResourceVersion: 0, + ResourceVersion: "", }, Endpoints: []string{"1.2.3.4:8080"}, }) diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index bc8f4e75a03..b3fcb1facae 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "reflect" + "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/coreos/go-etcd/etcd" @@ -62,12 +63,43 @@ type EtcdGetSet interface { Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) } +type EtcdResourceVersioner interface { + SetResourceVersion(obj runtime.Object, version uint64) error + ResourceVersion(obj runtime.Object) (uint64, error) +} + +// RuntimeVersionAdapter converts a string based versioner to EtcdResourceVersioner +type RuntimeVersionAdapter struct { + Versioner runtime.ResourceVersioner +} + +// SetResourceVersion implements EtcdResourceVersioner +func (a RuntimeVersionAdapter) SetResourceVersion(obj runtime.Object, version uint64) error { + if version == 0 { + return a.Versioner.SetResourceVersion(obj, "") + } + s := strconv.FormatUint(version, 10) + return a.Versioner.SetResourceVersion(obj, s) +} + +// SetResourceVersion implements EtcdResourceVersioner +func (a RuntimeVersionAdapter) ResourceVersion(obj runtime.Object) (uint64, error) { + version, err := a.Versioner.ResourceVersion(obj) + if err != nil { + return 0, err + } + if version == "" { + return 0, nil + } + return strconv.ParseUint(version, 10, 64) +} + // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { Client EtcdGetSet Codec runtime.Codec // optional, no atomic operations can be performed without this interface - ResourceVersioner runtime.ResourceVersioner + ResourceVersioner EtcdResourceVersioner } // IsEtcdNotFound returns true iff err is an etcd not found error. diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index c6cb3992cbf..54c6432c05f 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -44,7 +44,7 @@ func (*TestResource) IsAnAPIObject() {} var scheme *runtime.Scheme var codec runtime.Codec -var versioner = runtime.NewTypeMetaResourceVersioner() +var versioner = RuntimeVersionAdapter{runtime.NewTypeMetaResourceVersioner()} func init() { scheme = runtime.NewScheme() @@ -89,11 +89,11 @@ func TestExtractToList(t *testing.T) { }, } expect := api.PodList{ - TypeMeta: api.TypeMeta{ResourceVersion: 10}, + TypeMeta: api.TypeMeta{ResourceVersion: "10"}, Items: []api.Pod{ - {TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1}}, - {TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: 2}}, - {TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: 3}}, + {TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}}, + {TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}}, + {TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: "3"}}, }, } @@ -204,7 +204,7 @@ func TestSetObj(t *testing.T) { } func TestSetObjWithVersion(t *testing.T) { - obj := &api.Pod{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: 1}} + obj := &api.Pod{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}} fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true fakeClient.Data["/some/key"] = EtcdResponseWithError{ @@ -254,7 +254,7 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) { func TestAtomicUpdate(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, runtime.NewTypeMetaResourceVersioner()} + helper := EtcdHelper{fakeClient, codec, versioner} // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -308,7 +308,7 @@ func TestAtomicUpdate(t *testing.T) { func TestAtomicUpdateNoChange(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, runtime.NewTypeMetaResourceVersioner()} + helper := EtcdHelper{fakeClient, codec, versioner} // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -339,7 +339,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { func TestAtomicUpdate_CreateCollision(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, runtime.NewTypeMetaResourceVersioner()} + helper := EtcdHelper{fakeClient, codec, versioner} fakeClient.ExpectNotFoundGet("/some/key") diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index 6f0ada02519..18c57b012ff 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -82,7 +82,7 @@ type TransformFunc func(runtime.Object) (runtime.Object, error) // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { encoding runtime.Codec - versioner runtime.ResourceVersioner + versioner EtcdResourceVersioner transform TransformFunc list bool // If we're doing a recursive watch, should be true. @@ -107,7 +107,7 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher { +func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 817cb32ad74..8c2169f198e 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -395,7 +395,7 @@ func TestWatchFromZeroIndex(t *testing.T) { testCases := map[string]struct { Response EtcdResponseWithError - ExpectedVersion uint64 + ExpectedVersion string ExpectedType watch.EventType }{ "get value created": { @@ -410,7 +410,7 @@ func TestWatchFromZeroIndex(t *testing.T) { EtcdIndex: 2, }, }, - 1, + "1", watch.Added, }, "get value modified": { @@ -425,7 +425,7 @@ func TestWatchFromZeroIndex(t *testing.T) { EtcdIndex: 3, }, }, - 2, + "2", watch.Modified, }, } @@ -510,10 +510,10 @@ func TestWatchListFromZeroIndex(t *testing.T) { if !ok { t.Fatalf("expected a pod, got %#v", event.Object) } - if actualPod.ResourceVersion != 1 { + if actualPod.ResourceVersion != "1" { t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod) } - pod.ResourceVersion = 1 + pod.ResourceVersion = "1" if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { t.Errorf("Expected %v, got %v", e, a) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 05a91e3ee28..00a184d87e6 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -116,13 +116,13 @@ func (lw *listWatch) List() (runtime.Object, error) { Get() } -func (lw *listWatch) Watch(resourceVersion uint64) (watch.Interface, error) { +func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) { return lw.client. Get(). Path("watch"). Path(lw.resource). SelectorParam("fields", lw.fieldSelector). - UintParam("resourceVersion", resourceVersion). + Param("resourceVersion", resourceVersion). Watch() } diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index eec381d17e1..d2934c08021 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -85,37 +85,41 @@ func TestCreateLists(t *testing.T) { func TestCreateWatches(t *testing.T) { factory := ConfigFactory{nil} table := []struct { - rv uint64 + rv string location string factory func() *listWatch }{ // Minion watch { - rv: 0, + rv: "", + location: "/api/" + testapi.Version() + "/watch/minions?fields=&resourceVersion=", + factory: factory.createMinionLW, + }, { + rv: "0", location: "/api/" + testapi.Version() + "/watch/minions?fields=&resourceVersion=0", factory: factory.createMinionLW, }, { - rv: 42, + rv: "42", location: "/api/" + testapi.Version() + "/watch/minions?fields=&resourceVersion=42", factory: factory.createMinionLW, }, // Assigned pod watches { - rv: 0, - location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", + rv: "", + location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=", factory: factory.createAssignedPodLW, }, { - rv: 42, + rv: "42", location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", factory: factory.createAssignedPodLW, }, // Unassigned pod watches { - rv: 0, - location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", + rv: "", + location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=", factory: factory.createUnassignedPodLW, }, { - rv: 42, + rv: "42", location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", factory: factory.createUnassignedPodLW, }, diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 5a958003708..025316a6f3f 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -19,6 +19,7 @@ limitations under the License. package integration import ( + "strconv" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -92,7 +93,7 @@ func TestExtractObj(t *testing.T) { func TestWatch(t *testing.T) { client := newEtcdClient() - helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: latest.ResourceVersioner} + helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} withEtcdKey(func(key string) { resp, err := client.Set(key, runtime.EncodeOrDie(v1beta1.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) if err != nil { @@ -109,7 +110,7 @@ func TestWatch(t *testing.T) { // version should match what we set pod := event.Object.(*api.Pod) - if pod.ResourceVersion != expectedVersion { + if pod.ResourceVersion != strconv.FormatUint(expectedVersion, 10) { t.Errorf("expected version %d, got %#v", expectedVersion, pod) } @@ -134,7 +135,7 @@ func TestWatch(t *testing.T) { t.Errorf("expected deleted event %#v", event) } pod = event.Object.(*api.Pod) - if pod.ResourceVersion != expectedVersion { + if pod.ResourceVersion != strconv.FormatUint(expectedVersion, 10) { t.Errorf("expected version %d, got %#v", expectedVersion, pod) } }) From a5462c0678857eb4116a9cc1245f5a5c00ee33bf Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 7 Oct 2014 18:31:34 -0400 Subject: [PATCH 2/3] Change test cases to verify the client sends the received resourceVersion --- pkg/client/cache/reflector_test.go | 10 +++++----- pkg/proxy/config/api_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 6162e8ea843..89f977c5dfc 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -92,8 +92,8 @@ func TestReflector_watchHandler(t *testing.T) { } } - // RV should stay 1 higher than the last id we see. - if e, a := "33", resumeRV; e != a { + // RV should send the last version we see. + if e, a := "32", resumeRV; e != a { t.Errorf("expected %v, got %v", e, a) } } @@ -102,9 +102,9 @@ func TestReflector_listAndWatch(t *testing.T) { createdFakes := make(chan *watch.FakeWatcher) // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc - // to get called at the beginning of the watch with 1, and again with 4 when we - // inject an error at 3. - expectedRVs := []string{"1", "4"} + // to get called at the beginning of the watch with 1, and again with 3 when we + // inject an error. + expectedRVs := []string{"1", "3"} lw := &testLW{ WatchFunc: func(rv string) (watch.Interface, error) { fw := watch.NewFake() diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 91b8e2167b7..e7944d28d37 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -66,7 +66,7 @@ func TestServices(t *testing.T) { fakeWatch.Stop() newFakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}, {"watch-services", "3"}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}, {"watch-services", "2"}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } } @@ -191,7 +191,7 @@ func TestEndpoints(t *testing.T) { fakeWatch.Stop() newFakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}, {"watch-endpoints", "3"}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}, {"watch-endpoints", "2"}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } } From 094bc3796aeb17b8f9347e8a17b72f21c27be83e Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 7 Oct 2014 19:37:09 -0400 Subject: [PATCH 3/3] Update ObjectReference to use ResourceVersion string everywhere Breaks backwards compat for anyone using older versions --- pkg/api/serialization_test.go | 13 ------------- pkg/api/v1beta1/conversion.go | 30 ------------------------------ pkg/api/v1beta1/types.go | 2 +- pkg/api/v1beta2/conversion.go | 30 ------------------------------ pkg/api/v1beta2/types.go | 2 +- 5 files changed, 2 insertions(+), 75 deletions(-) diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index a6612e93809..16e2a7aace9 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -75,19 +75,6 @@ var apiObjectFuzzer = fuzz.New().NilChance(.5).NumElements(1, 1).Funcs( c.Fuzz(&nsec) j.CreationTimestamp = util.Unix(sec, nsec).Rfc3339Copy() }, - func(j *api.ObjectReference, c fuzz.Continue) { - // We have to customize the randomization of TypeMetas because their - // APIVersion and Kind must remain blank in memory. - j.APIVersion = c.RandString() - j.Kind = c.RandString() - j.Namespace = c.RandString() - j.Name = c.RandString() - // TODO: Fix JSON/YAML packages and/or write custom encoding - // for uint64's. Somehow the LS *byte* of this is lost, but - // only when all 8 bytes are set. - j.ResourceVersion = strconv.FormatUint(c.RandUint64()>>8, 10) - j.FieldPath = c.RandString() - }, func(intstr *util.IntOrString, c fuzz.Continue) { // util.IntOrString will panic if its kind is set wrong. if c.RandBool() { diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index d7284483a3c..a745c97bec4 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -59,36 +59,6 @@ func init() { return nil }, - // ObjectReference has changed type of ResourceVersion internally - func(in *newer.ObjectReference, out *ObjectReference, s conversion.Scope) error { - out.APIVersion = in.APIVersion - out.Kind = in.Kind - out.Namespace = in.Namespace - out.Name = in.Name - out.FieldPath = in.FieldPath - - if len(in.ResourceVersion) > 0 { - v, err := strconv.ParseUint(in.ResourceVersion, 10, 64) - if err != nil { - return err - } - out.ResourceVersion = v - } - return nil - }, - func(in *ObjectReference, out *newer.ObjectReference, s conversion.Scope) error { - out.APIVersion = in.APIVersion - out.Kind = in.Kind - out.Namespace = in.Namespace - out.Name = in.Name - out.FieldPath = in.FieldPath - - if in.ResourceVersion != 0 { - out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10) - } - return nil - }, - // EnvVar's Key is deprecated in favor of Name. func(in *newer.EnvVar, out *EnvVar, s conversion.Scope) error { out.Value = in.Value diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index b0180d3ddb5..5429024df1e 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -638,7 +638,7 @@ type ObjectReference struct { Name string `json:"name,omitempty" yaml:"name,omitempty"` UID string `json:"uid,omitempty" yaml:"uid,omitempty"` APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"` - ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` // Optional. If referring to a piece of an object instead of an entire object, this string // should contain a valid field access statement. For example, diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index abe3857ebca..59f398bedb6 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -59,36 +59,6 @@ func init() { return nil }, - // ObjectReference has changed type of ResourceVersion internally - func(in *newer.ObjectReference, out *ObjectReference, s conversion.Scope) error { - out.APIVersion = in.APIVersion - out.Kind = in.Kind - out.Namespace = in.Namespace - out.Name = in.Name - out.FieldPath = in.FieldPath - - if len(in.ResourceVersion) > 0 { - v, err := strconv.ParseUint(in.ResourceVersion, 10, 64) - if err != nil { - return err - } - out.ResourceVersion = v - } - return nil - }, - func(in *ObjectReference, out *newer.ObjectReference, s conversion.Scope) error { - out.APIVersion = in.APIVersion - out.Kind = in.Kind - out.Namespace = in.Namespace - out.Name = in.Name - out.FieldPath = in.FieldPath - - if in.ResourceVersion != 0 { - out.ResourceVersion = strconv.FormatUint(in.ResourceVersion, 10) - } - return nil - }, - // EnvVar's Key is deprecated in favor of Name. func(in *newer.EnvVar, out *EnvVar, s conversion.Scope) error { out.Value = in.Value diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 0d45d527943..ce90515b8b2 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -649,7 +649,7 @@ type ObjectReference struct { Name string `json:"name,omitempty" yaml:"name,omitempty"` UID string `json:"uid,omitempty" yaml:"uid,omitempty"` APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"` - ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` // Optional. If referring to a piece of an object instead of an entire object, this string // should contain a valid field access statement. For example,