Add watch mechanism to apiserver

Implemented via HTTP and websocket. A test is present but this isn't
yet wired into anything.

Eventual purpose of this is to allow a scheduler to watch for new pods.
Or allow replication controller to watch for new items it controlls.
Generally, it'll be good to turn everything possible into a push instead
of a poll.
This commit is contained in:
Daniel Smith 2014-07-17 10:05:14 -07:00
parent af0ded703f
commit eda30d4f20
7 changed files with 535 additions and 29 deletions

View File

@ -18,6 +18,7 @@ package api
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/fsouza/go-dockerclient"
)
@ -338,3 +339,12 @@ type ServerOpList struct {
JSONBase `yaml:",inline" json:",inline"`
Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"`
}
// WatchEvent objects are streamed from the api server in response to a watch request.
type WatchEvent struct {
// The type of the watch event; added, modified, or deleted.
Type watch.EventType
// An object which can be decoded via api.Decode
EmbeddedObject []byte
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package apiserver
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@ -25,12 +26,14 @@ import (
"strings"
"time"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
@ -72,6 +75,13 @@ type RESTStorage interface {
Update(interface{}) (<-chan interface{}, error)
}
// ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface {
WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error)
}
// WorkFunc is used to perform any time consuming work for an api call, after
// the input has been validated. Pass one of these to MakeAsync to create an
// appropriate return value for the Update, Delete, and Create methods.
@ -136,13 +146,24 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer {
s.mux.HandleFunc("/index.html", s.handleIndex)
// Handle both operations and operations/* with the same handler
opPrefix := path.Join(s.prefix, "operations")
s.mux.HandleFunc(opPrefix, s.handleOperationRequest)
s.mux.HandleFunc(opPrefix+"/", s.handleOperationRequest)
s.mux.HandleFunc(s.operationPrefix(), s.handleOperationRequest)
s.mux.HandleFunc(s.operationPrefix()+"/", s.handleOperationRequest)
s.mux.HandleFunc(s.watchPrefix()+"/", s.handleWatch)
s.mux.HandleFunc("/", s.notFound)
return s
}
func (s *APIServer) operationPrefix() string {
return path.Join(s.prefix, "operations")
}
func (s *APIServer) watchPrefix() string {
return path.Join(s.prefix, "watch")
}
func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
// TODO: serve this out of a file?
@ -175,25 +196,25 @@ func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// ServeREST handles requests to all our RESTStorage objects.
func (server *APIServer) ServeREST(w http.ResponseWriter, req *http.Request) {
if !strings.HasPrefix(req.URL.Path, server.prefix) {
server.notFound(req, w)
server.notFound(w, req)
return
}
requestParts := strings.Split(req.URL.Path[len(server.prefix):], "/")[1:]
if len(requestParts) < 1 {
server.notFound(req, w)
server.notFound(w, req)
return
}
storage := server.storage[requestParts[0]]
if storage == nil {
httplog.LogOf(w).Addf("'%v' has no storage object", requestParts[0])
server.notFound(req, w)
server.notFound(w, req)
return
}
server.handleREST(requestParts, req, w, storage)
}
func (server *APIServer) notFound(req *http.Request, w http.ResponseWriter) {
func (server *APIServer) notFound(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintf(w, "Not Found: %#v", req)
}
@ -290,7 +311,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
case 2:
item, err := storage.Get(parts[1])
if IsNotFound(err) {
server.notFound(req, w)
server.notFound(w, req)
return
}
if err != nil {
@ -299,11 +320,11 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
}
server.write(http.StatusOK, item, w)
default:
server.notFound(req, w)
server.notFound(w, req)
}
case "POST":
if len(parts) != 1 {
server.notFound(req, w)
server.notFound(w, req)
return
}
body, err := server.readBody(req)
@ -313,7 +334,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
}
obj, err := storage.Extract(body)
if IsNotFound(err) {
server.notFound(req, w)
server.notFound(w, req)
return
}
if err != nil {
@ -322,7 +343,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
}
out, err := storage.Create(obj)
if IsNotFound(err) {
server.notFound(req, w)
server.notFound(w, req)
return
}
if err != nil {
@ -332,12 +353,12 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
server.finishReq(out, sync, timeout, w)
case "DELETE":
if len(parts) != 2 {
server.notFound(req, w)
server.notFound(w, req)
return
}
out, err := storage.Delete(parts[1])
if IsNotFound(err) {
server.notFound(req, w)
server.notFound(w, req)
return
}
if err != nil {
@ -347,7 +368,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
server.finishReq(out, sync, timeout, w)
case "PUT":
if len(parts) != 2 {
server.notFound(req, w)
server.notFound(w, req)
return
}
body, err := server.readBody(req)
@ -357,7 +378,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
}
obj, err := storage.Extract(body)
if IsNotFound(err) {
server.notFound(req, w)
server.notFound(w, req)
return
}
if err != nil {
@ -366,7 +387,7 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
}
out, err := storage.Update(obj)
if IsNotFound(err) {
server.notFound(req, w)
server.notFound(w, req)
return
}
if err != nil {
@ -375,24 +396,24 @@ func (server *APIServer) handleREST(parts []string, req *http.Request, w http.Re
}
server.finishReq(out, sync, timeout, w)
default:
server.notFound(req, w)
server.notFound(w, req)
}
}
func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http.Request) {
opPrefix := path.Join(server.prefix, "operations")
opPrefix := server.operationPrefix()
if !strings.HasPrefix(req.URL.Path, opPrefix) {
server.notFound(req, w)
server.notFound(w, req)
return
}
trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/")
parts := strings.Split(trimmed, "/")
if len(parts) > 1 {
server.notFound(req, w)
server.notFound(w, req)
return
}
if req.Method != "GET" {
server.notFound(req, w)
server.notFound(w, req)
return
}
if len(parts) == 0 {
@ -404,7 +425,7 @@ func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http
op := server.ops.Get(parts[0])
if op == nil {
server.notFound(req, w)
server.notFound(w, req)
return
}
@ -415,3 +436,140 @@ func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http
server.write(http.StatusAccepted, obj, w)
}
}
func (server *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) {
prefix := server.watchPrefix()
if !strings.HasPrefix(req.URL.Path, prefix) {
server.notFound(w, req)
return
}
parts := strings.Split(req.URL.Path[len(prefix):], "/")[1:]
if req.Method != "GET" || len(parts) < 1 {
server.notFound(w, req)
}
storage := server.storage[parts[0]]
if storage == nil {
server.notFound(w, req)
}
if watcher, ok := storage.(ResourceWatcher); ok {
var watching watch.Interface
var err error
if id := req.URL.Query().Get("id"); id != "" {
watching, err = watcher.WatchSingle(id)
} else {
watching, err = watcher.WatchAll()
}
if err != nil {
server.error(err, w)
return
}
// TODO: This is one watch per connection. We want to multiplex, so that
// multiple watches of the same thing don't create two watches downstream.
watchServer := &WatchServer{watching}
if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" {
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req)
} else {
watchServer.ServeHTTP(w, req)
}
return
}
server.notFound(w, req)
}
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
watching watch.Interface
}
// HandleWS implements a websocket handler.
func (w *WatchServer) HandleWS(ws *websocket.Conn) {
done := make(chan struct{})
go func() {
var unused interface{}
// Expect this to block until the connection is closed. Client should not
// send anything.
websocket.JSON.Receive(ws, &unused)
close(done)
}()
for {
select {
case <-done:
w.watching.Stop()
return
case event, ok := <-w.watching.ResultChan():
if !ok {
// End of results.
return
}
wireFormat, err := api.Encode(event.Object)
if err != nil {
glog.Errorf("error encoding %#v: %v", event.Object, err)
return
}
err = websocket.JSON.Send(ws, &api.WatchEvent{
Type: event.Type,
EmbeddedObject: wireFormat,
})
if err != nil {
// Client disconnect.
w.watching.Stop()
return
}
}
}
}
// ServeHTTP serves a series of JSON encoded events via straight HTTP with
// Transfer-Encoding: chunked.
func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
loggedW := httplog.LogOf(w)
w = httplog.Unlogged(w)
cn, ok := w.(http.CloseNotifier)
if !ok {
loggedW.Addf("unable to get CloseNotifier")
http.NotFound(loggedW, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
loggedW.Addf("unable to get Flusher")
http.NotFound(loggedW, req)
return
}
loggedW.Header().Set("Transfer-Encoding", "chunked")
loggedW.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := json.NewEncoder(w)
for {
select {
case <-cn.CloseNotify():
self.watching.Stop()
return
case event, ok := <-self.watching.ResultChan():
if !ok {
// End of results.
return
}
wireFormat, err := api.Encode(event.Object)
if err != nil {
glog.Errorf("error encoding %#v: %v", event.Object, err)
return
}
err = encoder.Encode(&api.WatchEvent{
Type: event.Type,
EmbeddedObject: wireFormat,
})
if err != nil {
// Client disconnect.
self.watching.Stop()
return
}
flusher.Flush()
}
}
}

