Add self links to objects sent down the watch channel.

This commit is contained in:
Daniel Smith 2014-11-05 17:22:18 -08:00
parent f4cffdc7cf
commit 4196780eda
5 changed files with 85 additions and 22 deletions

View File

@ -46,7 +46,7 @@ func GetReference(obj runtime.Object) (*ObjectReference, error) {
} }
version := versionFromSelfLink.FindStringSubmatch(meta.SelfLink()) version := versionFromSelfLink.FindStringSubmatch(meta.SelfLink())
if len(version) < 2 { if len(version) < 2 {
return nil, fmt.Errorf("unexpected self link format: %v", meta.SelfLink()) return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", meta.SelfLink(), version)
} }
return &ObjectReference{ return &ObjectReference{
Kind: kind, Kind: kind,

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -89,7 +90,12 @@ func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalP
// in a slash. // in a slash.
func (g *APIGroup) InstallREST(mux Mux, paths ...string) { func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
restHandler := &g.handler restHandler := &g.handler
watchHandler := &WatchHandler{g.handler.storage, g.handler.codec} watchHandler := &WatchHandler{
storage: g.handler.storage,
codec: g.handler.codec,
canonicalPrefix: g.handler.canonicalPrefix,
selfLinker: g.handler.selfLinker,
}
redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec} redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec}
opHandler := &OperationHandler{g.handler.ops, g.handler.codec} opHandler := &OperationHandler{g.handler.ops, g.handler.codec}

View File

@ -33,6 +33,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -44,17 +45,17 @@ func convert(obj runtime.Object) (runtime.Object, error) {
return obj, nil return obj, nil
} }
var codec = latest.Codec var codec = testapi.Codec()
var selfLinker = latest.SelfLinker var selfLinker = latest.SelfLinker
func init() { func init() {
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}) api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{})
api.Scheme.AddKnownTypes(latest.Version, &Simple{}, &SimpleList{}) api.Scheme.AddKnownTypes(testapi.Version(), &Simple{}, &SimpleList{})
} }
type Simple struct { type Simple struct {
api.TypeMeta `yaml:",inline" json:",inline"` api.TypeMeta `yaml:",inline" json:",inline"`
api.ObjectMeta `yaml:"metadata,inline" json:"metadata,inline"` api.ObjectMeta `yaml:"metadata" json:"metadata"`
Other string `yaml:"other,omitempty" json:"other,omitempty"` Other string `yaml:"other,omitempty" json:"other,omitempty"`
} }
@ -68,6 +69,21 @@ type SimpleList struct {
func (*SimpleList) IsAnAPIObject() {} func (*SimpleList) IsAnAPIObject() {}
func TestSimpleSetupRight(t *testing.T) {
s := &Simple{ObjectMeta: api.ObjectMeta{Name: "aName"}}
wire, err := codec.Encode(s)
if err != nil {
t.Fatal(err)
}
s2, err := codec.Decode(wire)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(s, s2) {
t.Fatalf("encode/decode broken:\n%#v\n%#v\n", s, s2)
}
}
type SimpleRESTStorage struct { type SimpleRESTStorage struct {
errors map[string]error errors map[string]error
list []Simple list []Simple

View File

@ -19,21 +19,39 @@ package apiserver
import ( import (
"net/http" "net/http"
"net/url" "net/url"
"path"
"regexp" "regexp"
"strings" "strings"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
"code.google.com/p/go.net/websocket"
"github.com/golang/glog"
) )
type WatchHandler struct { type WatchHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
codec runtime.Codec codec runtime.Codec
canonicalPrefix string
selfLinker runtime.SelfLinker
}
// setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type.
func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error {
name, err := h.selfLinker.Name(obj)
if err != nil {
return err
}
newURL := *req.URL
newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name)
newURL.RawQuery = ""
newURL.Fragment = ""
return h.selfLinker.SetSelfLink(obj, newURL.String())
} }
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) { func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) {
@ -84,7 +102,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// TODO: This is one watch per connection. We want to multiplex, so that // TODO: This is one watch per connection. We want to multiplex, so that
// multiple watches of the same thing don't create two watches downstream. // multiple watches of the same thing don't create two watches downstream.
watchServer := &WatchServer{watching, h.codec} watchServer := &WatchServer{watching, h.codec, func(obj runtime.Object) {
if err := h.setSelfLinkAddName(obj, req); err != nil {
glog.Errorf("Failed to set self link for object %#v", obj)
}
}}
if isWebsocketRequest(req) { if isWebsocketRequest(req) {
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req)
} else { } else {
@ -100,6 +122,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
type WatchServer struct { type WatchServer struct {
watching watch.Interface watching watch.Interface
codec runtime.Codec codec runtime.Codec
fixup func(runtime.Object)
} }
// HandleWS implements a websocket handler. // HandleWS implements a websocket handler.
@ -122,6 +145,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) {
// End of results. // End of results.
return return
} }
w.fixup(event.Object)
obj, err := watchjson.Object(w.codec, &event) obj, err := watchjson.Object(w.codec, &event)
if err != nil { if err != nil {
// Client disconnect. // Client disconnect.
@ -171,6 +195,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// End of results. // End of results.
return return
} }
self.fixup(event.Object)
if err := encoder.Encode(&event); err != nil { if err := encoder.Encode(&event); err != nil {
// Client disconnect. // Client disconnect.
self.watching.Stop() self.watching.Stop()

View File

@ -40,9 +40,9 @@ var watchTestTable = []struct {
t watch.EventType t watch.EventType
obj runtime.Object obj runtime.Object
}{ }{
{watch.Added, &Simple{Other: "A Name"}}, {watch.Added, &Simple{ObjectMeta: api.ObjectMeta{Name: "foo"}}},
{watch.Modified, &Simple{Other: "Another Name"}}, {watch.Modified, &Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}},
{watch.Deleted, &Simple{Other: "Another Name"}}, {watch.Deleted, &Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}},
} }
func TestWatchWebsocket(t *testing.T) { func TestWatchWebsocket(t *testing.T) {
@ -50,13 +50,13 @@ func TestWatchWebsocket(t *testing.T) {
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
handler := Handle(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version", selfLinker) }, codec, "/api/version", selfLinker)
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
defer server.Close() defer server.Close()
dest, _ := url.Parse(server.URL) dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it. dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo" dest.Path = "/api/version/watch/foo"
dest.RawQuery = "" dest.RawQuery = ""
ws, err := websocket.Dial(dest.String(), "", "http://localhost") ws, err := websocket.Dial(dest.String(), "", "http://localhost")
@ -76,7 +76,14 @@ func TestWatchWebsocket(t *testing.T) {
if got.Type != action { if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type) t.Errorf("Unexpected type: %v", got.Type)
} }
if e, a := runtime.EncodeOrDie(codec, object), string(got.Object); !reflect.DeepEqual(e, a) { gotObj, err := codec.Decode(got.Object)
if err != nil {
t.Fatalf("Decode error: %v", err)
}
if _, err := api.GetReference(gotObj); err != nil {
t.Errorf("Unable to construct reference: %v", err)
}
if e, a := object, gotObj; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a) t.Errorf("Expected %#v, got %#v", e, a)
} }
} }
@ -97,13 +104,13 @@ func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := Handle(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version", selfLinker) }, codec, "/api/version", selfLinker)
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
defer server.Close() defer server.Close()
client := http.Client{} client := http.Client{}
dest, _ := url.Parse(server.URL) dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo" dest.Path = "/api/version/watch/foo"
dest.RawQuery = "" dest.RawQuery = ""
request, err := http.NewRequest("GET", dest.String(), nil) request, err := http.NewRequest("GET", dest.String(), nil)
@ -134,7 +141,16 @@ func TestWatchHTTP(t *testing.T) {
if got.Type != item.t { if got.Type != item.t {
t.Errorf("%d: Unexpected type: %v", i, got.Type) t.Errorf("%d: Unexpected type: %v", i, got.Type)
} }
if e, a := runtime.EncodeOrDie(codec, item.obj), string(got.Object); !reflect.DeepEqual(e, a) { t.Logf("obj: %v", string(got.Object))
gotObj, err := codec.Decode(got.Object)
if err != nil {
t.Fatalf("Decode error: %v", err)
}
t.Logf("obj: %#v", gotObj)
if _, err := api.GetReference(gotObj); err != nil {
t.Errorf("Unable to construct reference: %v", err)
}
if e, a := item.obj, gotObj; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a) t.Errorf("Expected %#v, got %#v", e, a)
} }
} }
@ -151,12 +167,12 @@ func TestWatchParamParsing(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := Handle(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version", selfLinker) }, codec, "/api/version", selfLinker)
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
defer server.Close() defer server.Close()
dest, _ := url.Parse(server.URL) dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo" dest.Path = "/api/version/watch/foo"
table := []struct { table := []struct {
rawQuery string rawQuery string
@ -223,14 +239,14 @@ func TestWatchProtocolSelection(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := Handle(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version", selfLinker) }, codec, "/api/version", selfLinker)
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
defer server.Close() defer server.Close()
defer server.CloseClientConnections() defer server.CloseClientConnections()
client := http.Client{} client := http.Client{}
dest, _ := url.Parse(server.URL) dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo" dest.Path = "/api/version/watch/foo"
dest.RawQuery = "" dest.RawQuery = ""
table := []struct { table := []struct {