From 4de254444e710250864abe35983815217840fe36 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 25 Aug 2014 14:36:15 -0700 Subject: [PATCH] Add simple service redirection --- pkg/apiserver/apiserver.go | 9 ++-- pkg/apiserver/apiserver_test.go | 12 +++++ pkg/apiserver/interfaces.go | 6 +++ pkg/apiserver/redirect.go | 61 ++++++++++++++++++++++ pkg/apiserver/redirect_test.go | 75 ++++++++++++++++++++++++++++ pkg/registry/etcd/etcd.go | 30 +++++------ pkg/registry/etcd/etcd_test.go | 15 +++--- pkg/registry/registrytest/service.go | 3 +- pkg/registry/service/storage.go | 13 +++++ pkg/registry/service/storage_test.go | 29 +++++++++++ pkg/service/endpoints_controller.go | 1 + 11 files changed, 229 insertions(+), 25 deletions(-) create mode 100644 pkg/apiserver/redirect.go create mode 100644 pkg/apiserver/redirect_test.go diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 89a859f5501..e6b3c213ff1 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -32,20 +32,20 @@ import ( ) // Codec defines methods for serializing and deserializing API -// objects +// objects. type Codec interface { Encode(obj interface{}) (data []byte, err error) Decode(data []byte) (interface{}, error) DecodeInto(data []byte, obj interface{}) error } -// mux is an object that can register http handlers +// mux is an object that can register http handlers. type mux interface { Handle(pattern string, handler http.Handler) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) } -// defaultAPIServer exposes nested objects for testability +// defaultAPIServer exposes nested objects for testability. type defaultAPIServer struct { http.Handler group *APIGroup @@ -95,12 +95,14 @@ func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup { func (g *APIGroup) InstallREST(mux mux, paths ...string) { restHandler := &g.handler watchHandler := &WatchHandler{g.handler.storage, g.handler.codec} + redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec} opHandler := &OperationHandler{g.handler.ops, g.handler.codec} for _, prefix := range paths { prefix = strings.TrimRight(prefix, "/") mux.Handle(prefix+"/", http.StripPrefix(prefix, restHandler)) mux.Handle(prefix+"/watch/", http.StripPrefix(prefix+"/watch/", watchHandler)) + mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler)) mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler)) mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler)) } @@ -129,6 +131,7 @@ func RecoverPanics(handler http.Handler) http.Handler { httplog.StatusIsNot( http.StatusOK, http.StatusAccepted, + http.StatusTemporaryRedirect, http.StatusConflict, http.StatusNotFound, ), diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 6f1ca93ffe0..f51b7dafe41 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -71,6 +71,9 @@ type SimpleRESTStorage struct { requestedFieldSelector labels.Selector requestedResourceVersion uint64 + // The location + requestedResourceLocationID string + // If non-nil, called inside the WorkFunc when answering update, delete, create. // obj receives the original input to the update, delete, or create call. injectedFunction func(obj interface{}) (returnObj interface{}, err error) @@ -142,6 +145,15 @@ func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVe return storage.fakeWatch, nil } +// Implement Redirector. +func (storage *SimpleRESTStorage) ResourceLocation(id string) (string, error) { + storage.requestedResourceLocationID = id + if err := storage.errors["resourceLocation"]; err != nil { + return "", err + } + return id, nil +} + func extractBody(response *http.Response, object interface{}) (string, error) { defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 2b1aa23cab6..8929c0991cf 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -55,3 +55,9 @@ type ResourceWatcher interface { // particular version. Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } + +// Redirectors know how to return a remote resource's location. +type Redirector interface { + // ResourceLocation should return the remote location of the given resource, or an error. + ResourceLocation(id string) (remoteLocation string, err error) +} diff --git a/pkg/apiserver/redirect.go b/pkg/apiserver/redirect.go new file mode 100644 index 00000000000..be0a9136327 --- /dev/null +++ b/pkg/apiserver/redirect.go @@ -0,0 +1,61 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "net/http" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" +) + +type RedirectHandler struct { + storage map[string]RESTStorage + codec Codec +} + +func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + parts := splitPath(req.URL.Path) + if len(parts) != 2 || req.Method != "GET" { + notFound(w, req) + return + } + resourceName := parts[0] + id := parts[1] + storage, ok := r.storage[resourceName] + if !ok { + httplog.LogOf(w).Addf("'%v' has no storage object", resourceName) + notFound(w, req) + return + } + + redirector, ok := storage.(Redirector) + if !ok { + httplog.LogOf(w).Addf("'%v' is not a redirector", resourceName) + notFound(w, req) + return + } + + location, err := redirector.ResourceLocation(id) + if err != nil { + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + return + } + + w.Header().Set("Location", location) + w.WriteHeader(http.StatusTemporaryRedirect) +} diff --git a/pkg/apiserver/redirect_test.go b/pkg/apiserver/redirect_test.go new file mode 100644 index 00000000000..aa780e6e60c --- /dev/null +++ b/pkg/apiserver/redirect_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "errors" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestRedirect(t *testing.T) { + simpleStorage := &SimpleRESTStorage{ + errors: map[string]error{}, + } + handler := Handle(map[string]RESTStorage{ + "foo": simpleStorage, + }, codec, "/prefix/version") + server := httptest.NewServer(handler) + + dontFollow := errors.New("don't follow") + client := http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return dontFollow + }, + } + + table := []struct { + id string + err error + code int + }{ + {"cozy", nil, http.StatusTemporaryRedirect}, + {"horse", errors.New("no such id"), http.StatusInternalServerError}, + } + + for _, item := range table { + simpleStorage.errors["resourceLocation"] = item.err + resp, err := client.Get(server.URL + "/prefix/version/redirect/foo/" + item.id) + if resp == nil { + t.Fatalf("Unexpected nil resp") + } + resp.Body.Close() + if e, a := item.code, resp.StatusCode; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := item.id, simpleStorage.requestedResourceLocationID; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if item.err != nil { + continue + } + if err == nil || err.(*url.Error).Err != dontFollow { + t.Errorf("Unexpected err %#v", err) + } + if e, a := item.id, resp.Header.Get("Location"); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 0c08c3b586b..88b49892184 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -313,6 +313,20 @@ func (r *Registry) GetService(name string) (*api.Service, error) { return &svc, nil } +// GetEndpoints obtains the endpoints for the service identified by 'name'. +func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) { + key := makeServiceEndpointsKey(name) + var endpoints api.Endpoints + err := r.ExtractObj(key, &endpoints, false) + if tools.IsEtcdNotFound(err) { + return nil, apiserver.NewNotFoundErr("endpoints", name) + } + if err != nil { + return nil, err + } + return &endpoints, nil +} + func makeServiceEndpointsKey(name string) string { return "/registry/services/endpoints/" + name } @@ -354,23 +368,9 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } -// GetEndpoints obtains endpoints specified by a service name -func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) { - obj := &api.Endpoints{} - if err := r.ExtractObj(makeServiceEndpointsKey(name), obj, false); err != nil { - if tools.IsEtcdNotFound(err) { - if _, err := r.GetService(name); err != nil && apiserver.IsNotFound(err) { - return nil, apiserver.NewNotFoundErr("service", name) - } - return obj, nil - } - return nil, err - } - return obj, nil -} - // UpdateEndpoints update Endpoints of a Service. func (r *Registry) UpdateEndpoints(e api.Endpoints) error { + // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, func(input interface{}) (interface{}, error) { // TODO: racy - label query is returning different results for two simultaneous updaters diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index e7d574962a1..eeda0a5b7dc 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -805,18 +805,21 @@ func TestEtcdUpdateService(t *testing.T) { func TestEtcdGetEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{ + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + endpoints := &api.Endpoints{ JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:34855"}, - }), 0) - registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - endpoints, err := registry.GetEndpoints("foo") + } + + fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(endpoints), 0) + + got, err := registry.GetEndpoints("foo") if err != nil { t.Errorf("unexpected error: %v", err) } - if endpoints.ID != "foo" || !reflect.DeepEqual(endpoints.Endpoints, []string{"127.0.0.1:34855"}) { - t.Errorf("Unexpected endpoints: %#v", endpoints) + if e, a := endpoints, got; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected endpoints: %#v, expected %#v", e, a) } } diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 33535077285..15324c50dc8 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -66,7 +66,8 @@ func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVe return nil, r.Err } -func (r *ServiceRegistry) GetEndpoints(name string) (*api.Endpoints, error) { +func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) { + r.GottenID = id return &r.Endpoints, r.Err } diff --git a/pkg/registry/service/storage.go b/pkg/registry/service/storage.go index 494e0abc2eb..8ad2623137c 100644 --- a/pkg/registry/service/storage.go +++ b/pkg/registry/service/storage.go @@ -18,6 +18,7 @@ package service import ( "fmt" + "math/rand" "strconv" "strings" @@ -167,6 +168,18 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { }), nil } +// ResourceLocation returns a URL to which one can send traffic for the specified service. +func (rs *RegistryStorage) ResourceLocation(id string) (string, error) { + e, err := rs.registry.GetEndpoints(id) + if err != nil { + return "", err + } + if len(e.Endpoints) == 0 { + return "", fmt.Errorf("no endpoints available for %v", id) + } + return e.Endpoints[rand.Intn(len(e.Endpoints))], nil +} + func (rs *RegistryStorage) deleteExternalLoadBalancer(service *api.Service) error { if !service.CreateExternalLoadBalancer || rs.cloud == nil { return nil diff --git a/pkg/registry/service/storage_test.go b/pkg/registry/service/storage_test.go index 37243465d3e..f561eed9531 100644 --- a/pkg/registry/service/storage_test.go +++ b/pkg/registry/service/storage_test.go @@ -275,6 +275,35 @@ func TestServiceRegistryGet(t *testing.T) { } } +func TestServiceRegistryResourceLocation(t *testing.T) { + registry := registrytest.NewServiceRegistry() + registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) + registry.CreateService(api.Service{ + JSONBase: api.JSONBase{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + }) + redirector := storage.(apiserver.Redirector) + location, err := redirector.ResourceLocation("foo") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if e, a := "foo:80", location; e != a { + t.Errorf("Expected %v, but got %v", e, a) + } + if e, a := "foo", registry.GottenID; e != a { + t.Errorf("Expected %v, but got %v", e, a) + } + + // Test error path + registry.Err = fmt.Errorf("fake error") + if _, err = redirector.ResourceLocation("foo"); err == nil { + t.Errorf("unexpected nil error") + } +} + func TestServiceRegistryList(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 5c0924555b0..eee24540421 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -72,6 +72,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { } endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) } + // TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop. err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{ JSONBase: api.JSONBase{ID: service.ID}, Endpoints: endpoints,