mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-15 06:01:50 +00:00
Move watch to being a resthandler resource and expose it on LIST
GET /pods?watch=true&resourceVersion=10 will now function equivalent to GET /watch/pods.
This commit is contained in:
@@ -17,157 +17,41 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
// TODO: convert me to resthandler custom verb
|
||||
type WatchHandler struct {
|
||||
storage map[string]rest.Storage
|
||||
mapper meta.RESTMapper
|
||||
convertor runtime.ObjectConvertor
|
||||
codec runtime.Codec
|
||||
linker runtime.SelfLinker
|
||||
info *APIRequestInfoResolver
|
||||
}
|
||||
|
||||
// setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type.
|
||||
func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error {
|
||||
name, err := h.linker.Name(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newURL := *req.URL
|
||||
newURL.Path = path.Join(req.URL.Path, name)
|
||||
newURL.RawQuery = ""
|
||||
newURL.Fragment = ""
|
||||
return h.linker.SetSelfLink(obj, newURL.String())
|
||||
}
|
||||
|
||||
var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
|
||||
|
||||
func isWebsocketRequest(req *http.Request) bool {
|
||||
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
|
||||
}
|
||||
|
||||
// ServeHTTP processes watch requests.
|
||||
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
var verb string
|
||||
var apiResource string
|
||||
var httpCode int
|
||||
reqStart := time.Now()
|
||||
defer monitor("watch", &verb, &apiResource, &httpCode, reqStart)
|
||||
|
||||
if req.Method != "GET" {
|
||||
httpCode = errorJSON(errors.NewBadRequest(
|
||||
fmt.Sprintf("unsupported method for watch: %s", req.Method)), h.codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
requestInfo, err := h.info.GetAPIRequestInfo(req)
|
||||
if err != nil {
|
||||
httpCode = errorJSON(errors.NewBadRequest(
|
||||
fmt.Sprintf("failed to find api request info: %s", err.Error())), h.codec, w)
|
||||
return
|
||||
}
|
||||
verb = requestInfo.Verb
|
||||
ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace)
|
||||
|
||||
storage := h.storage[requestInfo.Resource]
|
||||
if storage == nil {
|
||||
httpCode = errorJSON(errors.NewNotFound(requestInfo.Resource, "Resource"), h.codec, w)
|
||||
return
|
||||
}
|
||||
apiResource = requestInfo.Resource
|
||||
watcher, ok := storage.(rest.Watcher)
|
||||
if !ok {
|
||||
httpCode = errorJSON(errors.NewMethodNotSupported(requestInfo.Resource, "watch"), h.codec, w)
|
||||
return
|
||||
}
|
||||
kind := requestInfo.Kind
|
||||
if len(kind) == 0 {
|
||||
if _, kind, err = h.mapper.VersionAndKindForResource(apiResource); err != nil {
|
||||
glog.Errorf("No kind found for %s: %v", apiResource, err)
|
||||
}
|
||||
}
|
||||
|
||||
scope := RequestScope{
|
||||
Convertor: h.convertor,
|
||||
Kind: kind,
|
||||
Resource: apiResource,
|
||||
APIVersion: requestInfo.APIVersion,
|
||||
// TODO: this must be parameterized per version, and is incorrect for implementors
|
||||
// outside of Kubernetes. Fix by refactoring watch under resthandler as a custome
|
||||
// resource.
|
||||
ServerAPIVersion: requestInfo.APIVersion,
|
||||
}
|
||||
label, field, err := parseSelectorQueryParams(req.URL.Query(), scope)
|
||||
if err != nil {
|
||||
httpCode = errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
resourceVersion := req.URL.Query().Get("resourceVersion")
|
||||
watching, err := watcher.Watch(ctx, label, field, resourceVersion)
|
||||
if err != nil {
|
||||
httpCode = errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
httpCode = http.StatusOK
|
||||
|
||||
// TODO: This is one watch per connection. We want to multiplex, so that
|
||||
// multiple watches of the same thing don't create two watches downstream.
|
||||
watchServer := &WatchServer{watching, h.codec, func(obj runtime.Object) {
|
||||
if err := h.setSelfLinkAddName(obj, req); err != nil {
|
||||
glog.Errorf("Failed to set self link for object %#v", obj)
|
||||
// serveWatch handles serving requests to the server
|
||||
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) {
|
||||
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
|
||||
}
|
||||
}}
|
||||
if isWebsocketRequest(req) {
|
||||
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req)
|
||||
if isWebsocketRequest(req.Request) {
|
||||
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request)
|
||||
} else {
|
||||
watchServer.ServeHTTP(w, req)
|
||||
watchServer.ServeHTTP(w, req.Request)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: remove when watcher is refactored to fit under api_installer
|
||||
func parseSelectorQueryParams(query url.Values, scope RequestScope) (label labels.Selector, field fields.Selector, err error) {
|
||||
labelString := query.Get(api.LabelSelectorQueryParam(scope.ServerAPIVersion))
|
||||
label, err = labels.Parse(labelString)
|
||||
if err != nil {
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("The 'labels' selector parameter (%s) could not be parsed: %v", labelString, err))
|
||||
}
|
||||
|
||||
fn := func(label, value string) (newLabel, newValue string, err error) {
|
||||
return scope.Convertor.ConvertFieldLabel(scope.APIVersion, scope.Kind, label, value)
|
||||
}
|
||||
fieldString := query.Get(api.FieldSelectorQueryParam(scope.ServerAPIVersion))
|
||||
field, err = fields.ParseAndTransformSelector(fieldString, fn)
|
||||
if err != nil {
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("The 'fields' selector parameter (%s) could not be parsed: %v", fieldString, err))
|
||||
}
|
||||
glog.Infof("Found %#v %#v from %v in scope %#v", label, field, query, scope)
|
||||
return label, field, nil
|
||||
}
|
||||
|
||||
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
|
||||
type WatchServer struct {
|
||||
watching watch.Interface
|
||||
|
Reference in New Issue
Block a user