diff --git a/pkg/api/types.go b/pkg/api/types.go index 29dc5fc5a9c..ce4921cc291 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -18,6 +18,7 @@ package api import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/fsouza/go-dockerclient" ) @@ -338,3 +339,12 @@ type ServerOpList struct { JSONBase `yaml:",inline" json:",inline"` Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"` } + +// WatchEvent objects are streamed from the api server in response to a watch request. +type WatchEvent struct { + // The type of the watch event; added, modified, or deleted. + Type watch.EventType + + // An object which can be decoded via api.Decode + EmbeddedObject []byte +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 4fdef4bc297..11c651acff7 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -25,12 +26,14 @@ import ( "strings" "time" + "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -72,6 +75,13 @@ type RESTStorage interface { Update(interface{}) (<-chan interface{}, error) } +// ResourceWatcher should be implemented by all RESTStorage objects that +// want to offer the ability to watch for changes through the watch api. +type ResourceWatcher interface { + WatchAll() (watch.Interface, error) + WatchSingle(id string) (watch.Interface, error) +} + // WorkFunc is used to perform any time consuming work for an api call, after // the input has been validated. Pass one of these to MakeAsync to create an // appropriate return value for the Update, Delete, and Create methods. @@ -136,13 +146,24 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer { s.mux.HandleFunc("/index.html", s.handleIndex) // Handle both operations and operations/* with the same handler - opPrefix := path.Join(s.prefix, "operations") - s.mux.HandleFunc(opPrefix, s.handleOperationRequest) - s.mux.HandleFunc(opPrefix+"/", s.handleOperationRequest) + s.mux.HandleFunc(s.operationPrefix(), s.handleOperationRequest) + s.mux.HandleFunc(s.operationPrefix()+"/", s.handleOperationRequest) + + s.mux.HandleFunc(s.watchPrefix()+"/", s.handleWatch) + + s.mux.HandleFunc("/", s.notFound) return s } +func (s *APIServer) operationPrefix() string { + return path.Join(s.prefix, "operations") +} + +func (s *APIServer) watchPrefix() string { + return path.Join(s.prefix, "watch") +} + func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) // TODO: serve this out of a file? @@ -175,25 +196,25 @@ func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // ServeREST handles requests to all our RESTStorage objects. func (server *APIServer) ServeREST(w http.ResponseWriter, req *http.Request) { if !strings.HasPrefix(req.URL.Path, server.prefix) { - server.notFound(req, w) + server.notFound(w, req) return } requestParts := strings.Split(req.URL.Path[len(server.prefix):], "/")[1:] if len(requestParts) < 1 { - server.notFound(req, w) + server.notFound(w, req) return } storage := server.storage[requestParts[0]] if storage == nil { httplog.LogOf(w).Addf("'%v' has no storage object", requestParts[0]) - server.notFound(req, w) + server.notFound(w, req) return } server.handleREST(requestParts, req, w, storage) } -func (server *APIServer) notFound(req *http.Request, w http.ResponseWriter) { +func (server *APIServer) notFound(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusNotFound) fmt.Fprintf(w, "Not Found: %#v", req) } @@ -290,7 +311,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re case 2: item, err := storage.Get(parts[1]) if IsNotFound(err) { - server.notFound(req, w) + server.notFound(w, req) return } if err != nil { @@ -299,11 +320,11 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re } server.write(http.StatusOK, item, w) default: - server.notFound(req, w) + server.notFound(w, req) } case "POST": if len(parts) != 1 { - server.notFound(req, w) + server.notFound(w, req) return } body, err := server.readBody(req) @@ -313,7 +334,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re } obj, err := storage.Extract(body) if IsNotFound(err) { - server.notFound(req, w) + server.notFound(w, req) return } if err != nil { @@ -322,7 +343,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re } out, err := storage.Create(obj) if IsNotFound(err) { - server.notFound(req, w) + server.notFound(w, req) return } if err != nil { @@ -332,12 +353,12 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re server.finishReq(out, sync, timeout, w) case "DELETE": if len(parts) != 2 { - server.notFound(req, w) + server.notFound(w, req) return } out, err := storage.Delete(parts[1]) if IsNotFound(err) { - server.notFound(req, w) + server.notFound(w, req) return } if err != nil { @@ -347,7 +368,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re server.finishReq(out, sync, timeout, w) case "PUT": if len(parts) != 2 { - server.notFound(req, w) + server.notFound(w, req) return } body, err := server.readBody(req) @@ -357,7 +378,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re } obj, err := storage.Extract(body) if IsNotFound(err) { - server.notFound(req, w) + server.notFound(w, req) return } if err != nil { @@ -366,7 +387,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re } out, err := storage.Update(obj) if IsNotFound(err) { - server.notFound(req, w) + server.notFound(w, req) return } if err != nil { @@ -375,24 +396,24 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re } server.finishReq(out, sync, timeout, w) default: - server.notFound(req, w) + server.notFound(w, req) } } func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http.Request) { - opPrefix := path.Join(server.prefix, "operations") + opPrefix := server.operationPrefix() if !strings.HasPrefix(req.URL.Path, opPrefix) { - server.notFound(req, w) + server.notFound(w, req) return } trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/") parts := strings.Split(trimmed, "/") if len(parts) > 1 { - server.notFound(req, w) + server.notFound(w, req) return } if req.Method != "GET" { - server.notFound(req, w) + server.notFound(w, req) return } if len(parts) == 0 { @@ -404,7 +425,7 @@ func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http op := server.ops.Get(parts[0]) if op == nil { - server.notFound(req, w) + server.notFound(w, req) return } @@ -415,3 +436,140 @@ func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http server.write(http.StatusAccepted, obj, w) } } + +func (server *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) { + prefix := server.watchPrefix() + if !strings.HasPrefix(req.URL.Path, prefix) { + server.notFound(w, req) + return + } + parts := strings.Split(req.URL.Path[len(prefix):], "/")[1:] + if req.Method != "GET" || len(parts) < 1 { + server.notFound(w, req) + } + storage := server.storage[parts[0]] + if storage == nil { + server.notFound(w, req) + } + if watcher, ok := storage.(ResourceWatcher); ok { + var watching watch.Interface + var err error + if id := req.URL.Query().Get("id"); id != "" { + watching, err = watcher.WatchSingle(id) + } else { + watching, err = watcher.WatchAll() + } + if err != nil { + server.error(err, w) + return + } + + // TODO: This is one watch per connection. We want to multiplex, so that + // multiple watches of the same thing don't create two watches downstream. + watchServer := &WatchServer{watching} + if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" { + websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) + } else { + watchServer.ServeHTTP(w, req) + } + return + } + + server.notFound(w, req) +} + +// WatchServer serves a watch.Interface over a websocket or vanilla HTTP. +type WatchServer struct { + watching watch.Interface +} + +// HandleWS implements a websocket handler. +func (w *WatchServer) HandleWS(ws *websocket.Conn) { + done := make(chan struct{}) + go func() { + var unused interface{} + // Expect this to block until the connection is closed. Client should not + // send anything. + websocket.JSON.Receive(ws, &unused) + close(done) + }() + for { + select { + case <-done: + w.watching.Stop() + return + case event, ok := <-w.watching.ResultChan(): + if !ok { + // End of results. + return + } + wireFormat, err := api.Encode(event.Object) + if err != nil { + glog.Errorf("error encoding %#v: %v", event.Object, err) + return + } + err = websocket.JSON.Send(ws, &api.WatchEvent{ + Type: event.Type, + EmbeddedObject: wireFormat, + }) + if err != nil { + // Client disconnect. + w.watching.Stop() + return + } + } + } +} + +// ServeHTTP serves a series of JSON encoded events via straight HTTP with +// Transfer-Encoding: chunked. +func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + loggedW := httplog.LogOf(w) + w = httplog.Unlogged(w) + + cn, ok := w.(http.CloseNotifier) + if !ok { + loggedW.Addf("unable to get CloseNotifier") + http.NotFound(loggedW, req) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + loggedW.Addf("unable to get Flusher") + http.NotFound(loggedW, req) + return + } + + loggedW.Header().Set("Transfer-Encoding", "chunked") + loggedW.WriteHeader(http.StatusOK) + flusher.Flush() + + encoder := json.NewEncoder(w) + for { + select { + case <-cn.CloseNotify(): + self.watching.Stop() + return + case event, ok := <-self.watching.ResultChan(): + if !ok { + // End of results. + return + } + wireFormat, err := api.Encode(event.Object) + if err != nil { + glog.Errorf("error encoding %#v: %v", event.Object, err) + return + } + err = encoder.Encode(&api.WatchEvent{ + Type: event.Type, + EmbeddedObject: wireFormat, + }) + if err != nil { + // Client disconnect. + self.watching.Stop() + return + } + flusher.Flush() + } + } +} diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 01e50acdf1e..b4402cdfa93 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -18,17 +18,21 @@ package apiserver import ( "bytes" + "encoding/json" "fmt" "io/ioutil" "net/http" "net/http/httptest" + "net/url" "reflect" "sync" "testing" "time" + "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func init() { @@ -60,6 +64,12 @@ type SimpleRESTStorage struct { updated Simple created Simple + // Valid if WatchAll or WatchSingle is called + fakeWatch *watch.FakeWatcher + + // Set if WatchSingle is called + requestedID string + // If non-nil, called inside the WorkFunc when answering update, delete, create. // obj recieves the original input to the update, delete, or create call. injectedFunction func(obj interface{}) (returnObj interface{}, err error) @@ -78,8 +88,8 @@ func (storage *SimpleRESTStorage) Get(id string) (interface{}, error) { func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) { storage.deleted = id - if storage.errors["delete"] != nil { - return nil, storage.errors["delete"] + if err := storage.errors["delete"]; err != nil { + return nil, err } return MakeAsync(func() (interface{}, error) { if storage.injectedFunction != nil { @@ -97,8 +107,8 @@ func (storage *SimpleRESTStorage) Extract(body []byte) (interface{}, error) { func (storage *SimpleRESTStorage) Create(obj interface{}) (<-chan interface{}, error) { storage.created = obj.(Simple) - if storage.errors["create"] != nil { - return nil, storage.errors["create"] + if err := storage.errors["create"]; err != nil { + return nil, err } return MakeAsync(func() (interface{}, error) { if storage.injectedFunction != nil { @@ -110,8 +120,8 @@ func (storage *SimpleRESTStorage) Create(obj interface{}) (<-chan interface{}, e func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, error) { storage.updated = obj.(Simple) - if storage.errors["update"] != nil { - return nil, storage.errors["update"] + if err := storage.errors["update"]; err != nil { + return nil, err } return MakeAsync(func() (interface{}, error) { if storage.injectedFunction != nil { @@ -121,6 +131,25 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e }), nil } +// Implement ResourceWatcher. +func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) { + if err := storage.errors["watchAll"]; err != nil { + return nil, err + } + storage.fakeWatch = watch.NewFake() + return storage.fakeWatch, nil +} + +// Implement ResourceWatcher. +func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) { + storage.requestedID = id + if err := storage.errors["watchSingle"]; err != nil { + return nil, err + } + storage.fakeWatch = watch.NewFake() + return storage.fakeWatch, nil +} + func extractBody(response *http.Response, object interface{}) (string, error) { defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) @@ -525,3 +554,123 @@ func TestOpGet(t *testing.T) { t.Errorf("Unexpected response %#v", response) } } + +var watchTestTable = []struct { + t watch.EventType + obj interface{} +}{ + {watch.Added, &Simple{Name: "A Name"}}, + {watch.Modified, &Simple{Name: "Another Name"}}, + {watch.Deleted, &Simple{Name: "Another Name"}}, +} + +func TestWatchWebsocket(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + + dest, _ := url.Parse(server.URL) + dest.Scheme = "ws" // Required by websocket, though the server never sees it. + dest.Path = "/prefix/version/watch/foo" + dest.RawQuery = "id=myID" + + ws, err := websocket.Dial(dest.String(), "", "http://localhost") + expectNoError(t, err) + + if a, e := simpleStorage.requestedID, "myID"; a != e { + t.Fatalf("Expected %v, got %v", e, a) + } + + try := func(action watch.EventType, object interface{}) { + // Send + simpleStorage.fakeWatch.Action(action, object) + // Test receive + var got api.WatchEvent + err := websocket.JSON.Receive(ws, &got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != action { + t.Errorf("Unexpected type: %v", got.Type) + } + apiObj, err := api.Decode(got.EmbeddedObject) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(object, apiObj) { + t.Errorf("Expected %v, got %v", object, apiObj) + } + } + + for _, item := range watchTestTable { + try(item.t, item.obj) + } + simpleStorage.fakeWatch.Stop() + + var got api.WatchEvent + err = websocket.JSON.Receive(ws, &got) + if err == nil { + t.Errorf("Unexpected non-error") + } +} + +func TestWatchHTTP(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + client := http.Client{} + + dest, _ := url.Parse(server.URL) + dest.Path = "/prefix/version/watch/foo" + dest.RawQuery = "id=myID" + + request, err := http.NewRequest("GET", dest.String(), nil) + expectNoError(t, err) + response, err := client.Do(request) + expectNoError(t, err) + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected response %#v", response) + } + + if a, e := simpleStorage.requestedID, "myID"; a != e { + t.Fatalf("Expected %v, got %v", e, a) + } + + decoder := json.NewDecoder(response.Body) + + try := func(action watch.EventType, object interface{}) { + // Send + simpleStorage.fakeWatch.Action(action, object) + // Test receive + var got api.WatchEvent + err := decoder.Decode(&got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != action { + t.Errorf("Unexpected type: %v", got.Type) + } + apiObj, err := api.Decode(got.EmbeddedObject) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(object, apiObj) { + t.Errorf("Expected %v, got %v", object, apiObj) + } + } + + for _, item := range watchTestTable { + try(item.t, item.obj) + } + simpleStorage.fakeWatch.Stop() + + var got api.WatchEvent + err = decoder.Decode(&got) + if err == nil { + t.Errorf("Unexpected non-error") + } +} diff --git a/pkg/httplog/log.go b/pkg/httplog/log.go index c65af57a2b6..ea8364a3a2d 100644 --- a/pkg/httplog/log.go +++ b/pkg/httplog/log.go @@ -96,6 +96,14 @@ func LogOf(w http.ResponseWriter) *respLogger { return nil } +// Unlogged returns the original ResponseWriter, or w if it is not our inserted logger. +func Unlogged(w http.ResponseWriter) http.ResponseWriter { + if rl, ok := w.(*respLogger); ok { + return rl.w + } + return w +} + // Sets the stacktrace logging predicate, which decides when to log a stacktrace. // There's a default, so you don't need to call this unless you don't like the default. func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger { diff --git a/pkg/watch/doc.go b/pkg/watch/doc.go new file mode 100644 index 00000000000..0c695f98e0c --- /dev/null +++ b/pkg/watch/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package watch contains a generic watchable interface, and a fake for +// testing code that uses the watch interface. +package watch diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go new file mode 100644 index 00000000000..6ba3e26a0d5 --- /dev/null +++ b/pkg/watch/watch.go @@ -0,0 +1,96 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" +) + +// Interface can be implemented by anything that knows how to watch and report changes. +type Interface interface { + // Stops watching. Will close the channel returned by ResultChan(). Releases + // any resources used by the watch. + Stop() + + // Returns a chan which will receive all the events. If an error occurs + // or Stop() is called, this channel will be closed, in which case the + // watch should be completely cleaned up. + ResultChan() <-chan *Event +} + +// EventType defines the possible types of events. +type EventType string + +const ( + Added EventType = "ADDED" + Modified EventType = "MODIFIED" + Deleted EventType = "DELETED" +) + +// Event represents a single event to a watched resource. +type Event struct { + Type EventType + + // If Type == Deleted, then this is the state of the object + // immediately before deletion. + Object interface{} +} + +// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. +type FakeWatcher struct { + result chan *Event + Stopped bool + sync.Mutex +} + +func NewFake() *FakeWatcher { + return &FakeWatcher{ + result: make(chan *Event), + } +} + +// Stop implements Interface.Stop(). +func (f *FakeWatcher) Stop() { + f.Lock() + defer f.Unlock() + close(f.result) + f.Stopped = true +} + +func (f *FakeWatcher) ResultChan() <-chan *Event { + return f.result +} + +// Add sends an add event. +func (f *FakeWatcher) Add(obj interface{}) { + f.result <- &Event{Added, obj} +} + +// Modify sends a modify event. +func (f *FakeWatcher) Modify(obj interface{}) { + f.result <- &Event{Modified, obj} +} + +// Delete sends a delete event. +func (f *FakeWatcher) Delete(lastValue interface{}) { + f.result <- &Event{Deleted, lastValue} +} + +// Action sends an event of the requested type, for table-based testing. +func (f *FakeWatcher) Action(action EventType, obj interface{}) { + f.result <- &Event{action, obj} +} diff --git a/pkg/watch/watch_test.go b/pkg/watch/watch_test.go new file mode 100644 index 00000000000..d54c2e60acb --- /dev/null +++ b/pkg/watch/watch_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "testing" +) + +func TestFake(t *testing.T) { + f := NewFake() + + table := []struct { + t EventType + s string + }{ + {Added, "foo"}, + {Modified, "qux"}, + {Modified, "bar"}, + {Deleted, "bar"}, + } + + // Prove that f implements Interface by phrasing this as a function. + consumer := func(w Interface) { + for _, expect := range table { + got, ok := <-w.ResultChan() + if !ok { + t.Fatalf("closed early") + } + if e, a := expect.t, got.Type; e != a { + t.Fatalf("Expected %v, got %v", e, a) + } + if a, ok := got.Object.(string); !ok || a != expect.s { + t.Fatalf("Expected %v, got %v", expect.s, a) + } + } + _, stillOpen := <-w.ResultChan() + if stillOpen { + t.Fatal("Never stopped") + } + } + + sender := func() { + f.Add("foo") + f.Action(Modified, "qux") + f.Modify("bar") + f.Delete("bar") + f.Stop() + } + + go sender() + consumer(f) +}