From 7c654a3d1b6da799d44e516d3390de1af4e992cf Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Sat, 7 Mar 2015 11:00:45 +0100 Subject: [PATCH] Expand test coverage in master, kubectl/cmd/util, pkg/registry/resourcequota, and api/rest. --- pkg/api/rest/create_test.go | 74 +++++++ pkg/api/rest/types.go | 10 +- pkg/api/rest/update_test.go | 111 ++++++++++ pkg/apiserver/async.go | 47 ----- pkg/apiserver/operation.go | 154 -------------- pkg/auth/authorizer/abac/abac_test.go | 78 +++++++ pkg/kubectl/cmd/util/helpers.go | 43 ++-- pkg/kubectl/cmd/util/helpers_test.go | 76 ++++++- pkg/master/publish.go | 17 +- pkg/master/publish_test.go | 263 ++++++++++++++++++++++++ pkg/registry/registrytest/endpoint.go | 92 +++++++++ pkg/registry/registrytest/generic.go | 8 +- pkg/registry/resourcequota/rest_test.go | 161 +++++++++++++++ 13 files changed, 900 insertions(+), 234 deletions(-) create mode 100644 pkg/api/rest/update_test.go delete mode 100644 pkg/apiserver/async.go delete mode 100644 pkg/apiserver/operation.go create mode 100644 pkg/master/publish_test.go create mode 100644 pkg/registry/registrytest/endpoint.go diff --git a/pkg/api/rest/create_test.go b/pkg/api/rest/create_test.go index bb2be35245c..88414b0dcaf 100644 --- a/pkg/api/rest/create_test.go +++ b/pkg/api/rest/create_test.go @@ -17,10 +17,12 @@ limitations under the License. package rest import ( + "reflect" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) func TestCheckGeneratedNameError(t *testing.T) { @@ -39,3 +41,75 @@ func TestCheckGeneratedNameError(t *testing.T) { t.Errorf("expected try again later error: %v", err) } } + +func TestBeforeCreate(t *testing.T) { + failures := []runtime.Object{ + &api.Service{}, + &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "#$%%invalid", + }, + }, + &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "##&*(&invalid", + Namespace: api.NamespaceDefault, + }, + }, + } + for _, test := range failures { + ctx := api.NewDefaultContext() + err := BeforeCreate(Services, ctx, test) + if err == nil { + t.Errorf("unexpected non-error for %v", test) + } + } + + obj := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{"name": "foo"}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "name": "foo", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "foo", + Image: "foo", + ImagePullPolicy: api.PullAlways, + }, + }, + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + DNSPolicy: api.DNSDefault, + }, + }, + }, + Status: api.ReplicationControllerStatus{ + Replicas: 3, + }, + } + ctx := api.NewDefaultContext() + err := BeforeCreate(ReplicationControllers, ctx, obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(obj.Status, api.ReplicationControllerStatus{}) { + t.Errorf("status was not cleared as expected.") + } + if obj.Name != "foo" || obj.Namespace != api.NamespaceDefault { + t.Errorf("unexpected object metadata: %v", obj.ObjectMeta) + } + + obj.Spec.Replicas = -1 + if err := BeforeCreate(ReplicationControllers, ctx, obj); err == nil { + t.Errorf("unexpected non-error for invalid replication controller.") + } +} diff --git a/pkg/api/rest/types.go b/pkg/api/rest/types.go index f3b6741f82d..025282b9985 100644 --- a/pkg/api/rest/types.go +++ b/pkg/api/rest/types.go @@ -81,7 +81,7 @@ type svcStrategy struct { // Services is the default logic that applies when creating and updating Service // objects. -var Services RESTCreateStrategy = svcStrategy{api.Scheme, api.SimpleNameGenerator} +var Services = svcStrategy{api.Scheme, api.SimpleNameGenerator} // NamespaceScoped is true for services. func (svcStrategy) NamespaceScoped() bool { @@ -100,6 +100,14 @@ func (svcStrategy) Validate(obj runtime.Object) errors.ValidationErrorList { return validation.ValidateService(service) } +func (svcStrategy) AllowCreateOnUpdate() bool { + return true +} + +func (svcStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList { + return validation.ValidateServiceUpdate(old.(*api.Service), obj.(*api.Service)) +} + // nodeStrategy implements behavior for nodes // TODO: move to a node specific package. type nodeStrategy struct { diff --git a/pkg/api/rest/update_test.go b/pkg/api/rest/update_test.go new file mode 100644 index 00000000000..5817a93a4e3 --- /dev/null +++ b/pkg/api/rest/update_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" +) + +func TestBeforeUpdate(t *testing.T) { + tests := []struct { + old runtime.Object + obj runtime.Object + expectErr bool + }{ + { + obj: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "#$%%invalid", + }, + }, + old: &api.Service{}, + expectErr: true, + }, + { + obj: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "valid", + }, + }, + old: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "valid", + }, + }, + expectErr: true, + }, + { + obj: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "valid", + }, + Spec: api.ServiceSpec{ + PortalIP: "1.2.3.4", + }, + }, + old: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "valid", + }, + Spec: api.ServiceSpec{ + PortalIP: "4.3.2.1", + }, + }, + expectErr: true, + }, + { + obj: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: api.ServiceSpec{ + PortalIP: "1.2.3.4", + Selector: map[string]string{"foo": "bar"}, + }, + }, + old: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: api.ServiceSpec{ + PortalIP: "1.2.3.4", + Selector: map[string]string{"bar": "foo"}, + }, + }, + }, + } + for _, test := range tests { + ctx := api.NewDefaultContext() + err := BeforeUpdate(Services, ctx, test.obj, test.old) + if test.expectErr && err == nil { + t.Errorf("unexpected non-error for %v", test) + } + if !test.expectErr && err != nil { + t.Errorf("unexpected error: %v for %v -> %v", err, test.obj, test.old) + } + } +} diff --git a/pkg/apiserver/async.go b/pkg/apiserver/async.go deleted file mode 100644 index ca1d0aa32cf..00000000000 --- a/pkg/apiserver/async.go +++ /dev/null @@ -1,47 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" -) - -// WorkFunc is used to perform any time consuming work for an api call, after -// the input has been validated. Pass one of these to MakeAsync to create an -// appropriate return value for the Update, Delete, and Create methods. -type WorkFunc func() (result runtime.Object, err error) - -// MakeAsync takes a function and executes it, delivering the result in the way required -// by RESTStorage's Update, Delete, and Create methods. -func MakeAsync(fn WorkFunc) <-chan RESTResult { - channel := make(chan RESTResult) - go func() { - defer util.HandleCrash() - obj, err := fn() - if err != nil { - channel <- RESTResult{Object: errToAPIStatus(err)} - } else { - channel <- RESTResult{Object: obj} - } - // 'close' is used to signal that no further values will - // be written to the channel. Not strictly necessary, but - // also won't hurt. - close(channel) - }() - return channel -} diff --git a/pkg/apiserver/operation.go b/pkg/apiserver/operation.go deleted file mode 100644 index f0c88de6cfb..00000000000 --- a/pkg/apiserver/operation.go +++ /dev/null @@ -1,154 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" -) - -// Operation represents an ongoing action which the server is performing. -type Operation struct { - ID string - result RESTResult - onReceive func(RESTResult) - awaiting <-chan RESTResult - finished *time.Time - lock sync.Mutex - notify chan struct{} -} - -// Operations tracks all the ongoing operations. -type Operations struct { - // Access only using functions from atomic. - lastID int64 - - // 'lock' guards the ops map. - lock sync.Mutex - ops map[string]*Operation -} - -// NewOperations returns a new Operations repository. -func NewOperations() *Operations { - ops := &Operations{ - ops: map[string]*Operation{}, - } - go util.Forever(func() { ops.expire(10 * time.Minute) }, 5*time.Minute) - return ops -} - -// NewOperation adds a new operation. It is lock-free. 'onReceive' will be called -// with the value read from 'from', when it is read. -func (ops *Operations) NewOperation(from <-chan RESTResult, onReceive func(RESTResult)) *Operation { - id := atomic.AddInt64(&ops.lastID, 1) - op := &Operation{ - ID: strconv.FormatInt(id, 10), - awaiting: from, - onReceive: onReceive, - notify: make(chan struct{}), - } - go op.wait() - go ops.insert(op) - return op -} - -// insert inserts op into the ops map. -func (ops *Operations) insert(op *Operation) { - ops.lock.Lock() - defer ops.lock.Unlock() - ops.ops[op.ID] = op -} - -// Get returns the operation with the given ID, or nil. -func (ops *Operations) Get(id string) *Operation { - ops.lock.Lock() - defer ops.lock.Unlock() - return ops.ops[id] -} - -// expire garbage collect operations that have finished longer than maxAge ago. -func (ops *Operations) expire(maxAge time.Duration) { - ops.lock.Lock() - defer ops.lock.Unlock() - keep := map[string]*Operation{} - limitTime := time.Now().Add(-maxAge) - for id, op := range ops.ops { - if !op.expired(limitTime) { - keep[id] = op - } - } - ops.ops = keep -} - -// wait waits forever for the operation to complete; call via go when -// the operation is created. Sets op.finished when the operation -// does complete, and closes the notify channel, in case there -// are any WaitFor() calls in progress. -// Does not keep op locked while waiting. -func (op *Operation) wait() { - defer util.HandleCrash() - result := <-op.awaiting - - op.lock.Lock() - defer op.lock.Unlock() - if op.onReceive != nil { - op.onReceive(result) - } - op.result = result - finished := time.Now() - op.finished = &finished - close(op.notify) -} - -// WaitFor waits for the specified duration, or until the operation finishes, -// whichever happens first. -func (op *Operation) WaitFor(timeout time.Duration) { - select { - case <-time.After(timeout): - case <-op.notify: - } -} - -// expired returns true if this operation finished before limitTime. -func (op *Operation) expired(limitTime time.Time) bool { - op.lock.Lock() - defer op.lock.Unlock() - if op.finished == nil { - return false - } - return op.finished.Before(limitTime) -} - -// StatusOrResult returns status information or the result of the operation if it is complete, -// with a bool indicating true in the latter case. -func (op *Operation) StatusOrResult() (description RESTResult, finished bool) { - op.lock.Lock() - defer op.lock.Unlock() - - if op.finished == nil { - return RESTResult{Object: &api.Status{ - Status: api.StatusFailure, - Reason: api.StatusReasonTimeout, - }}, false - } - return op.result, true -} diff --git a/pkg/auth/authorizer/abac/abac_test.go b/pkg/auth/authorizer/abac/abac_test.go index 40cfff0ebfb..88959831e43 100644 --- a/pkg/auth/authorizer/abac/abac_test.go +++ b/pkg/auth/authorizer/abac/abac_test.go @@ -266,3 +266,81 @@ func newWithContents(t *testing.T, contents string) (authorizer.Authorizer, erro pl, err := NewFromFile(f.Name()) return pl, err } + +func TestPolicy(t *testing.T) { + tests := []struct { + policy policy + attr authorizer.Attributes + matches bool + name string + }{ + { + policy: policy{}, + attr: authorizer.AttributesRecord{}, + matches: true, + name: "null", + }, + { + policy: policy{ + Readonly: true, + }, + attr: authorizer.AttributesRecord{}, + matches: false, + name: "read-only mismatch", + }, + { + policy: policy{ + User: "foo", + }, + attr: authorizer.AttributesRecord{ + User: &user.DefaultInfo{ + Name: "bar", + }, + }, + matches: false, + name: "user name mis-match", + }, + { + policy: policy{ + Resource: "foo", + }, + attr: authorizer.AttributesRecord{ + Resource: "bar", + }, + matches: false, + name: "resource mis-match", + }, + { + policy: policy{ + User: "foo", + Resource: "foo", + Namespace: "foo", + }, + attr: authorizer.AttributesRecord{ + User: &user.DefaultInfo{ + Name: "foo", + }, + Resource: "foo", + Namespace: "foo", + }, + matches: true, + name: "namespace mis-match", + }, + { + policy: policy{ + Namespace: "foo", + }, + attr: authorizer.AttributesRecord{ + Namespace: "bar", + }, + matches: false, + name: "resource mis-match", + }, + } + for _, test := range tests { + matches := test.policy.matches(test.attr) + if test.matches != matches { + t.Errorf("unexpected value for %s, expected: %s, saw: %s", test.name, test.matches, matches) + } + } +} diff --git a/pkg/kubectl/cmd/util/helpers.go b/pkg/kubectl/cmd/util/helpers.go index 6d99e6fb63d..f05c70e9872 100644 --- a/pkg/kubectl/cmd/util/helpers.go +++ b/pkg/kubectl/cmd/util/helpers.go @@ -19,6 +19,7 @@ package util import ( "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "os" @@ -59,11 +60,11 @@ func GetFlagBool(cmd *cobra.Command, flag string) bool { if f == nil { glog.Fatalf("Flag accessed but not defined for command %s: %s", cmd.Name(), flag) } - // Caseless compare. - if strings.ToLower(f.Value.String()) == "true" { - return true + result, err := strconv.ParseBool(f.Value.String()) + if err != nil { + glog.Fatalf("Invalid value for a boolean flag: %s", f.Value.String()) } - return false + return result } // Assumes the flag has a default value. @@ -89,6 +90,19 @@ func GetFlagDuration(cmd *cobra.Command, flag string) time.Duration { return v } +func ReadConfigDataFromReader(reader io.Reader, source string) ([]byte, error) { + data, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + if len(data) == 0 { + return nil, fmt.Errorf(`Read from %s but no data found`, source) + } + + return data, nil +} + // ReadConfigData reads the bytes from the specified filesytem or network // location or from stdin if location == "-". // TODO: replace with resource.Builder @@ -99,16 +113,7 @@ func ReadConfigData(location string) ([]byte, error) { if location == "-" { // Read from stdin. - data, err := ioutil.ReadAll(os.Stdin) - if err != nil { - return nil, err - } - - if len(data) == 0 { - return nil, fmt.Errorf(`Read from stdin specified ("-") but no data found`) - } - - return data, nil + return ReadConfigDataFromReader(os.Stdin, "stdin ('-')") } // Use the location as a file path or URL. @@ -127,17 +132,13 @@ func ReadConfigDataFromLocation(location string) ([]byte, error) { if resp.StatusCode != 200 { return nil, fmt.Errorf("unable to read URL, server reported %d %s", resp.StatusCode, resp.Status) } - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("unable to read URL %s: %v\n", location, err) - } - return data, nil + return ReadConfigDataFromReader(resp.Body, location) } else { - data, err := ioutil.ReadFile(location) + file, err := os.Open(location) if err != nil { return nil, fmt.Errorf("unable to read %s: %v\n", location, err) } - return data, nil + return ReadConfigDataFromReader(file, location) } } diff --git a/pkg/kubectl/cmd/util/helpers_test.go b/pkg/kubectl/cmd/util/helpers_test.go index 7b7111a03b3..78ef02ab63e 100644 --- a/pkg/kubectl/cmd/util/helpers_test.go +++ b/pkg/kubectl/cmd/util/helpers_test.go @@ -17,7 +17,11 @@ limitations under the License. package util import ( + "io/ioutil" + "net/http" + "net/http/httptest" "reflect" + "syscall" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -187,5 +191,75 @@ func TestMerge(t *testing.T) { t.Errorf("testcase[%d], unexpected non-error", i) } } - +} + +type fileHandler struct { + data []byte +} + +func (f *fileHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/error" { + res.WriteHeader(http.StatusNotFound) + return + } + res.WriteHeader(http.StatusOK) + res.Write(f.data) +} + +func TestReadConfigData(t *testing.T) { + httpData := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + server := httptest.NewServer(&fileHandler{data: httpData}) + + fileData := []byte{11, 12, 13, 14, 15, 16, 17, 18, 19} + f, err := ioutil.TempFile("", "config") + if err != nil { + t.Errorf("unexpected error setting up config file") + t.Fail() + } + defer syscall.Unlink(f.Name()) + ioutil.WriteFile(f.Name(), fileData, 0644) + // TODO: test TLS here, requires making it possible to inject the HTTP client. + + tests := []struct { + config string + data []byte + expectErr bool + }{ + { + config: server.URL, + data: httpData, + }, + { + config: server.URL + "/error", + expectErr: true, + }, + { + config: "http://some.non.existent.url", + expectErr: true, + }, + { + config: f.Name(), + data: fileData, + }, + { + config: "some-non-existent-file", + expectErr: true, + }, + { + config: "", + expectErr: true, + }, + } + for _, test := range tests { + dataOut, err := ReadConfigData(test.config) + if err != nil && !test.expectErr { + t.Errorf("unexpected err: %v for %s", err, test.config) + } + if err == nil && test.expectErr { + t.Errorf("unexpected non-error for %s", test.config) + } + if !test.expectErr && !reflect.DeepEqual(test.data, dataOut) { + t.Errorf("unexpected data: %v, expected %v", dataOut, test.data) + } + } } diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 7cc73559965..3cfb43f73f4 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -147,14 +147,13 @@ func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) } if !found { e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port}) + if len(e.Endpoints) > m.masterCount { + // We append to the end and remove from the beginning, so this should + // converge rapidly with all masters performing this operation. + e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:] + } + return m.endpointRegistry.UpdateEndpoints(ctx, e) } - if len(e.Endpoints) > m.masterCount { - // We append to the end and remove from the beginning, so this should - // converge rapidly with all masters performing this operation. - e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:] - } else if found { - // We didn't make any changes, no need to actually call update. - return nil - } - return m.endpointRegistry.UpdateEndpoints(ctx, e) + // We didn't make any changes, no need to actually call update. + return nil } diff --git a/pkg/master/publish_test.go b/pkg/master/publish_test.go new file mode 100644 index 00000000000..b9fd3b040ab --- /dev/null +++ b/pkg/master/publish_test.go @@ -0,0 +1,263 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "net" + "reflect" + "sync" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" +) + +func TestEnsureEndpointsContain(t *testing.T) { + tests := []struct { + serviceName string + ip string + port int + expectError bool + expectUpdate bool + endpoints *api.EndpointsList + expectedEndpoints []api.Endpoint + err error + masterCount int + }{ + { + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + expectError: false, + expectUpdate: true, + masterCount: 1, + }, + { + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + expectError: false, + expectUpdate: false, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + }, + Endpoints: []api.Endpoint{ + { + IP: "1.2.3.4", + Port: 8080, + }, + }, + Protocol: api.ProtocolTCP, + }, + }, + }, + masterCount: 1, + expectedEndpoints: []api.Endpoint{{"1.2.3.4", 8080}}, + }, + { + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + expectError: false, + expectUpdate: true, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Endpoints: []api.Endpoint{ + { + IP: "4.3.2.1", + Port: 8080, + }, + }, + Protocol: api.ProtocolTCP, + }, + }, + }, + masterCount: 1, + }, + { + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + expectError: false, + expectUpdate: true, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Endpoints: []api.Endpoint{ + { + IP: "4.3.2.1", + Port: 9090, + }, + }, + Protocol: api.ProtocolTCP, + }, + }, + }, + masterCount: 2, + expectedEndpoints: []api.Endpoint{{"4.3.2.1", 9090}, {"1.2.3.4", 8080}}, + }, + { + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + expectError: false, + expectUpdate: true, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Endpoints: []api.Endpoint{ + { + IP: "4.3.2.1", + Port: 9090, + }, + { + IP: "1.2.3.4", + Port: 8000, + }, + }, + Protocol: api.ProtocolTCP, + }, + }, + }, + masterCount: 2, + expectedEndpoints: []api.Endpoint{{"1.2.3.4", 8000}, {"1.2.3.4", 8080}}, + }, + } + for _, test := range tests { + master := Master{} + registry := ®istrytest.EndpointRegistry{ + Endpoints: test.endpoints, + Err: test.err, + } + master.endpointRegistry = registry + master.masterCount = test.masterCount + err := master.ensureEndpointsContain(test.serviceName, net.ParseIP(test.ip), test.port) + if test.expectError && err == nil { + t.Errorf("unexpected non-error") + } + if !test.expectError && err != nil { + t.Errorf("unexpected error: %v", err) + } + if test.expectUpdate { + if test.expectedEndpoints == nil { + test.expectedEndpoints = []api.Endpoint{{test.ip, test.port}} + } + expectedUpdate := api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: test.serviceName, + Namespace: "default", + }, + Endpoints: test.expectedEndpoints, + Protocol: "TCP", + } + if len(registry.Updates) != 1 { + t.Errorf("unexpected updates: %v", registry.Updates) + } else if !reflect.DeepEqual(expectedUpdate, registry.Updates[0]) { + t.Errorf("expected update:\n%#v\ngot:\n%#v\n", expectedUpdate, registry.Updates[0]) + } + } + if !test.expectUpdate && len(registry.Updates) > 0 { + t.Errorf("no update expected, yet saw: %v", registry.Updates) + } + } +} + +func TestEnsureEndpointsContainConverges(t *testing.T) { + master := Master{} + registry := ®istrytest.EndpointRegistry{ + Endpoints: &api.EndpointsList{ + Items: []api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Endpoints: []api.Endpoint{ + { + IP: "4.3.2.1", + Port: 9000, + }, + { + IP: "1.2.3.4", + Port: 8000, + }, + }, + Protocol: api.ProtocolTCP, + }, + }, + }, + } + master.endpointRegistry = registry + master.masterCount = 2 + // This is purposefully racy, it shouldn't matter the order that these things arrive, + // we should still converge on the right answer. + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + for i := 0; i < 10; i++ { + if err := master.ensureEndpointsContain("foo", net.ParseIP("4.3.2.1"), 9090); err != nil { + t.Errorf("unexpected error: %v", err) + t.Fail() + } + } + wg.Done() + }() + go func() { + for i := 0; i < 10; i++ { + if err := master.ensureEndpointsContain("foo", net.ParseIP("1.2.3.4"), 8080); err != nil { + t.Errorf("unexpected error: %v", err) + t.Fail() + } + } + wg.Done() + }() + wg.Wait() + + // We should see at least two updates. + if len(registry.Updates) > 2 { + t.Errorf("unexpected updates: %v", registry.Updates) + } + // Pick up the last update and validate. + endpoints := registry.Updates[len(registry.Updates)-1] + if len(endpoints.Endpoints) != 2 { + t.Errorf("unexpected update: %v", endpoints) + } + for _, endpoint := range endpoints.Endpoints { + if endpoint.IP == "4.3.2.1" && endpoint.Port != 9090 { + t.Errorf("unexpected endpoint state: %v", endpoint) + } + if endpoint.IP == "1.2.3.4" && endpoint.Port != 8080 { + t.Errorf("unexpected endpoint state: %v", endpoint) + } + } +} diff --git a/pkg/registry/registrytest/endpoint.go b/pkg/registry/registrytest/endpoint.go new file mode 100644 index 00000000000..d4d814a30d0 --- /dev/null +++ b/pkg/registry/registrytest/endpoint.go @@ -0,0 +1,92 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "fmt" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Registry is an interface for things that know how to store endpoints. +type EndpointRegistry struct { + Endpoints *api.EndpointsList + Updates []api.Endpoints + Err error + + lock sync.Mutex +} + +func (e *EndpointRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { + // TODO: support namespaces in this mock + e.lock.Lock() + defer e.lock.Unlock() + + return e.Endpoints, e.Err +} + +func (e *EndpointRegistry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) { + // TODO: support namespaces in this mock + e.lock.Lock() + defer e.lock.Unlock() + if e.Err != nil { + return nil, e.Err + } + if e.Endpoints != nil { + for _, endpoint := range e.Endpoints.Items { + if endpoint.Name == name { + return &endpoint, nil + } + } + } + return nil, errors.NewNotFound("Endpoints", name) +} + +func (e *EndpointRegistry) WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion string) (watch.Interface, error) { + return nil, fmt.Errorf("unimplemented!") +} + +func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) error { + // TODO: support namespaces in this mock + e.lock.Lock() + defer e.lock.Unlock() + + e.Updates = append(e.Updates, *endpoints) + + if e.Err != nil { + return e.Err + } + if e.Endpoints == nil { + e.Endpoints = &api.EndpointsList{ + Items: []api.Endpoints{ + *endpoints, + }, + } + return nil + } + for ix := range e.Endpoints.Items { + if e.Endpoints.Items[ix].Name == endpoints.Name { + e.Endpoints.Items[ix] = *endpoints + } + } + e.Endpoints.Items = append(e.Endpoints.Items, *endpoints) + return nil +} diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 3cecc0a10a9..924724b7f76 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -59,7 +59,13 @@ func (r *GenericRegistry) WatchPredicate(ctx api.Context, m generic.Matcher, res func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error) { r.Lock() defer r.Unlock() - return r.Object, r.Err + if r.Err != nil { + return nil, r.Err + } + if r.Object != nil { + return r.Object, nil + } + panic("generic registry should either have an object or an error for Get") } func (r *GenericRegistry) CreateWithName(ctx api.Context, id string, obj runtime.Object) error { diff --git a/pkg/registry/resourcequota/rest_test.go b/pkg/registry/resourcequota/rest_test.go index 45930005154..fb18fd6f346 100644 --- a/pkg/registry/resourcequota/rest_test.go +++ b/pkg/registry/resourcequota/rest_test.go @@ -15,3 +15,164 @@ limitations under the License. */ package resourcequota + +import ( + "fmt" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" +) + +func makeRegistry(resourceList runtime.Object) (*registrytest.GenericRegistry, *REST) { + registry := registrytest.NewGeneric(resourceList) + rest := NewREST(registry) + return registry, rest +} + +func TestGet(t *testing.T) { + registry, rest := makeRegistry(&api.ResourceQuotaList{}) + registry.Object = &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + } + ctx := api.NewDefaultContext() + obj, err := rest.Get(ctx, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if obj == nil { + t.Errorf("unexpected nil object") + } + registry.Object = nil + registry.Err = errors.NewNotFound("ResourceQuota", "bar") + + obj, err = rest.Get(ctx, "bar") + if err == nil { + t.Errorf("unexpected non-error") + } + if obj != nil { + t.Errorf("unexpected object: %v", obj) + } + +} + +func TestList(t *testing.T) { + _, rest := makeRegistry(&api.ResourceQuotaList{ + Items: []api.ResourceQuota{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + }, + }, + }) + + ctx := api.NewDefaultContext() + obj, err := rest.List(ctx, labels.Set{}.AsSelector(), labels.Set{}.AsSelector()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if obj == nil { + t.Errorf("unexpected nil object") + } + list, ok := obj.(*api.ResourceQuotaList) + if !ok || len(list.Items) != 1 { + t.Errorf("unexpected list object: %v", obj) + } + + obj, err = rest.List(ctx, labels.Set{"foo": "bar"}.AsSelector(), labels.Set{}.AsSelector()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if obj == nil { + t.Errorf("unexpected nil object") + } + list, ok = obj.(*api.ResourceQuotaList) + if !ok || len(list.Items) != 0 { + t.Errorf("unexpected list object: %v", obj) + } +} + +func TestUpdate(t *testing.T) { + registry, rest := makeRegistry(&api.ResourceQuotaList{}) + resourceStatus := api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceCPU: *resource.NewQuantity(10.0, resource.BinarySI), + }, + } + registry.Object = &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + Labels: map[string]string{ + "bar": "foo", + }, + }, + Status: resourceStatus, + } + invalidUpdates := []struct { + obj runtime.Object + err error + }{ + {&api.Pod{}, nil}, + {&api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: "$%#%"}}, nil}, + {&api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: api.NamespaceDefault, + }, + }, fmt.Errorf("test error")}, + } + for _, test := range invalidUpdates { + registry.Err = test.err + ctx := api.NewDefaultContext() + _, _, err := rest.Update(ctx, test.obj) + if err == nil { + t.Errorf("unexpected non-error for: %v", test.obj) + } + registry.Err = nil + } + + ctx := api.NewDefaultContext() + update := &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourceCPU: *resource.NewQuantity(10.0, resource.BinarySI), + }, + }, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceCPU: *resource.NewQuantity(20.0, resource.BinarySI), + }, + }, + } + obj, _, err := rest.Update(ctx, update) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if !reflect.DeepEqual(obj.(*api.ResourceQuota).Labels, update.Labels) { + t.Errorf("unexpected update object, labels don't match: %v vs %v", obj.(*api.ResourceQuota).Labels, update.Labels) + } + if !reflect.DeepEqual(obj.(*api.ResourceQuota).Spec, update.Spec) { + t.Errorf("unexpected update object, specs don't match: %v vs %v", obj.(*api.ResourceQuota).Spec, update.Spec) + } + if !reflect.DeepEqual(obj.(*api.ResourceQuota).Status, registry.Object.(*api.ResourceQuota).Status) { + t.Errorf("unexpected update object, status wasn't preserved: %v vs %v", obj.(*api.ResourceQuota).Status, registry.Object.(*api.ResourceQuota).Status) + } +}