View File

@ -18,17 +18,21 @@ package apiserver
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"sync"
"testing"
"time"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func init() {
@ -60,6 +64,12 @@ type SimpleRESTStorage struct {
updated Simple
created Simple
// Valid if WatchAll or WatchSingle is called
fakeWatch *watch.FakeWatcher
// Set if WatchSingle is called
requestedID string
// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj recieves the original input to the update, delete, or create call.
injectedFunction func(obj interface{}) (returnObj interface{}, err error)
@ -78,8 +88,8 @@ func (storage *SimpleRESTStorage) Get(id string) (interface{}, error) {
func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) {
storage.deleted = id
if storage.errors["delete"] != nil {
return nil, storage.errors["delete"]
if err := storage.errors["delete"]; err != nil {
return nil, err
}
return MakeAsync(func() (interface{}, error) {
if storage.injectedFunction != nil {
@ -97,8 +107,8 @@ func (storage *SimpleRESTStorage) Extract(body []byte) (interface{}, error) {
func (storage *SimpleRESTStorage) Create(obj interface{}) (<-chan interface{}, error) {
storage.created = obj.(Simple)
if storage.errors["create"] != nil {
return nil, storage.errors["create"]
if err := storage.errors["create"]; err != nil {
return nil, err
}
return MakeAsync(func() (interface{}, error) {
if storage.injectedFunction != nil {
@ -110,8 +120,8 @@ func (storage *SimpleRESTStorage) Create(obj interface{}) (<-chan interface{}, e
func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, error) {
storage.updated = obj.(Simple)
if storage.errors["update"] != nil {
return nil, storage.errors["update"]
if err := storage.errors["update"]; err != nil {
return nil, err
}
return MakeAsync(func() (interface{}, error) {
if storage.injectedFunction != nil {
@ -121,6 +131,25 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e
}), nil
}
// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) {
if err := storage.errors["watchAll"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
return storage.fakeWatch, nil
}
// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) {
storage.requestedID = id
if err := storage.errors["watchSingle"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
return storage.fakeWatch, nil
}
func extractBody(response *http.Response, object interface{}) (string, error) {
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
@ -525,3 +554,123 @@ func TestOpGet(t *testing.T) {
t.Errorf("Unexpected response %#v", response)
}
}
var watchTestTable = []struct {
t watch.EventType
obj interface{}
}{
{watch.Added, &Simple{Name: "A Name"}},
{watch.Modified, &Simple{Name: "Another Name"}},
{watch.Deleted, &Simple{Name: "Another Name"}},
}
func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
expectNoError(t, err)
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
err := websocket.JSON.Receive(ws, &got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
apiObj, err := api.Decode(got.EmbeddedObject)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(object, apiObj) {
t.Errorf("Expected %v, got %v", object, apiObj)
}
}
for _, item := range watchTestTable {
try(item.t, item.obj)
}
simpleStorage.fakeWatch.Stop()
var got api.WatchEvent
err = websocket.JSON.Receive(ws, &got)
if err == nil {
t.Errorf("Unexpected non-error")
}
}
func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}
dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
request, err := http.NewRequest("GET", dest.String(), nil)
expectNoError(t, err)
response, err := client.Do(request)
expectNoError(t, err)
if response.StatusCode != http.StatusOK {
t.Errorf("Unexpected response %#v", response)
}
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
decoder := json.NewDecoder(response.Body)
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
err := decoder.Decode(&got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
apiObj, err := api.Decode(got.EmbeddedObject)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(object, apiObj) {
t.Errorf("Expected %v, got %v", object, apiObj)
}
}
for _, item := range watchTestTable {
try(item.t, item.obj)
}
simpleStorage.fakeWatch.Stop()
var got api.WatchEvent
err = decoder.Decode(&got)
if err == nil {
t.Errorf("Unexpected non-error")
}
}

View File

@ -96,6 +96,14 @@ func LogOf(w http.ResponseWriter) *respLogger {
return nil
}
// Unlogged returns the original ResponseWriter, or w if it is not our inserted logger.
func Unlogged(w http.ResponseWriter) http.ResponseWriter {
if rl, ok := w.(*respLogger); ok {
return rl.w
}
return w
}
// Sets the stacktrace logging predicate, which decides when to log a stacktrace.
// There's a default, so you don't need to call this unless you don't like the default.
func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger {

19
pkg/watch/doc.go Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package watch contains a generic watchable interface, and a fake for
// testing code that uses the watch interface.
package watch

96
pkg/watch/watch.go Normal file
View File

@ -0,0 +1,96 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"sync"
)
// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
// Stops watching. Will close the channel returned by ResultChan(). Releases
// any resources used by the watch.
Stop()
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
ResultChan() <-chan *Event
}
// EventType defines the possible types of events.
type EventType string
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
)
// Event represents a single event to a watched resource.
type Event struct {
Type EventType
// If Type == Deleted, then this is the state of the object
// immediately before deletion.
Object interface{}
}
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type FakeWatcher struct {
result chan *Event
Stopped bool
sync.Mutex
}
func NewFake() *FakeWatcher {
return &FakeWatcher{
result: make(chan *Event),
}
}
// Stop implements Interface.Stop().
func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
close(f.result)
f.Stopped = true
}
func (f *FakeWatcher) ResultChan() <-chan *Event {
return f.result
}
// Add sends an add event.
func (f *FakeWatcher) Add(obj interface{}) {
f.result <- &Event{Added, obj}
}
// Modify sends a modify event.
func (f *FakeWatcher) Modify(obj interface{}) {
f.result <- &Event{Modified, obj}
}
// Delete sends a delete event.
func (f *FakeWatcher) Delete(lastValue interface{}) {
f.result <- &Event{Deleted, lastValue}
}
// Action sends an event of the requested type, for table-based testing.
func (f *FakeWatcher) Action(action EventType, obj interface{}) {
f.result <- &Event{action, obj}
}

66
pkg/watch/watch_test.go Normal file
View File

@ -0,0 +1,66 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"testing"
)
func TestFake(t *testing.T) {
f := NewFake()
table := []struct {
t EventType
s string
}{
{Added, "foo"},
{Modified, "qux"},
{Modified, "bar"},
{Deleted, "bar"},
}
// Prove that f implements Interface by phrasing this as a function.
consumer := func(w Interface) {
for _, expect := range table {
got, ok := <-w.ResultChan()
if !ok {
t.Fatalf("closed early")
}
if e, a := expect.t, got.Type; e != a {
t.Fatalf("Expected %v, got %v", e, a)
}
if a, ok := got.Object.(string); !ok || a != expect.s {
t.Fatalf("Expected %v, got %v", expect.s, a)
}
}
_, stillOpen := <-w.ResultChan()
if stillOpen {
t.Fatal("Never stopped")
}
}
sender := func() {
f.Add("foo")
f.Action(Modified, "qux")
f.Modify("bar")
f.Delete("bar")
f.Stop()
}
go sender()
consumer(f)
}