Add simple service redirection

This commit is contained in:
Daniel Smith 2014-08-25 14:36:15 -07:00
parent dcf9f30592
commit 4de254444e
11 changed files with 229 additions and 25 deletions

View File

@ -32,20 +32,20 @@ import (
)
// Codec defines methods for serializing and deserializing API
// objects
// objects.
type Codec interface {
Encode(obj interface{}) (data []byte, err error)
Decode(data []byte) (interface{}, error)
DecodeInto(data []byte, obj interface{}) error
}
// mux is an object that can register http handlers
// mux is an object that can register http handlers.
type mux interface {
Handle(pattern string, handler http.Handler)
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
// defaultAPIServer exposes nested objects for testability
// defaultAPIServer exposes nested objects for testability.
type defaultAPIServer struct {
http.Handler
group *APIGroup
@ -95,12 +95,14 @@ func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup {
func (g *APIGroup) InstallREST(mux mux, paths ...string) {
restHandler := &g.handler
watchHandler := &WatchHandler{g.handler.storage, g.handler.codec}
redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec}
opHandler := &OperationHandler{g.handler.ops, g.handler.codec}
for _, prefix := range paths {
prefix = strings.TrimRight(prefix, "/")
mux.Handle(prefix+"/", http.StripPrefix(prefix, restHandler))
mux.Handle(prefix+"/watch/", http.StripPrefix(prefix+"/watch/", watchHandler))
mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler))
mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
}
@ -129,6 +131,7 @@ func RecoverPanics(handler http.Handler) http.Handler {
httplog.StatusIsNot(
http.StatusOK,
http.StatusAccepted,
http.StatusTemporaryRedirect,
http.StatusConflict,
http.StatusNotFound,
),

View File

@ -71,6 +71,9 @@ type SimpleRESTStorage struct {
requestedFieldSelector labels.Selector
requestedResourceVersion uint64
// The location
requestedResourceLocationID string
// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj receives the original input to the update, delete, or create call.
injectedFunction func(obj interface{}) (returnObj interface{}, err error)
@ -142,6 +145,15 @@ func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVe
return storage.fakeWatch, nil
}
// Implement Redirector.
func (storage *SimpleRESTStorage) ResourceLocation(id string) (string, error) {
storage.requestedResourceLocationID = id
if err := storage.errors["resourceLocation"]; err != nil {
return "", err
}
return id, nil
}
func extractBody(response *http.Response, object interface{}) (string, error) {
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)

View File

@ -55,3 +55,9 @@ type ResourceWatcher interface {
// particular version.
Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// Redirectors know how to return a remote resource's location.
type Redirector interface {
// ResourceLocation should return the remote location of the given resource, or an error.
ResourceLocation(id string) (remoteLocation string, err error)
}

61
pkg/apiserver/redirect.go Normal file
View File

@ -0,0 +1,61 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
)
type RedirectHandler struct {
storage map[string]RESTStorage
codec Codec
}
func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
parts := splitPath(req.URL.Path)
if len(parts) != 2 || req.Method != "GET" {
notFound(w, req)
return
}
resourceName := parts[0]
id := parts[1]
storage, ok := r.storage[resourceName]
if !ok {
httplog.LogOf(w).Addf("'%v' has no storage object", resourceName)
notFound(w, req)
return
}
redirector, ok := storage.(Redirector)
if !ok {
httplog.LogOf(w).Addf("'%v' is not a redirector", resourceName)
notFound(w, req)
return
}
location, err := redirector.ResourceLocation(id)
if err != nil {
status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w)
return
}
w.Header().Set("Location", location)
w.WriteHeader(http.StatusTemporaryRedirect)
}

View File

