diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a5986bbe919..8448517d61e 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -17,30 +17,19 @@ limitations under the License. package apiserver import ( - "bytes" "encoding/json" "fmt" "io/ioutil" - "net" "net/http" - "net/http/httputil" - "net/url" - "path" "runtime/debug" "strings" "time" - "code.google.com/p/go.net/html" - "code.google.com/p/go.net/html/atom" - "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -63,66 +52,6 @@ func NewNotFoundErr(kind, name string) error { return errNotFound(fmt.Sprintf("%s %q not found", kind, name)) } -// RESTStorage is a generic interface for RESTful storage services -// Resources which are exported to the RESTful API of apiserver need to implement this interface. -type RESTStorage interface { - // List selects resources in the storage which match to the selector. - List(labels.Selector) (interface{}, error) - - // Get finds a resource in the storage by id and returns it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. - Get(id string) (interface{}, error) - - // Delete finds a resource in the storage and deletes it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. - Delete(id string) (<-chan interface{}, error) - - Extract(body []byte) (interface{}, error) - Create(interface{}) (<-chan interface{}, error) - Update(interface{}) (<-chan interface{}, error) -} - -// ResourceWatcher should be implemented by all RESTStorage objects that -// want to offer the ability to watch for changes through the watch api. -type ResourceWatcher interface { - WatchAll() (watch.Interface, error) - WatchSingle(id string) (watch.Interface, error) -} - -// WorkFunc is used to perform any time consuming work for an api call, after -// the input has been validated. Pass one of these to MakeAsync to create an -// appropriate return value for the Update, Delete, and Create methods. -type WorkFunc func() (result interface{}, err error) - -// MakeAsync takes a function and executes it, delivering the result in the way required -// by RESTStorage's Update, Delete, and Create methods. -func MakeAsync(fn WorkFunc) <-chan interface{} { - channel := make(chan interface{}) - go func() { - defer util.HandleCrash() - obj, err := fn() - if err != nil { - status := http.StatusInternalServerError - switch { - case tools.IsEtcdConflict(err): - status = http.StatusConflict - } - channel <- &api.Status{ - Status: api.StatusFailure, - Details: err.Error(), - Code: status, - } - } else { - channel <- obj - } - // 'close' is used to signal that no further values will - // be written to the channel. Not strictly necessary, but - // also won't hurt. - close(channel) - }() - return channel -} - // APIServer is an HTTPHandler that delegates to RESTStorage objects. // It handles URLs of the form: // ${prefix}/${storage_key}[/${object_name}] @@ -147,177 +76,32 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer { mux: http.NewServeMux(), } - s.mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))) - s.mux.HandleFunc(s.prefix+"/", s.ServeREST) - healthz.InstallHandler(s.mux) - - s.mux.HandleFunc("/", s.handleIndex) - s.mux.HandleFunc("/version", s.handleVersionReq) - - // Handle both operations and operations/* with the same handler - s.mux.HandleFunc(s.operationPrefix(), s.handleOperationRequest) - s.mux.HandleFunc(s.operationPrefix()+"/", s.handleOperationRequest) - + // Primary API methods + s.mux.HandleFunc(s.prefix+"/", s.handleREST) s.mux.HandleFunc(s.watchPrefix()+"/", s.handleWatch) - s.mux.HandleFunc("/proxy/minion/", s.handleMinionReq) + // Support services for the apiserver + s.mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))) + healthz.InstallHandler(s.mux) + s.mux.HandleFunc("/version", handleVersion) + s.mux.HandleFunc("/", handleIndex) + + // Handle both operations and operations/* with the same handler + s.mux.HandleFunc(s.operationPrefix(), s.handleOperation) + s.mux.HandleFunc(s.operationPrefix()+"/", s.handleOperation) + + // Proxy minion requests + s.mux.HandleFunc("/proxy/minion/", s.handleProxyMinion) return s } -func (server *APIServer) operationPrefix() string { - return path.Join(server.prefix, "operations") -} - -func (server *APIServer) watchPrefix() string { - return path.Join(server.prefix, "watch") -} - -func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) { - if req.URL.Path != "/" && req.URL.Path != "/index.html" { - server.notFound(w, req) - return - } - w.WriteHeader(http.StatusOK) - // TODO: serve this out of a file? - data := "Welcome to Kubernetes" - fmt.Fprint(w, data) -} - -// handleVersionReq writes the server's version information. -func (server *APIServer) handleVersionReq(w http.ResponseWriter, req *http.Request) { - server.writeRawJSON(http.StatusOK, version.Get(), w) -} - -func (server *APIServer) handleMinionReq(w http.ResponseWriter, req *http.Request) { - minionPrefix := "/proxy/minion/" - if !strings.HasPrefix(req.URL.Path, minionPrefix) { - server.notFound(w, req) - return - } - - path := req.URL.Path[len(minionPrefix):] - rawQuery := req.URL.RawQuery - - // Expect path as: ${minion}/${query_to_minion} - // and query_to_minion can be any query that kubelet will accept. - // - // For example: - // To query stats of a minion or a pod or a container, - // path string can be ${minion}/stats// or - // ${minion}/podInfo?podID= - // - // To query logs on a minion, path string can be: - // ${minion}/logs/ - idx := strings.Index(path, "/") - minionHost := path[:idx] - _, port, _ := net.SplitHostPort(minionHost) - if port == "" { - // Couldn't retrieve port information - // TODO: Retrieve port info from a common object - minionHost += ":10250" - } - minionPath := path[idx:] - - minionURL := &url.URL{ - Scheme: "http", - Host: minionHost, - } - newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil) - if err != nil { - glog.Errorf("Failed to create request: %s", err) - } - - proxy := httputil.NewSingleHostReverseProxy(minionURL) - proxy.Transport = &minionTransport{} - proxy.ServeHTTP(w, newReq) -} - -type minionTransport struct{} - -func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) { - resp, err := http.DefaultTransport.RoundTrip(req) - - if err != nil && strings.Contains(err.Error(), "connection refused") { - message := fmt.Sprintf("Failed to connect to minion:%s", req.URL.Host) - resp = &http.Response{ - StatusCode: http.StatusServiceUnavailable, - Body: ioutil.NopCloser(strings.NewReader(message)), - } - return resp, nil - } - - if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") { - // Do nothing, simply pass through - return resp, err - } - - resp, err = t.ProcessResponse(req, resp) - return resp, err -} - -func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) { - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - // copying the response body did not work - return nil, err - } - - bodyNode := &html.Node{ - Type: html.ElementNode, - Data: "body", - DataAtom: atom.Body, - } - nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode) - if err != nil { - glog.Errorf("Failed to found node: %v", err) - return resp, err - } - - // Define the method to traverse the doc tree and update href node to - // point to correct minion - var updateHRef func(*html.Node) - updateHRef = func(n *html.Node) { - if n.Type == html.ElementNode && n.Data == "a" { - for i, attr := range n.Attr { - if attr.Key == "href" { - Url := &url.URL{ - Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val, - } - n.Attr[i].Val = Url.String() - break - } - } - } - for c := n.FirstChild; c != nil; c = c.NextSibling { - updateHRef(c) - } - } - - newContent := &bytes.Buffer{} - for _, n := range nodes { - updateHRef(n) - err = html.Render(newContent, n) - if err != nil { - glog.Errorf("Failed to render: %v", err) - } - } - - resp.Body = ioutil.NopCloser(newContent) - // Update header node with new content-length - // TODO: Remove any hash/signature headers here? - resp.Header.Del("Content-Length") - resp.ContentLength = int64(newContent.Len()) - - return resp, err -} - -// HTTP Handler interface -func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { +// ServeHTTP implements the standard net/http interface. +func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer func() { if x := recover(); x != nil { w.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(w, "apiserver panic. Look in log for details.") + fmt.Fprint(w, "apis panic. Look in log for details.") glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) } }() @@ -331,76 +115,177 @@ func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { ).Log() // Dispatch via our mux. - server.mux.ServeHTTP(w, req) + s.mux.ServeHTTP(w, req) } -// ServeREST handles requests to all our RESTStorage objects. -func (server *APIServer) ServeREST(w http.ResponseWriter, req *http.Request) { - if !strings.HasPrefix(req.URL.Path, server.prefix) { - server.notFound(w, req) +// handleREST handles requests to all our RESTStorage objects. +func (s *APIServer) handleREST(w http.ResponseWriter, req *http.Request) { + if !strings.HasPrefix(req.URL.Path, s.prefix) { + notFound(w, req) return } - requestParts := strings.Split(req.URL.Path[len(server.prefix):], "/")[1:] + requestParts := strings.Split(req.URL.Path[len(s.prefix):], "/")[1:] if len(requestParts) < 1 { - server.notFound(w, req) + notFound(w, req) return } - storage := server.storage[requestParts[0]] + storage := s.storage[requestParts[0]] if storage == nil { httplog.LogOf(w).Addf("'%v' has no storage object", requestParts[0]) - server.notFound(w, req) + notFound(w, req) return } - server.handleREST(requestParts, req, w, storage) + s.handleRESTStorage(requestParts, req, w, storage) } -func (server *APIServer) notFound(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(http.StatusNotFound) - fmt.Fprintf(w, "Not Found: %#v", req) -} +// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then +// on path length, according to the following table: +// Method Path Action +// GET /foo list +// GET /foo/bar get 'bar' +// POST /foo create +// PUT /foo/bar update 'bar' +// DELETE /foo/bar delete 'bar' +// Returns 404 if the method/pattern doesn't match one of these entries +// The s accepts several query parameters: +// sync=[false|true] Synchronous request (only applies to create, update, delete operations) +// timeout= Timeout for synchronous requests, only applies if sync=true +// labels= Used for filtering list operations +func (s *APIServer) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { + sync := req.URL.Query().Get("sync") == "true" + timeout := parseTimeout(req.URL.Query().Get("timeout")) + switch req.Method { + case "GET": + switch len(parts) { + case 1: + selector, err := labels.ParseSelector(req.URL.Query().Get("labels")) + if err != nil { + internalError(err, w) + return + } + list, err := storage.List(selector) + if err != nil { + internalError(err, w) + return + } + writeJSON(http.StatusOK, list, w) + case 2: + item, err := storage.Get(parts[1]) + if IsNotFound(err) { + notFound(w, req) + return + } + if err != nil { + internalError(err, w) + return + } + writeJSON(http.StatusOK, item, w) + default: + notFound(w, req) + } -// write writes an API object in wire format. -func (server *APIServer) write(statusCode int, object interface{}, w http.ResponseWriter) { - output, err := api.Encode(object) - if err != nil { - server.error(err, w) - return + case "POST": + if len(parts) != 1 { + notFound(w, req) + return + } + body, err := readBody(req) + if err != nil { + internalError(err, w) + return + } + obj, err := storage.Extract(body) + if IsNotFound(err) { + notFound(w, req) + return + } + if err != nil { + internalError(err, w) + return + } + out, err := storage.Create(obj) + if IsNotFound(err) { + notFound(w, req) + return + } + if err != nil { + internalError(err, w) + return + } + op := s.createOperation(out, sync, timeout) + s.finishReq(op, w) + + case "DELETE": + if len(parts) != 2 { + notFound(w, req) + return + } + out, err := storage.Delete(parts[1]) + if IsNotFound(err) { + notFound(w, req) + return + } + if err != nil { + internalError(err, w) + return + } + op := s.createOperation(out, sync, timeout) + s.finishReq(op, w) + + case "PUT": + if len(parts) != 2 { + notFound(w, req) + return + } + body, err := readBody(req) + if err != nil { + internalError(err, w) + return + } + obj, err := storage.Extract(body) + if IsNotFound(err) { + notFound(w, req) + return + } + if err != nil { + internalError(err, w) + return + } + out, err := storage.Update(obj) + if IsNotFound(err) { + notFound(w, req) + return + } + if err != nil { + internalError(err, w) + return + } + op := s.createOperation(out, sync, timeout) + s.finishReq(op, w) + + default: + notFound(w, req) } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(statusCode) - w.Write(output) } -// writeRawJSON writes a non-API object in JSON. -func (server *APIServer) writeRawJSON(statusCode int, object interface{}, w http.ResponseWriter) { - output, err := json.Marshal(object) - if err != nil { - server.error(err, w) - return +// handleVersionReq writes the server's version information. +func handleVersion(w http.ResponseWriter, req *http.Request) { + writeRawJSON(http.StatusOK, version.Get(), w) +} + +// createOperation creates an operation to process a channel response +func (s *APIServer) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation { + op := s.ops.NewOperation(out) + if sync { + op.WaitFor(timeout) } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(statusCode) - w.Write(output) -} - -func (server *APIServer) error(err error, w http.ResponseWriter) { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Error: %#v", err) -} - -func (server *APIServer) readBody(req *http.Request) ([]byte, error) { - defer req.Body.Close() - return ioutil.ReadAll(req.Body) + return op } // finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an // Operation to receive the result and returning its ID down the writer. -func (server *APIServer) finishReq(out <-chan interface{}, sync bool, timeout time.Duration, w http.ResponseWriter) { - op := server.ops.NewOperation(out) - if sync { - op.WaitFor(timeout) - } +func (s *APIServer) finishReq(op *Operation, w http.ResponseWriter) { obj, complete := op.StatusOrResult() if complete { status := http.StatusOK @@ -415,12 +300,36 @@ func (server *APIServer) finishReq(out <-chan interface{}, sync bool, timeout ti status = stat.Code } } - server.write(status, obj, w) + writeJSON(status, obj, w) } else { - server.write(http.StatusAccepted, obj, w) + writeJSON(http.StatusAccepted, obj, w) } } +// writeJSON renders an object as JSON to the response +func writeJSON(statusCode int, object interface{}, w http.ResponseWriter) { + output, err := api.Encode(object) + if err != nil { + internalError(err, w) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + w.Write(output) +} + +// writeRawJSON writes a non-API object in JSON. +func writeRawJSON(statusCode int, object interface{}, w http.ResponseWriter) { + output, err := json.Marshal(object) + if err != nil { + internalError(err, w) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + w.Write(output) +} + func parseTimeout(str string) time.Duration { if str != "" { timeout, err := time.ParseDuration(str) @@ -432,289 +341,7 @@ func parseTimeout(str string) time.Duration { return 30 * time.Second } -// handleREST is the main dispatcher for the server. It switches on the HTTP method, and then -// on path length, according to the following table: -// Method Path Action -// GET /foo list -// GET /foo/bar get 'bar' -// POST /foo create -// PUT /foo/bar update 'bar' -// DELETE /foo/bar delete 'bar' -// Returns 404 if the method/pattern doesn't match one of these entries -// The server accepts several query parameters: -// sync=[false|true] Synchronous request (only applies to create, update, delete operations) -// timeout= Timeout for synchronous requests, only applies if sync=true -// labels= Used for filtering list operations -func (server *APIServer) handleREST(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { - sync := req.URL.Query().Get("sync") == "true" - timeout := parseTimeout(req.URL.Query().Get("timeout")) - switch req.Method { - case "GET": - switch len(parts) { - case 1: - selector, err := labels.ParseSelector(req.URL.Query().Get("labels")) - if err != nil { - server.error(err, w) - return - } - list, err := storage.List(selector) - if err != nil { - server.error(err, w) - return - } - server.write(http.StatusOK, list, w) - case 2: - item, err := storage.Get(parts[1]) - if IsNotFound(err) { - server.notFound(w, req) - return - } - if err != nil { - server.error(err, w) - return - } - server.write(http.StatusOK, item, w) - default: - server.notFound(w, req) - } - case "POST": - if len(parts) != 1 { - server.notFound(w, req) - return - } - body, err := server.readBody(req) - if err != nil { - server.error(err, w) - return - } - obj, err := storage.Extract(body) - if IsNotFound(err) { - server.notFound(w, req) - return - } - if err != nil { - server.error(err, w) - return - } - out, err := storage.Create(obj) - if IsNotFound(err) { - server.notFound(w, req) - return - } - if err != nil { - server.error(err, w) - return - } - server.finishReq(out, sync, timeout, w) - case "DELETE": - if len(parts) != 2 { - server.notFound(w, req) - return - } - out, err := storage.Delete(parts[1]) - if IsNotFound(err) { - server.notFound(w, req) - return - } - if err != nil { - server.error(err, w) - return - } - server.finishReq(out, sync, timeout, w) - case "PUT": - if len(parts) != 2 { - server.notFound(w, req) - return - } - body, err := server.readBody(req) - if err != nil { - server.error(err, w) - return - } - obj, err := storage.Extract(body) - if IsNotFound(err) { - server.notFound(w, req) - return - } - if err != nil { - server.error(err, w) - return - } - out, err := storage.Update(obj) - if IsNotFound(err) { - server.notFound(w, req) - return - } - if err != nil { - server.error(err, w) - return - } - server.finishReq(out, sync, timeout, w) - default: - server.notFound(w, req) - } -} - -func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http.Request) { - opPrefix := server.operationPrefix() - if !strings.HasPrefix(req.URL.Path, opPrefix) { - server.notFound(w, req) - return - } - trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/") - parts := strings.Split(trimmed, "/") - if len(parts) > 1 { - server.notFound(w, req) - return - } - if req.Method != "GET" { - server.notFound(w, req) - return - } - if len(parts) == 0 { - // List outstanding operations. - list := server.ops.List() - server.write(http.StatusOK, list, w) - return - } - - op := server.ops.Get(parts[0]) - if op == nil { - server.notFound(w, req) - return - } - - obj, complete := op.StatusOrResult() - if complete { - server.write(http.StatusOK, obj, w) - } else { - server.write(http.StatusAccepted, obj, w) - } -} - -func (server *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) { - prefix := server.watchPrefix() - if !strings.HasPrefix(req.URL.Path, prefix) { - server.notFound(w, req) - return - } - parts := strings.Split(req.URL.Path[len(prefix):], "/")[1:] - if req.Method != "GET" || len(parts) < 1 { - server.notFound(w, req) - } - storage := server.storage[parts[0]] - if storage == nil { - server.notFound(w, req) - } - if watcher, ok := storage.(ResourceWatcher); ok { - var watching watch.Interface - var err error - if id := req.URL.Query().Get("id"); id != "" { - watching, err = watcher.WatchSingle(id) - } else { - watching, err = watcher.WatchAll() - } - if err != nil { - server.error(err, w) - return - } - - // TODO: This is one watch per connection. We want to multiplex, so that - // multiple watches of the same thing don't create two watches downstream. - watchServer := &WatchServer{watching} - if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" { - websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) - } else { - watchServer.ServeHTTP(w, req) - } - return - } - - server.notFound(w, req) -} - -// WatchServer serves a watch.Interface over a websocket or vanilla HTTP. -type WatchServer struct { - watching watch.Interface -} - -// HandleWS implements a websocket handler. -func (w *WatchServer) HandleWS(ws *websocket.Conn) { - done := make(chan struct{}) - go func() { - var unused interface{} - // Expect this to block until the connection is closed. Client should not - // send anything. - websocket.JSON.Receive(ws, &unused) - close(done) - }() - for { - select { - case <-done: - w.watching.Stop() - return - case event, ok := <-w.watching.ResultChan(): - if !ok { - // End of results. - return - } - err := websocket.JSON.Send(ws, &api.WatchEvent{ - Type: event.Type, - Object: api.APIObject{event.Object}, - }) - if err != nil { - // Client disconnect. - w.watching.Stop() - return - } - } - } -} - -// ServeHTTP serves a series of JSON encoded events via straight HTTP with -// Transfer-Encoding: chunked. -func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - loggedW := httplog.LogOf(w) - w = httplog.Unlogged(w) - - cn, ok := w.(http.CloseNotifier) - if !ok { - loggedW.Addf("unable to get CloseNotifier") - http.NotFound(loggedW, req) - return - } - flusher, ok := w.(http.Flusher) - if !ok { - loggedW.Addf("unable to get Flusher") - http.NotFound(loggedW, req) - return - } - - loggedW.Header().Set("Transfer-Encoding", "chunked") - loggedW.WriteHeader(http.StatusOK) - flusher.Flush() - - encoder := json.NewEncoder(w) - for { - select { - case <-cn.CloseNotify(): - self.watching.Stop() - return - case event, ok := <-self.watching.ResultChan(): - if !ok { - // End of results. - return - } - err := encoder.Encode(&api.WatchEvent{ - Type: event.Type, - Object: api.APIObject{event.Object}, - }) - if err != nil { - // Client disconnect. - self.watching.Stop() - return - } - flusher.Flush() - } - } +func readBody(req *http.Request) ([]byte, error) { + defer req.Body.Close() + return ioutil.ReadAll(req.Body) } diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index e2d57840868..65547c46b01 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -18,20 +18,16 @@ package apiserver import ( "bytes" - "encoding/json" "fmt" "io/ioutil" "log" "net/http" "net/http/httptest" - "net/url" "reflect" - "strings" "sync" "testing" "time" - "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -537,202 +533,3 @@ func TestSyncCreateTimeout(t *testing.T) { t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response) } } - -func TestOpGet(t *testing.T) { - simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ - "foo": simpleStorage, - }, "/prefix/version") - server := httptest.NewServer(handler) - client := http.Client{} - - simple := Simple{ - Name: "foo", - } - data, err := api.Encode(simple) - t.Log(string(data)) - expectNoError(t, err) - request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data)) - expectNoError(t, err) - response, err := client.Do(request) - expectNoError(t, err) - if response.StatusCode != http.StatusAccepted { - t.Errorf("Unexpected response %#v", response) - } - - var itemOut api.Status - body, err := extractBody(response, &itemOut) - expectNoError(t, err) - if itemOut.Status != api.StatusWorking || itemOut.Details == "" { - t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body)) - } - - req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details, nil) - expectNoError(t, err) - _, err = client.Do(req2) - expectNoError(t, err) - if response.StatusCode != http.StatusAccepted { - t.Errorf("Unexpected response %#v", response) - } -} - -var watchTestTable = []struct { - t watch.EventType - obj interface{} -}{ - {watch.Added, &Simple{Name: "A Name"}}, - {watch.Modified, &Simple{Name: "Another Name"}}, - {watch.Deleted, &Simple{Name: "Another Name"}}, -} - -func TestWatchWebsocket(t *testing.T) { - simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ - "foo": simpleStorage, - }, "/prefix/version") - server := httptest.NewServer(handler) - - dest, _ := url.Parse(server.URL) - dest.Scheme = "ws" // Required by websocket, though the server never sees it. - dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" - - ws, err := websocket.Dial(dest.String(), "", "http://localhost") - expectNoError(t, err) - - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - - try := func(action watch.EventType, object interface{}) { - // Send - simpleStorage.fakeWatch.Action(action, object) - // Test receive - var got api.WatchEvent - err := websocket.JSON.Receive(ws, &got) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if got.Type != action { - t.Errorf("Unexpected type: %v", got.Type) - } - if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - } - - for _, item := range watchTestTable { - try(item.t, item.obj) - } - simpleStorage.fakeWatch.Stop() - - var got api.WatchEvent - err = websocket.JSON.Receive(ws, &got) - if err == nil { - t.Errorf("Unexpected non-error") - } -} - -func TestWatchHTTP(t *testing.T) { - simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ - "foo": simpleStorage, - }, "/prefix/version") - server := httptest.NewServer(handler) - client := http.Client{} - - dest, _ := url.Parse(server.URL) - dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" - - request, err := http.NewRequest("GET", dest.String(), nil) - expectNoError(t, err) - response, err := client.Do(request) - expectNoError(t, err) - if response.StatusCode != http.StatusOK { - t.Errorf("Unexpected response %#v", response) - } - - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - - decoder := json.NewDecoder(response.Body) - - try := func(action watch.EventType, object interface{}) { - // Send - simpleStorage.fakeWatch.Action(action, object) - // Test receive - var got api.WatchEvent - err := decoder.Decode(&got) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if got.Type != action { - t.Errorf("Unexpected type: %v", got.Type) - } - if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - } - - for _, item := range watchTestTable { - try(item.t, item.obj) - } - simpleStorage.fakeWatch.Stop() - - var got api.WatchEvent - err = decoder.Decode(&got) - if err == nil { - t.Errorf("Unexpected non-error") - } -} - -func TestMinionTransport(t *testing.T) { - content := string(`
kubelet.loggoogle.log
`) - transport := &minionTransport{} - - // Test /logs/ - request := &http.Request{ - Method: "GET", - URL: &url.URL{ - Scheme: "http", - Host: "minion1:10250", - Path: "/logs/", - }, - } - response := &http.Response{ - Status: "200 OK", - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader(content)), - Close: true, - } - updated_resp, _ := transport.ProcessResponse(request, response) - body, _ := ioutil.ReadAll(updated_resp.Body) - expected := string(`
kubelet.loggoogle.log
`) - if !strings.Contains(string(body), expected) { - t.Errorf("Received wrong content: %s", string(body)) - } - - // Test subdir under /logs/ - request = &http.Request{ - Method: "GET", - URL: &url.URL{ - Scheme: "http", - Host: "minion1:8080", - Path: "/whatever/apt/", - }, - } - response = &http.Response{ - Status: "200 OK", - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader(content)), - Close: true, - } - updated_resp, _ = transport.ProcessResponse(request, response) - body, _ = ioutil.ReadAll(updated_resp.Body) - expected = string(`
kubelet.loggoogle.log
`) - if !strings.Contains(string(body), expected) { - t.Errorf("Received wrong content: %s", string(body)) - } -} diff --git a/pkg/apiserver/async.go b/pkg/apiserver/async.go new file mode 100644 index 00000000000..70b190354b0 --- /dev/null +++ b/pkg/apiserver/async.go @@ -0,0 +1,59 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "net/http" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// WorkFunc is used to perform any time consuming work for an api call, after +// the input has been validated. Pass one of these to MakeAsync to create an +// appropriate return value for the Update, Delete, and Create methods. +type WorkFunc func() (result interface{}, err error) + +// MakeAsync takes a function and executes it, delivering the result in the way required +// by RESTStorage's Update, Delete, and Create methods. +func MakeAsync(fn WorkFunc) <-chan interface{} { + channel := make(chan interface{}) + go func() { + defer util.HandleCrash() + obj, err := fn() + if err != nil { + status := http.StatusInternalServerError + switch { + case tools.IsEtcdConflict(err): + status = http.StatusConflict + } + channel <- &api.Status{ + Status: api.StatusFailure, + Details: err.Error(), + Code: status, + } + } else { + channel <- obj + } + // 'close' is used to signal that no further values will + // be written to the channel. Not strictly necessary, but + // also won't hurt. + close(channel) + }() + return channel +} diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go new file mode 100644 index 00000000000..34bf293038a --- /dev/null +++ b/pkg/apiserver/errors.go @@ -0,0 +1,34 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "net/http" +) + +// internalError renders a generic error to the response +func internalError(err error, w http.ResponseWriter) { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal Error: %#v", err) +} + +// notFound renders a simple not found error +func notFound(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + fmt.Fprintf(w, "Not Found: %#v", req.RequestURI) +} diff --git a/pkg/apiserver/index.go b/pkg/apiserver/index.go new file mode 100644 index 00000000000..7c682209caa --- /dev/null +++ b/pkg/apiserver/index.go @@ -0,0 +1,34 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "net/http" +) + +// handleIndex is the root index page for Kubernetes +func handleIndex(w http.ResponseWriter, req *http.Request) { + if req.URL.Path != "/" && req.URL.Path != "/index.html" { + notFound(w, req) + return + } + w.WriteHeader(http.StatusOK) + // TODO: serve this out of a file? + data := "Welcome to Kubernetes" + fmt.Fprint(w, data) +} diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go new file mode 100644 index 00000000000..c268d532a69 --- /dev/null +++ b/pkg/apiserver/interfaces.go @@ -0,0 +1,48 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// RESTStorage is a generic interface for RESTful storage services +// Resources which are exported to the RESTful API of apiserver need to implement this interface. +type RESTStorage interface { + // List selects resources in the storage which match to the selector. + List(labels.Selector) (interface{}, error) + + // Get finds a resource in the storage by id and returns it. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + Get(id string) (interface{}, error) + + // Delete finds a resource in the storage and deletes it. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. + Delete(id string) (<-chan interface{}, error) + + Extract(body []byte) (interface{}, error) + Create(interface{}) (<-chan interface{}, error) + Update(interface{}) (<-chan interface{}, error) +} + +// ResourceWatcher should be implemented by all RESTStorage objects that +// want to offer the ability to watch for changes through the watch api. +type ResourceWatcher interface { + WatchAll() (watch.Interface, error) + WatchSingle(id string) (watch.Interface, error) +} diff --git a/pkg/apiserver/minionproxy.go b/pkg/apiserver/minionproxy.go new file mode 100644 index 00000000000..87e3fc4bbab --- /dev/null +++ b/pkg/apiserver/minionproxy.go @@ -0,0 +1,155 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httputil" + "net/url" + "strings" + + "code.google.com/p/go.net/html" + "code.google.com/p/go.net/html/atom" + "github.com/golang/glog" +) + +func (server *APIServer) handleProxyMinion(w http.ResponseWriter, req *http.Request) { + minionPrefix := "/proxy/minion/" + if !strings.HasPrefix(req.URL.Path, minionPrefix) { + notFound(w, req) + return + } + + path := req.URL.Path[len(minionPrefix):] + rawQuery := req.URL.RawQuery + + // Expect path as: ${minion}/${query_to_minion} + // and query_to_minion can be any query that kubelet will accept. + // + // For example: + // To query stats of a minion or a pod or a container, + // path string can be ${minion}/stats// or + // ${minion}/podInfo?podID= + // + // To query logs on a minion, path string can be: + // ${minion}/logs/ + idx := strings.Index(path, "/") + minionHost := path[:idx] + _, port, _ := net.SplitHostPort(minionHost) + if port == "" { + // Couldn't retrieve port information + // TODO: Retrieve port info from a common object + minionHost += ":10250" + } + minionPath := path[idx:] + + minionURL := &url.URL{ + Scheme: "http", + Host: minionHost, + } + newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil) + if err != nil { + glog.Errorf("Failed to create request: %s", err) + } + + proxy := httputil.NewSingleHostReverseProxy(minionURL) + proxy.Transport = &minionTransport{} + proxy.ServeHTTP(w, newReq) +} + +type minionTransport struct{} + +func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := http.DefaultTransport.RoundTrip(req) + + if err != nil && strings.Contains(err.Error(), "connection refused") { + message := fmt.Sprintf("Failed to connect to minion:%s", req.URL.Host) + resp = &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: ioutil.NopCloser(strings.NewReader(message)), + } + return resp, nil + } + + if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") { + // Do nothing, simply pass through + return resp, err + } + + resp, err = t.ProcessResponse(req, resp) + return resp, err +} + +func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + // copying the response body did not work + return nil, err + } + + bodyNode := &html.Node{ + Type: html.ElementNode, + Data: "body", + DataAtom: atom.Body, + } + nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode) + if err != nil { + glog.Errorf("Failed to found node: %v", err) + return resp, err + } + + // Define the method to traverse the doc tree and update href node to + // point to correct minion + var updateHRef func(*html.Node) + updateHRef = func(n *html.Node) { + if n.Type == html.ElementNode && n.Data == "a" { + for i, attr := range n.Attr { + if attr.Key == "href" { + Url := &url.URL{ + Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val, + } + n.Attr[i].Val = Url.String() + break + } + } + } + for c := n.FirstChild; c != nil; c = c.NextSibling { + updateHRef(c) + } + } + + newContent := &bytes.Buffer{} + for _, n := range nodes { + updateHRef(n) + err = html.Render(newContent, n) + if err != nil { + glog.Errorf("Failed to render: %v", err) + } + } + + resp.Body = ioutil.NopCloser(newContent) + // Update header node with new content-length + // TODO: Remove any hash/signature headers here? + resp.Header.Del("Content-Length") + resp.ContentLength = int64(newContent.Len()) + + return resp, err +} diff --git a/pkg/apiserver/minionproxy_test.go b/pkg/apiserver/minionproxy_test.go new file mode 100644 index 00000000000..e7bed930f63 --- /dev/null +++ b/pkg/apiserver/minionproxy_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "io/ioutil" + "net/http" + "net/url" + "strings" + "testing" +) + +func TestMinionTransport(t *testing.T) { + content := string(`
kubelet.loggoogle.log
`) + transport := &minionTransport{} + + // Test /logs/ + request := &http.Request{ + Method: "GET", + URL: &url.URL{ + Scheme: "http", + Host: "minion1:10250", + Path: "/logs/", + }, + } + response := &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(content)), + Close: true, + } + updated_resp, _ := transport.ProcessResponse(request, response) + body, _ := ioutil.ReadAll(updated_resp.Body) + expected := string(`
kubelet.loggoogle.log
`) + if !strings.Contains(string(body), expected) { + t.Errorf("Received wrong content: %s", string(body)) + } + + // Test subdir under /logs/ + request = &http.Request{ + Method: "GET", + URL: &url.URL{ + Scheme: "http", + Host: "minion1:8080", + Path: "/whatever/apt/", + }, + } + response = &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(content)), + Close: true, + } + updated_resp, _ = transport.ProcessResponse(request, response) + body, _ = ioutil.ReadAll(updated_resp.Body) + expected = string(`
kubelet.loggoogle.log
`) + if !strings.Contains(string(body), expected) { + t.Errorf("Received wrong content: %s", string(body)) + } +} diff --git a/pkg/apiserver/operation.go b/pkg/apiserver/operation.go index 7b88531d214..90474adeb7f 100644 --- a/pkg/apiserver/operation.go +++ b/pkg/apiserver/operation.go @@ -17,8 +17,11 @@ limitations under the License. package apiserver import ( + "net/http" + "path" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -27,6 +30,47 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +func (s *APIServer) operationPrefix() string { + return path.Join(s.prefix, "operations") +} + +func (s *APIServer) handleOperation(w http.ResponseWriter, req *http.Request) { + opPrefix := s.operationPrefix() + if !strings.HasPrefix(req.URL.Path, opPrefix) { + notFound(w, req) + return + } + trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/") + parts := strings.Split(trimmed, "/") + if len(parts) > 1 { + notFound(w, req) + return + } + if req.Method != "GET" { + notFound(w, req) + return + } + if len(parts) == 0 { + // List outstanding operations. + list := s.ops.List() + writeJSON(http.StatusOK, list, w) + return + } + + op := s.ops.Get(parts[0]) + if op == nil { + notFound(w, req) + return + } + + obj, complete := op.StatusOrResult() + if complete { + writeJSON(http.StatusOK, obj, w) + } else { + writeJSON(http.StatusAccepted, obj, w) + } +} + // Operation represents an ongoing action which the server is performing. type Operation struct { ID string diff --git a/pkg/apiserver/operation_test.go b/pkg/apiserver/operation_test.go index 35b06ab1547..a828dfab796 100644 --- a/pkg/apiserver/operation_test.go +++ b/pkg/apiserver/operation_test.go @@ -17,9 +17,14 @@ limitations under the License. package apiserver import ( + "bytes" + "net/http" + "net/http/httptest" "sync/atomic" "testing" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) func TestOperation(t *testing.T) { @@ -84,3 +89,41 @@ func TestOperation(t *testing.T) { t.Errorf("Got unexpected result: %#v", op.result) } } + +func TestOpGet(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + client := http.Client{} + + simple := Simple{ + Name: "foo", + } + data, err := api.Encode(simple) + t.Log(string(data)) + expectNoError(t, err) + request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data)) + expectNoError(t, err) + response, err := client.Do(request) + expectNoError(t, err) + if response.StatusCode != http.StatusAccepted { + t.Errorf("Unexpected response %#v", response) + } + + var itemOut api.Status + body, err := extractBody(response, &itemOut) + expectNoError(t, err) + if itemOut.Status != api.StatusWorking || itemOut.Details == "" { + t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body)) + } + + req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details, nil) + expectNoError(t, err) + _, err = client.Do(req2) + expectNoError(t, err) + if response.StatusCode != http.StatusAccepted { + t.Errorf("Unexpected response %#v", response) + } +} diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go new file mode 100644 index 00000000000..b2f1cc1853b --- /dev/null +++ b/pkg/apiserver/watch.go @@ -0,0 +1,161 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "encoding/json" + "net/http" + "path" + "strings" + + "code.google.com/p/go.net/websocket" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func (s *APIServer) watchPrefix() string { + return path.Join(s.prefix, "watch") +} + +// handleWatch processes a watch request +func (s *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) { + prefix := s.watchPrefix() + if !strings.HasPrefix(req.URL.Path, prefix) { + notFound(w, req) + return + } + parts := strings.Split(req.URL.Path[len(prefix):], "/")[1:] + if req.Method != "GET" || len(parts) < 1 { + notFound(w, req) + } + storage := s.storage[parts[0]] + if storage == nil { + notFound(w, req) + } + if watcher, ok := storage.(ResourceWatcher); ok { + var watching watch.Interface + var err error + if id := req.URL.Query().Get("id"); id != "" { + watching, err = watcher.WatchSingle(id) + } else { + watching, err = watcher.WatchAll() + } + if err != nil { + internalError(err, w) + return + } + + // TODO: This is one watch per connection. We want to multiplex, so that + // multiple watches of the same thing don't create two watches downstream. + watchServer := &WatchServer{watching} + if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" { + websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) + } else { + watchServer.ServeHTTP(w, req) + } + return + } + + notFound(w, req) +} + +// WatchServer serves a watch.Interface over a websocket or vanilla HTTP. +type WatchServer struct { + watching watch.Interface +} + +// HandleWS implements a websocket handler. +func (w *WatchServer) HandleWS(ws *websocket.Conn) { + done := make(chan struct{}) + go func() { + var unused interface{} + // Expect this to block until the connection is closed. Client should not + // send anything. + websocket.JSON.Receive(ws, &unused) + close(done) + }() + for { + select { + case <-done: + w.watching.Stop() + return + case event, ok := <-w.watching.ResultChan(): + if !ok { + // End of results. + return + } + err := websocket.JSON.Send(ws, &api.WatchEvent{ + Type: event.Type, + Object: api.APIObject{event.Object}, + }) + if err != nil { + // Client disconnect. + w.watching.Stop() + return + } + } + } +} + +// ServeHTTP serves a series of JSON encoded events via straight HTTP with +// Transfer-Encoding: chunked. +func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + loggedW := httplog.LogOf(w) + w = httplog.Unlogged(w) + + cn, ok := w.(http.CloseNotifier) + if !ok { + loggedW.Addf("unable to get CloseNotifier") + http.NotFound(loggedW, req) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + loggedW.Addf("unable to get Flusher") + http.NotFound(loggedW, req) + return + } + + loggedW.Header().Set("Transfer-Encoding", "chunked") + loggedW.WriteHeader(http.StatusOK) + flusher.Flush() + + encoder := json.NewEncoder(w) + for { + select { + case <-cn.CloseNotify(): + self.watching.Stop() + return + case event, ok := <-self.watching.ResultChan(): + if !ok { + // End of results. + return + } + err := encoder.Encode(&api.WatchEvent{ + Type: event.Type, + Object: api.APIObject{event.Object}, + }) + if err != nil { + // Client disconnect. + self.watching.Stop() + return + } + flusher.Flush() + } + } +} diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go new file mode 100644 index 00000000000..09fdcfa6241 --- /dev/null +++ b/pkg/apiserver/watch_test.go @@ -0,0 +1,142 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + + "code.google.com/p/go.net/websocket" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +var watchTestTable = []struct { + t watch.EventType + obj interface{} +}{ + {watch.Added, &Simple{Name: "A Name"}}, + {watch.Modified, &Simple{Name: "Another Name"}}, + {watch.Deleted, &Simple{Name: "Another Name"}}, +} + +func TestWatchWebsocket(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + + dest, _ := url.Parse(server.URL) + dest.Scheme = "ws" // Required by websocket, though the server never sees it. + dest.Path = "/prefix/version/watch/foo" + dest.RawQuery = "id=myID" + + ws, err := websocket.Dial(dest.String(), "", "http://localhost") + expectNoError(t, err) + + if a, e := simpleStorage.requestedID, "myID"; a != e { + t.Fatalf("Expected %v, got %v", e, a) + } + + try := func(action watch.EventType, object interface{}) { + // Send + simpleStorage.fakeWatch.Action(action, object) + // Test receive + var got api.WatchEvent + err := websocket.JSON.Receive(ws, &got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != action { + t.Errorf("Unexpected type: %v", got.Type) + } + if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + for _, item := range watchTestTable { + try(item.t, item.obj) + } + simpleStorage.fakeWatch.Stop() + + var got api.WatchEvent + err = websocket.JSON.Receive(ws, &got) + if err == nil { + t.Errorf("Unexpected non-error") + } +} + +func TestWatchHTTP(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + client := http.Client{} + + dest, _ := url.Parse(server.URL) + dest.Path = "/prefix/version/watch/foo" + dest.RawQuery = "id=myID" + + request, err := http.NewRequest("GET", dest.String(), nil) + expectNoError(t, err) + response, err := client.Do(request) + expectNoError(t, err) + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected response %#v", response) + } + + if a, e := simpleStorage.requestedID, "myID"; a != e { + t.Fatalf("Expected %v, got %v", e, a) + } + + decoder := json.NewDecoder(response.Body) + + try := func(action watch.EventType, object interface{}) { + // Send + simpleStorage.fakeWatch.Action(action, object) + // Test receive + var got api.WatchEvent + err := decoder.Decode(&got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != action { + t.Errorf("Unexpected type: %v", got.Type) + } + if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + for _, item := range watchTestTable { + try(item.t, item.obj) + } + simpleStorage.fakeWatch.Stop() + + var got api.WatchEvent + err = decoder.Decode(&got) + if err == nil { + t.Errorf("Unexpected non-error") + } +}