diff --git a/pkg/client/request.go b/pkg/client/request.go index e7aaca90d6a..6708d492715 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -18,15 +18,20 @@ package client import ( "bytes" + "encoding/json" + "fmt" "io" "io/ioutil" "net/http" "net/url" "path" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -191,25 +196,51 @@ func (r *Request) PollPeriod(d time.Duration) *Request { return r } +func (r *Request) finalURL() string { + finalURL := r.c.host + r.path + query := url.Values{} + if r.selector != nil { + query.Add("labels", r.selector.String()) + } + if r.sync { + query.Add("sync", "true") + if r.timeout != 0 { + query.Add("timeout", r.timeout.String()) + } + } + finalURL += "?" + query.Encode() + return finalURL +} + +// Attempts to begin watching the requested location. Returns a watch.Interface, or an error. +func (r *Request) Watch() (watch.Interface, error) { + if r.err != nil { + return nil, r.err + } + req, err := http.NewRequest(r.verb, r.finalURL(), r.body) + if err != nil { + return nil, err + } + if r.c.auth != nil { + req.SetBasicAuth(r.c.auth.User, r.c.auth.Password) + } + response, err := r.c.httpClient.Do(req) + if err != nil { + return nil, err + } + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Got status: %v", response.StatusCode) + } + return newHTTPWatcher(response.Body), nil +} + // Do formats and executes the request. Returns the API object received, or an error. func (r *Request) Do() Result { for { if r.err != nil { return Result{err: r.err} } - finalURL := r.c.host + r.path - query := url.Values{} - if r.selector != nil { - query.Add("labels", r.selector.String()) - } - if r.sync { - query.Add("sync", "true") - if r.timeout != 0 { - query.Add("timeout", r.timeout.String()) - } - } - finalURL += "?" + query.Encode() - req, err := http.NewRequest(r.verb, finalURL, r.body) + req, err := http.NewRequest(r.verb, r.finalURL(), r.body) if err != nil { return Result{err: err} } @@ -262,3 +293,74 @@ func (r Result) Into(obj interface{}) error { func (r Result) Error() error { return r.err } + +type httpWatcher struct { + source io.ReadCloser + result chan watch.Event + done chan struct{} + sync.Mutex + stopped bool +} + +func newHTTPWatcher(source io.ReadCloser) *httpWatcher { + hw := &httpWatcher{ + source: source, + result: make(chan watch.Event), + done: make(chan struct{}), + } + go hw.receive() + return hw +} + +// Implements watch.Interface +func (hw *httpWatcher) ResultChan() <-chan watch.Event { + return hw.result +} + +// Implements watch.Interface +func (hw *httpWatcher) Stop() { + hw.Lock() + defer hw.Unlock() + if !hw.stopped { + close(hw.done) + hw.stopped = true + } +} + +// In a loop, read results from http, decode, and send down the result channel. +func (hw *httpWatcher) receive() { + defer close(hw.result) + defer hw.source.Close() + defer util.HandleCrash() + + decoder := json.NewDecoder(hw.source) + + decoded := make(chan *api.WatchEvent) + + // Read one at a time. Have to do this separately because Decode blocks and + // we want to wait on the done channel, too. + go func() { + defer util.HandleCrash() + for { + var got api.WatchEvent + err := decoder.Decode(&got) + if err != nil { + hw.Stop() + return + } + decoded <- &got + } + }() + + for { + select { + case <-hw.done: + return + case got := <-decoded: + hw.result <- watch.Event{ + Type: got.Type, + Object: got.Object.Object, + } + } + } +} diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 14383e0aa47..c4c0f6779d3 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -18,16 +18,20 @@ package client import ( "bytes" + "encoding/base64" + "encoding/json" "io/ioutil" "net/http" "net/http/httptest" "reflect" + "strings" "testing" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestDoRequestNewWay(t *testing.T) { @@ -303,3 +307,91 @@ func TestPolling(t *testing.T) { f() } } + +func authFromReq(r *http.Request) (*AuthInfo, bool) { + auth, ok := r.Header["Authorization"] + if !ok { + return nil, false + } + + encoded := strings.Split(auth[0], " ") + if len(encoded) != 2 || encoded[0] != "Basic" { + return nil, false + } + + decoded, err := base64.StdEncoding.DecodeString(encoded[1]) + if err != nil { + return nil, false + } + parts := strings.Split(string(decoded), ":") + if len(parts) != 2 { + return nil, false + } + return &AuthInfo{User: parts[0], Password: parts[1]}, true +} + +// checkAuth sets errors if the auth found in r doesn't match the expectation. +// TODO: Move to util, test in more places. +func checkAuth(t *testing.T, expect AuthInfo, r *http.Request) { + foundAuth, found := authFromReq(r) + if !found { + t.Errorf("no auth found") + } else if e, a := expect, *foundAuth; !reflect.DeepEqual(e, a) { + t.Fatalf("Wrong basic auth: wanted %#v, got %#v", e, a) + } +} + +func TestWatch(t *testing.T) { + var table = []struct { + t watch.EventType + obj interface{} + }{ + {watch.Added, &api.Pod{JSONBase: api.JSONBase{ID: "first"}}}, + {watch.Modified, &api.Pod{JSONBase: api.JSONBase{ID: "second"}}}, + {watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "third"}}}, + } + + auth := AuthInfo{User: "user", Password: "pass"} + testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkAuth(t, auth, r) + flusher, ok := w.(http.Flusher) + if !ok { + panic("need flusher!") + } + + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + encoder := json.NewEncoder(w) + for _, item := range table { + encoder.Encode(&api.WatchEvent{item.t, api.APIObject{item.obj}}) + flusher.Flush() + } + })) + + s := New(testServer.URL, &auth) + + watching, err := s.Get().Path("path/to/watch/thing").Watch() + if err != nil { + t.Fatalf("Unexpected error") + } + + for _, item := range table { + got, ok := <-watching.ResultChan() + if !ok { + t.Fatalf("Unexpected early close") + } + if e, a := item.t, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := item.obj, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + _, ok := <-watching.ResultChan() + if ok { + t.Fatal("Unexpected non-close") + } +} diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 6ba3e26a0d5..59ddfbc5795 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -29,7 +29,7 @@ type Interface interface { // Returns a chan which will receive all the events. If an error occurs // or Stop() is called, this channel will be closed, in which case the // watch should be completely cleaned up. - ResultChan() <-chan *Event + ResultChan() <-chan Event } // EventType defines the possible types of events. @@ -52,14 +52,14 @@ type Event struct { // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. type FakeWatcher struct { - result chan *Event + result chan Event Stopped bool sync.Mutex } func NewFake() *FakeWatcher { return &FakeWatcher{ - result: make(chan *Event), + result: make(chan Event), } } @@ -67,30 +67,32 @@ func NewFake() *FakeWatcher { func (f *FakeWatcher) Stop() { f.Lock() defer f.Unlock() - close(f.result) - f.Stopped = true + if !f.Stopped { + close(f.result) + f.Stopped = true + } } -func (f *FakeWatcher) ResultChan() <-chan *Event { +func (f *FakeWatcher) ResultChan() <-chan Event { return f.result } // Add sends an add event. func (f *FakeWatcher) Add(obj interface{}) { - f.result <- &Event{Added, obj} + f.result <- Event{Added, obj} } // Modify sends a modify event. func (f *FakeWatcher) Modify(obj interface{}) { - f.result <- &Event{Modified, obj} + f.result <- Event{Modified, obj} } // Delete sends a delete event. func (f *FakeWatcher) Delete(lastValue interface{}) { - f.result <- &Event{Deleted, lastValue} + f.result <- Event{Deleted, lastValue} } // Action sends an event of the requested type, for table-based testing. func (f *FakeWatcher) Action(action EventType, obj interface{}) { - f.result <- &Event{action, obj} + f.result <- Event{action, obj} }