@ -0,0 +1,75 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"errors"
"net/http"
"net/http/httptest"
"net/url"
"testing"
)
func TestRedirect(t *testing.T) {
simpleStorage := &SimpleRESTStorage{
errors: map[string]error{},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix/version")
server := httptest.NewServer(handler)
dontFollow := errors.New("don't follow")
client := http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return dontFollow
},
}
table := []struct {
id string
err error
code int
}{
{"cozy", nil, http.StatusTemporaryRedirect},
{"horse", errors.New("no such id"), http.StatusInternalServerError},
}
for _, item := range table {
simpleStorage.errors["resourceLocation"] = item.err
resp, err := client.Get(server.URL + "/prefix/version/redirect/foo/" + item.id)
if resp == nil {
t.Fatalf("Unexpected nil resp")
}
resp.Body.Close()
if e, a := item.code, resp.StatusCode; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := item.id, simpleStorage.requestedResourceLocationID; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if item.err != nil {
continue
}
if err == nil || err.(*url.Error).Err != dontFollow {
t.Errorf("Unexpected err %#v", err)
}
if e, a := item.id, resp.Header.Get("Location"); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}

View File

@ -313,6 +313,20 @@ func (r *Registry) GetService(name string) (*api.Service, error) {
return &svc, nil
}
// GetEndpoints obtains the endpoints for the service identified by 'name'.
func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) {
key := makeServiceEndpointsKey(name)
var endpoints api.Endpoints
err := r.ExtractObj(key, &endpoints, false)
if tools.IsEtcdNotFound(err) {
return nil, apiserver.NewNotFoundErr("endpoints", name)
}
if err != nil {
return nil, err
}
return &endpoints, nil
}
func makeServiceEndpointsKey(name string) string {
return "/registry/services/endpoints/" + name
}
@ -354,23 +368,9 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
// GetEndpoints obtains endpoints specified by a service name
func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) {
obj := &api.Endpoints{}
if err := r.ExtractObj(makeServiceEndpointsKey(name), obj, false); err != nil {
if tools.IsEtcdNotFound(err) {
if _, err := r.GetService(name); err != nil && apiserver.IsNotFound(err) {
return nil, apiserver.NewNotFoundErr("service", name)
}
return obj, nil
}
return nil, err
}
return obj, nil
}
// UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
func(input interface{}) (interface{}, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters

View File

@ -805,18 +805,21 @@ func TestEtcdUpdateService(t *testing.T) {
func TestEtcdGetEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints := &api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"127.0.0.1:34855"},
}), 0)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints, err := registry.GetEndpoints("foo")
}
fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(endpoints), 0)
got, err := registry.GetEndpoints("foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if endpoints.ID != "foo" || !reflect.DeepEqual(endpoints.Endpoints, []string{"127.0.0.1:34855"}) {
t.Errorf("Unexpected endpoints: %#v", endpoints)
if e, a := endpoints, got; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected endpoints: %#v, expected %#v", e, a)
}
}

View File

@ -66,7 +66,8 @@ func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVe
return nil, r.Err
}
func (r *ServiceRegistry) GetEndpoints(name string) (*api.Endpoints, error) {
func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) {
r.GottenID = id
return &r.Endpoints, r.Err
}

View File

@ -18,6 +18,7 @@ package service
import (
"fmt"
"math/rand"
"strconv"
"strings"
@ -167,6 +168,18 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
}), nil
}
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *RegistryStorage) ResourceLocation(id string) (string, error) {
e, err := rs.registry.GetEndpoints(id)
if err != nil {
return "", err
}
if len(e.Endpoints) == 0 {
return "", fmt.Errorf("no endpoints available for %v", id)
}
return e.Endpoints[rand.Intn(len(e.Endpoints))], nil
}
func (rs *RegistryStorage) deleteExternalLoadBalancer(service *api.Service) error {
if !service.CreateExternalLoadBalancer || rs.cloud == nil {
return nil

View File

@ -275,6 +275,35 @@ func TestServiceRegistryGet(t *testing.T) {
}
}
func TestServiceRegistryResourceLocation(t *testing.T) {
registry := registrytest.NewServiceRegistry()
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines))
registry.CreateService(api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
})
redirector := storage.(apiserver.Redirector)
location, err := redirector.ResourceLocation("foo")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if e, a := "foo:80", location; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if e, a := "foo", registry.GottenID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
// Test error path
registry.Err = fmt.Errorf("fake error")
if _, err = redirector.ResourceLocation("foo"); err == nil {
t.Errorf("unexpected nil error")
}
}
func TestServiceRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}

View File

@ -72,6 +72,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
}
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
}
// TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop.
err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{
JSONBase: api.JSONBase{ID: service.ID},
Endpoints: endpoints,