Split watch

This commit is contained in:
Clayton Coleman 2014-07-29 17:35:09 -04:00
parent a41aeeca67
commit b911d9265a
4 changed files with 255 additions and 202 deletions

View File

@ -18,7 +18,6 @@ package apiserver
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
@ -632,89 +631,3 @@ func (server *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) {
server.notFound(w, req)
}
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
watching watch.Interface
}
// HandleWS implements a websocket handler.
func (w *WatchServer) HandleWS(ws *websocket.Conn) {
done := make(chan struct{})
go func() {
var unused interface{}
// Expect this to block until the connection is closed. Client should not
// send anything.
websocket.JSON.Receive(ws, &unused)
close(done)
}()
for {
select {
case <-done:
w.watching.Stop()
return
case event, ok := <-w.watching.ResultChan():
if !ok {
// End of results.
return
}
err := websocket.JSON.Send(ws, &api.WatchEvent{
Type: event.Type,
Object: api.APIObject{event.Object},
})
if err != nil {
// Client disconnect.
w.watching.Stop()
return
}
}
}
}
// ServeHTTP serves a series of JSON encoded events via straight HTTP with
// Transfer-Encoding: chunked.
func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
loggedW := httplog.LogOf(w)
w = httplog.Unlogged(w)
cn, ok := w.(http.CloseNotifier)
if !ok {
loggedW.Addf("unable to get CloseNotifier")
http.NotFound(loggedW, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
loggedW.Addf("unable to get Flusher")
http.NotFound(loggedW, req)
return
}
loggedW.Header().Set("Transfer-Encoding", "chunked")
loggedW.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := json.NewEncoder(w)
for {
select {
case <-cn.CloseNotify():
self.watching.Stop()
return
case event, ok := <-self.watching.ResultChan():
if !ok {
// End of results.
return
}
err := encoder.Encode(&api.WatchEvent{
Type: event.Type,
Object: api.APIObject{event.Object},
})
if err != nil {
// Client disconnect.
self.watching.Stop()
return
}
flusher.Flush()
}
}
}

View File

@ -18,7 +18,6 @@ package apiserver
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
@ -31,7 +30,6 @@ import (
"testing"
"time"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -575,119 +573,6 @@ func TestOpGet(t *testing.T) {
t.Errorf("Unexpected response %#v", response)
}
}
var watchTestTable = []struct {
t watch.EventType
obj interface{}
}{
{watch.Added, &Simple{Name: "A Name"}},
{watch.Modified, &Simple{Name: "Another Name"}},
{watch.Deleted, &Simple{Name: "Another Name"}},
}
func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
expectNoError(t, err)
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
err := websocket.JSON.Receive(ws, &got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
for _, item := range watchTestTable {
try(item.t, item.obj)
}
simpleStorage.fakeWatch.Stop()
var got api.WatchEvent
err = websocket.JSON.Receive(ws, &got)
if err == nil {
t.Errorf("Unexpected non-error")
}
}
func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}
dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
request, err := http.NewRequest("GET", dest.String(), nil)
expectNoError(t, err)
response, err := client.Do(request)
expectNoError(t, err)
if response.StatusCode != http.StatusOK {
t.Errorf("Unexpected response %#v", response)
}
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
decoder := json.NewDecoder(response.Body)
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
err := decoder.Decode(&got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
for _, item := range watchTestTable {
try(item.t, item.obj)
}
simpleStorage.fakeWatch.Stop()
var got api.WatchEvent
err = decoder.Decode(&got)
if err == nil {
t.Errorf("Unexpected non-error")
}
}
func TestMinionTransport(t *testing.T) {
content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
transport := &minionTransport{}

113
pkg/apiserver/watch.go Normal file
View File

@ -0,0 +1,113 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"encoding/json"
"net/http"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
watching watch.Interface
}
// HandleWS implements a websocket handler.
func (w *WatchServer) HandleWS(ws *websocket.Conn) {
done := make(chan struct{})
go func() {
var unused interface{}
// Expect this to block until the connection is closed. Client should not
// send anything.
websocket.JSON.Receive(ws, &unused)
close(done)
}()
for {
select {
case <-done:
w.watching.Stop()
return
case event, ok := <-w.watching.ResultChan():
if !ok {
// End of results.
return
}
err := websocket.JSON.Send(ws, &api.WatchEvent{
Type: event.Type,
Object: api.APIObject{event.Object},
})
if err != nil {
// Client disconnect.
w.watching.Stop()
return
}
}
}
}
// ServeHTTP serves a series of JSON encoded events via straight HTTP with
// Transfer-Encoding: chunked.
func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
loggedW := httplog.LogOf(w)
w = httplog.Unlogged(w)
cn, ok := w.(http.CloseNotifier)
if !ok {
loggedW.Addf("unable to get CloseNotifier")
http.NotFound(loggedW, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
loggedW.Addf("unable to get Flusher")
http.NotFound(loggedW, req)
return
}
loggedW.Header().Set("Transfer-Encoding", "chunked")
loggedW.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := json.NewEncoder(w)
for {
select {
case <-cn.CloseNotify():
self.watching.Stop()
return
case event, ok := <-self.watching.ResultChan():
if !ok {
// End of results.
return
}
err := encoder.Encode(&api.WatchEvent{
Type: event.Type,
Object: api.APIObject{event.Object},
})
if err != nil {
// Client disconnect.
self.watching.Stop()
return
}
flusher.Flush()
}
}
}

142
pkg/apiserver/watch_test.go Normal file
View File

@ -0,0 +1,142 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
var watchTestTable = []struct {
t watch.EventType
obj interface{}
}{
{watch.Added, &Simple{Name: "A Name"}},
{watch.Modified, &Simple{Name: "Another Name"}},
{watch.Deleted, &Simple{Name: "Another Name"}},
}
func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
expectNoError(t, err)
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
err := websocket.JSON.Receive(ws, &got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
for _, item := range watchTestTable {
try(item.t, item.obj)
}
simpleStorage.fakeWatch.Stop()
var got api.WatchEvent
err = websocket.JSON.Receive(ws, &got)
if err == nil {
t.Errorf("Unexpected non-error")
}
}
func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}
dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
request, err := http.NewRequest("GET", dest.String(), nil)
expectNoError(t, err)
response, err := client.Do(request)
expectNoError(t, err)
if response.StatusCode != http.StatusOK {
t.Errorf("Unexpected response %#v", response)
}
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
decoder := json.NewDecoder(response.Body)
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
err := decoder.Decode(&got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
for _, item := range watchTestTable {
try(item.t, item.obj)
}
simpleStorage.fakeWatch.Stop()
var got api.WatchEvent
err = decoder.Decode(&got)
if err == nil {
t.Errorf("Unexpected non-error")
}
}