mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Randomize apiserver watch timeouts
This commit is contained in:
parent
d9d12fd3f7
commit
8a5445d3db
@ -20,8 +20,10 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
@ -32,6 +34,8 @@ import (
|
||||
|
||||
func main() {
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
|
||||
s := app.NewAPIServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
|
||||
|
@ -46,6 +46,12 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
const (
|
||||
// Maximum duration before timing out read/write requests
|
||||
// Set to a value larger than the timeouts in each watch server.
|
||||
ReadWriteTimeout = time.Minute * 60
|
||||
)
|
||||
|
||||
// APIServer runs a kubernetes api server.
|
||||
type APIServer struct {
|
||||
InsecureBindAddress util.IP
|
||||
@ -393,8 +399,8 @@ func (s *APIServer) Run(_ []string) error {
|
||||
readOnlyServer := &http.Server{
|
||||
Addr: roLocation,
|
||||
Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler)))),
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
WriteTimeout: 5 * time.Minute,
|
||||
ReadTimeout: ReadWriteTimeout,
|
||||
WriteTimeout: ReadWriteTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
glog.Infof("Serving read-only insecurely on %s", roLocation)
|
||||
@ -413,8 +419,8 @@ func (s *APIServer) Run(_ []string) error {
|
||||
secureServer := &http.Server{
|
||||
Addr: secureLocation,
|
||||
Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(m.Handler)),
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
WriteTimeout: 5 * time.Minute,
|
||||
ReadTimeout: ReadWriteTimeout,
|
||||
WriteTimeout: ReadWriteTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
TLSConfig: &tls.Config{
|
||||
// Change default from SSLv3 to TLSv1.0 (because of POODLE vulnerability)
|
||||
@ -454,12 +460,11 @@ func (s *APIServer) Run(_ []string) error {
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
http := &http.Server{
|
||||
Addr: insecureLocation,
|
||||
Handler: apiserver.RecoverPanics(m.InsecureHandler),
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
WriteTimeout: 5 * time.Minute,
|
||||
ReadTimeout: ReadWriteTimeout,
|
||||
WriteTimeout: ReadWriteTimeout,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
glog.Infof("Serving insecurely on %s", insecureLocation)
|
||||
|
@ -132,6 +132,14 @@ type APIGroupVersion struct {
|
||||
Context api.RequestContextMapper
|
||||
}
|
||||
|
||||
// TODO: Pipe these in through the apiserver cmd line
|
||||
const (
|
||||
// Minimum duration before timing out read/write requests
|
||||
MinTimeoutSecs = 300
|
||||
// Maximum duration before timing out read/write requests
|
||||
MaxTimeoutSecs = 600
|
||||
)
|
||||
|
||||
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
||||
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
|
||||
// in a slash. A restful WebService is created for the group and version.
|
||||
|
@ -17,10 +17,12 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
@ -32,19 +34,47 @@ import (
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
|
||||
var (
|
||||
connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
|
||||
|
||||
// nothing will ever be sent down this channel
|
||||
neverExitWatch <-chan time.Time = make(chan time.Time)
|
||||
)
|
||||
|
||||
func isWebsocketRequest(req *http.Request) bool {
|
||||
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
|
||||
}
|
||||
|
||||
// timeoutFactory abstracts watch timeout logic for testing
|
||||
type timeoutFactory interface {
|
||||
TimeoutCh() (<-chan time.Time, func() bool)
|
||||
}
|
||||
|
||||
// realTimeoutFactory implements timeoutFactory
|
||||
type realTimeoutFactory struct {
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// TimeoutChan returns a channel which will receive something when the watch times out,
|
||||
// and a cleanup function to call when this happens.
|
||||
func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
||||
if w.timeout == 0 {
|
||||
return neverExitWatch, func() bool { return false }
|
||||
}
|
||||
t := time.NewTimer(w.timeout)
|
||||
return t.C, t.Stop
|
||||
}
|
||||
|
||||
// serveWatch handles serving requests to the server
|
||||
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) {
|
||||
// Each watch gets a random timeout to avoid thundering herds. Rand is seeded once in the api installer.
|
||||
timeout := time.Duration(MinTimeoutSecs+rand.Intn(MaxTimeoutSecs-MinTimeoutSecs)) * time.Second
|
||||
|
||||
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
|
||||
}
|
||||
}}
|
||||
}, &realTimeoutFactory{timeout}}
|
||||
if isWebsocketRequest(req.Request) {
|
||||
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request)
|
||||
} else {
|
||||
@ -57,6 +87,7 @@ type WatchServer struct {
|
||||
watching watch.Interface
|
||||
codec runtime.Codec
|
||||
fixup func(runtime.Object)
|
||||
t timeoutFactory
|
||||
}
|
||||
|
||||
// HandleWS implements a websocket handler.
|
||||
@ -100,6 +131,9 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) {
|
||||
func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
loggedW := httplog.LogOf(req, w)
|
||||
w = httplog.Unlogged(w)
|
||||
timeoutCh, cleanup := self.t.TimeoutCh()
|
||||
defer cleanup()
|
||||
defer self.watching.Stop()
|
||||
|
||||
cn, ok := w.(http.CloseNotifier)
|
||||
if !ok {
|
||||
@ -113,16 +147,15 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
|
||||
encoder := watchjson.NewEncoder(w, self.codec)
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
self.watching.Stop()
|
||||
return
|
||||
case <-timeoutCh:
|
||||
return
|
||||
case event, ok := <-self.watching.ResultChan():
|
||||
if !ok {
|
||||
@ -132,7 +165,6 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
self.fixup(event.Object)
|
||||
if err := encoder.Encode(&event); err != nil {
|
||||
// Client disconnect.
|
||||
self.watching.Stop()
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
|
@ -18,11 +18,13 @@ package apiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
|
||||
@ -278,4 +280,71 @@ func TestWatchProtocolSelection(t *testing.T) {
|
||||
t.Errorf("Unexpected response %#v", response)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type fakeTimeoutFactory struct {
|
||||
timeoutCh chan time.Time
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
||||
return t.timeoutCh, func() bool {
|
||||
defer close(t.done)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchHTTPTimeout(t *testing.T) {
|
||||
watcher := watch.NewFake()
|
||||
timeoutCh := make(chan time.Time)
|
||||
done := make(chan struct{})
|
||||
|
||||
// Setup a new watchserver
|
||||
watchServer := &WatchServer{
|
||||
watcher,
|
||||
version2ServerCodec,
|
||||
func(obj runtime.Object) {},
|
||||
&fakeTimeoutFactory{timeoutCh, done},
|
||||
}
|
||||
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
watchServer.ServeHTTP(w, req)
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
// Setup a client
|
||||
dest, _ := url.Parse(s.URL)
|
||||
dest.Path = "/api/version/watch/resource"
|
||||
dest.RawQuery = ""
|
||||
|
||||
req, _ := http.NewRequest("GET", dest.String(), nil)
|
||||
client := http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
watcher.Add(&api.Pod{TypeMeta: api.TypeMeta{APIVersion: "v1beta3"}})
|
||||
|
||||
// Make sure we can actually watch an endpoint
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
var got watchJSON
|
||||
err = decoder.Decode(&got)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Timeout and check for leaks
|
||||
close(timeoutCh)
|
||||
select {
|
||||
case <-done:
|
||||
if !watcher.Stopped {
|
||||
t.Errorf("Leaked watch on timeout")
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Errorf("Failed to stop watcher after 100ms of timeout signal")
|
||||
}
|
||||
|
||||
// Make sure we can't receive any more events through the timeout watch
|
||||
err = decoder.Decode(&got)
|
||||
if err != io.EOF {
|
||||
t.Errorf("Unexpected non-error")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user