diff --git a/cmd/kube-apiserver/apiserver.go b/cmd/kube-apiserver/apiserver.go index f3b175b9251..2d16448d306 100644 --- a/cmd/kube-apiserver/apiserver.go +++ b/cmd/kube-apiserver/apiserver.go @@ -20,8 +20,10 @@ package main import ( "fmt" + "math/rand" "os" "runtime" + "time" "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -32,6 +34,8 @@ import ( func main() { runtime.GOMAXPROCS(runtime.NumCPU()) + rand.Seed(time.Now().UTC().UnixNano()) + s := app.NewAPIServer() s.AddFlags(pflag.CommandLine) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 3939b6711a6..a70ca1ded1a 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -46,6 +46,12 @@ import ( "github.com/spf13/pflag" ) +const ( + // Maximum duration before timing out read/write requests + // Set to a value larger than the timeouts in each watch server. + ReadWriteTimeout = time.Minute * 60 +) + // APIServer runs a kubernetes api server. 
type APIServer struct { InsecureBindAddress util.IP @@ -393,8 +399,8 @@ func (s *APIServer) Run(_ []string) error { readOnlyServer := &http.Server{ Addr: roLocation, Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler)))), - ReadTimeout: 5 * time.Minute, - WriteTimeout: 5 * time.Minute, + ReadTimeout: ReadWriteTimeout, + WriteTimeout: ReadWriteTimeout, MaxHeaderBytes: 1 << 20, } glog.Infof("Serving read-only insecurely on %s", roLocation) @@ -413,8 +419,8 @@ func (s *APIServer) Run(_ []string) error { secureServer := &http.Server{ Addr: secureLocation, Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(m.Handler)), - ReadTimeout: 5 * time.Minute, - WriteTimeout: 5 * time.Minute, + ReadTimeout: ReadWriteTimeout, + WriteTimeout: ReadWriteTimeout, MaxHeaderBytes: 1 << 20, TLSConfig: &tls.Config{ // Change default from SSLv3 to TLSv1.0 (because of POODLE vulnerability) @@ -454,12 +460,11 @@ func (s *APIServer) Run(_ []string) error { } }() } - http := &http.Server{ Addr: insecureLocation, Handler: apiserver.RecoverPanics(m.InsecureHandler), - ReadTimeout: 5 * time.Minute, - WriteTimeout: 5 * time.Minute, + ReadTimeout: ReadWriteTimeout, + WriteTimeout: ReadWriteTimeout, MaxHeaderBytes: 1 << 20, } glog.Infof("Serving insecurely on %s", insecureLocation) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 68e81092608..61d39d91ba2 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -132,6 +132,14 @@ type APIGroupVersion struct { Context api.RequestContextMapper } +// TODO: Pipe these in through the apiserver cmd line +const ( + // Lower bound, in seconds, of the random timeout applied to each watch request + MinTimeoutSecs = 300 + // Upper bound (exclusive), in seconds, of the random timeout applied to each watch request + MaxTimeoutSecs = 600 +) + // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. 
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. A restful WebService is created for the group and version. diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index b554f3047e1..83c171b7b03 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -17,10 +17,12 @@ limitations under the License. package apiserver import ( + "math/rand" "net/http" "reflect" "regexp" "strings" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -32,19 +34,47 @@ import ( "golang.org/x/net/websocket" ) -var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)") +var ( + connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)") + + // nothing will ever be sent down this channel + neverExitWatch <-chan time.Time = make(chan time.Time) +) func isWebsocketRequest(req *http.Request) bool { return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket" } +// timeoutFactory abstracts watch timeout logic for testing +type timeoutFactory interface { + TimeoutCh() (<-chan time.Time, func() bool) +} + +// realTimeoutFactory implements timeoutFactory +type realTimeoutFactory struct { + timeout time.Duration +} + +// TimeoutCh returns a channel which will receive something when the watch times out, +// and a cleanup function to call once the watch is finished. +func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { + if w.timeout == 0 { + return neverExitWatch, func() bool { return false } + } + t := time.NewTimer(w.timeout) + return t.C, t.Stop +} + // serveWatch handles serving requests to the server func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) { + // Each watch gets a random timeout to avoid thundering herds.
Rand is seeded once in main() of cmd/kube-apiserver. + timeout := time.Duration(MinTimeoutSecs+rand.Intn(MaxTimeoutSecs-MinTimeoutSecs)) * time.Second + watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) { if err := setSelfLink(obj, req, scope.Namer); err != nil { glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err) } - }} + }, &realTimeoutFactory{timeout}} if isWebsocketRequest(req.Request) { websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request) } else { @@ -57,6 +87,7 @@ type WatchServer struct { watching watch.Interface codec runtime.Codec fixup func(runtime.Object) + t timeoutFactory } // HandleWS implements a websocket handler. @@ -100,6 +131,9 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) { func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { loggedW := httplog.LogOf(req, w) w = httplog.Unlogged(w) + timeoutCh, cleanup := self.t.TimeoutCh() + defer cleanup() + defer self.watching.Stop() cn, ok := w.(http.CloseNotifier) if !ok { @@ -113,16 +147,15 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { http.NotFound(w, req) return } - w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) flusher.Flush() - encoder := watchjson.NewEncoder(w, self.codec) for { select { case <-cn.CloseNotify(): - self.watching.Stop() + return + case <-timeoutCh: return case event, ok := <-self.watching.ResultChan(): if !ok { @@ -132,7 +165,6 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { self.fixup(event.Object) if err := encoder.Encode(&event); err != nil { // Client disconnect. 
- self.watching.Stop() return } flusher.Flush() diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 4cab1565c3e..501f829219d 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -18,11 +18,13 @@ package apiserver import ( "encoding/json" + "io" "net/http" "net/http/httptest" "net/url" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" @@ -278,4 +280,71 @@ func TestWatchProtocolSelection(t *testing.T) { t.Errorf("Unexpected response %#v", response) } } + +} + +type fakeTimeoutFactory struct { + timeoutCh chan time.Time + done chan struct{} +} + +func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { + return t.timeoutCh, func() bool { + defer close(t.done) + return true + } +} + +func TestWatchHTTPTimeout(t *testing.T) { + watcher := watch.NewFake() + timeoutCh := make(chan time.Time) + done := make(chan struct{}) + + // Setup a new watchserver + watchServer := &WatchServer{ + watcher, + version2ServerCodec, + func(obj runtime.Object) {}, + &fakeTimeoutFactory{timeoutCh, done}, + } + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + watchServer.ServeHTTP(w, req) + })) + defer s.Close() + + // Setup a client + dest, _ := url.Parse(s.URL) + dest.Path = "/api/version/watch/resource" + dest.RawQuery = "" + + req, _ := http.NewRequest("GET", dest.String(), nil) + client := http.Client{} + resp, err := client.Do(req) + watcher.Add(&api.Pod{TypeMeta: api.TypeMeta{APIVersion: "v1beta3"}}) + + // Make sure we can actually watch an endpoint + decoder := json.NewDecoder(resp.Body) + var got watchJSON + err = decoder.Decode(&got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Timeout and check for leaks + close(timeoutCh) + select { + case <-done: + if !watcher.Stopped { + t.Errorf("Leaked watch on timeout") + } + case <-time.After(100 * time.Millisecond): + 
t.Errorf("Failed to stop watcher after 100ms of timeout signal") + } + + // Make sure we can't receive any more events through the timeout watch + err = decoder.Decode(&got) + if err != io.EOF { + t.Errorf("Unexpected non-error") + } }