diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 91995da18c6..8448517d61e 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -21,18 +21,15 @@ import ( "fmt" "io/ioutil" "net/http" - "path" "runtime/debug" "strings" "time" - "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -90,8 +87,8 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer { s.mux.HandleFunc("/", handleIndex) // Handle both operations and operations/* with the same handler - s.mux.HandleFunc(s.operationPrefix(), s.handleOperationRequest) - s.mux.HandleFunc(s.operationPrefix()+"/", s.handleOperationRequest) + s.mux.HandleFunc(s.operationPrefix(), s.handleOperation) + s.mux.HandleFunc(s.operationPrefix()+"/", s.handleOperation) // Proxy minion requests s.mux.HandleFunc("/proxy/minion/", s.handleProxyMinion) @@ -309,102 +306,15 @@ func (s *APIServer) finishReq(op *Operation, w http.ResponseWriter) { } } -func (s *APIServer) operationPrefix() string { - return path.Join(s.prefix, "operations") -} - -func (s *APIServer) handleOperationRequest(w http.ResponseWriter, req *http.Request) { - opPrefix := s.operationPrefix() - if !strings.HasPrefix(req.URL.Path, opPrefix) { - notFound(w, req) - return - } - trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/") - parts := strings.Split(trimmed, "/") - if len(parts) > 1 { - notFound(w, req) - return - } - if req.Method != "GET" { - notFound(w, req) - return - } - if len(parts) == 0 { - // List outstanding operations. - list := s.ops.List() - writeJSON(http.StatusOK, list, w) - return - } - - op := s.ops.Get(parts[0]) - if op == nil { - notFound(w, req) - return - } - - obj, complete := op.StatusOrResult() - if complete { - writeJSON(http.StatusOK, obj, w) - } else { - writeJSON(http.StatusAccepted, obj, w) - } -} - -func (s *APIServer) watchPrefix() string { - return path.Join(s.prefix, "watch") -} - -// handleWatch processes a watch request -func (s *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) { - prefix := s.watchPrefix() - if !strings.HasPrefix(req.URL.Path, prefix) { - notFound(w, req) - return - } - parts := strings.Split(req.URL.Path[len(prefix):], "/")[1:] - if req.Method != "GET" || len(parts) < 1 { - notFound(w, req) - } - storage := s.storage[parts[0]] - if storage == nil { - notFound(w, req) - } - if watcher, ok := storage.(ResourceWatcher); ok { - var watching watch.Interface - var err error - if id := req.URL.Query().Get("id"); id != "" { - watching, err = watcher.WatchSingle(id) - } else { - watching, err = watcher.WatchAll() - } - if err != nil { - internalError(err, w) - return - } - - // TODO: This is one watch per connection. We want to multiplex, so that - // multiple watches of the same thing don't create two watches downstream. - watchServer := &WatchServer{watching} - if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" { - websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) - } else { - watchServer.ServeHTTP(w, req) - } - return - } - - notFound(w, req) -} - // writeJSON renders an object as JSON to the response func writeJSON(statusCode int, object interface{}, w http.ResponseWriter) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(statusCode) output, err := api.Encode(object) if err != nil { internalError(err, w) return } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) w.Write(output) } diff --git a/pkg/apiserver/operation.go b/pkg/apiserver/operation.go index 7b88531d214..90474adeb7f 100644 --- a/pkg/apiserver/operation.go +++ b/pkg/apiserver/operation.go @@ -17,8 +17,11 @@ limitations under the License. package apiserver import ( + "net/http" + "path" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -27,6 +30,47 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +func (s *APIServer) operationPrefix() string { + return path.Join(s.prefix, "operations") +} + +func (s *APIServer) handleOperation(w http.ResponseWriter, req *http.Request) { + opPrefix := s.operationPrefix() + if !strings.HasPrefix(req.URL.Path, opPrefix) { + notFound(w, req) + return + } + trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/") + parts := strings.Split(trimmed, "/") + if len(parts) > 1 { + notFound(w, req) + return + } + if req.Method != "GET" { + notFound(w, req) + return + } + if len(parts) == 0 { + // List outstanding operations. + list := s.ops.List() + writeJSON(http.StatusOK, list, w) + return + } + + op := s.ops.Get(parts[0]) + if op == nil { + notFound(w, req) + return + } + + obj, complete := op.StatusOrResult() + if complete { + writeJSON(http.StatusOK, obj, w) + } else { + writeJSON(http.StatusAccepted, obj, w) + } +} + // Operation represents an ongoing action which the server is performing. type Operation struct { ID string diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 3218a54380d..b2f1cc1853b 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -19,6 +19,8 @@ package apiserver import ( "encoding/json" "net/http" + "path" + "strings" "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -26,6 +28,52 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +func (s *APIServer) watchPrefix() string { + return path.Join(s.prefix, "watch") +} + +// handleWatch processes a watch request +func (s *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) { + prefix := s.watchPrefix() + if !strings.HasPrefix(req.URL.Path, prefix) { + notFound(w, req) + return + } + parts := strings.Split(req.URL.Path[len(prefix):], "/")[1:] + if req.Method != "GET" || len(parts) < 1 { + notFound(w, req) + } + storage := s.storage[parts[0]] + if storage == nil { + notFound(w, req) + } + if watcher, ok := storage.(ResourceWatcher); ok { + var watching watch.Interface + var err error + if id := req.URL.Query().Get("id"); id != "" { + watching, err = watcher.WatchSingle(id) + } else { + watching, err = watcher.WatchAll() + } + if err != nil { + internalError(err, w) + return + } + + // TODO: This is one watch per connection. We want to multiplex, so that + // multiple watches of the same thing don't create two watches downstream. + watchServer := &WatchServer{watching} + if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" { + websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) + } else { + watchServer.ServeHTTP(w, req) + } + return + } + + notFound(w, req) +} + // WatchServer serves a watch.Interface over a websocket or vanilla HTTP. type WatchServer struct { watching watch.Interface