Reuse TCP connections in Reflector between resync periods.

Wojciech Tyczynski
2015-10-26 10:34:45 +01:00
parent a094a6e3de
commit d47e21f19f
37 changed files with 184 additions and 108 deletions


@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"net/url"
"reflect"
@@ -30,6 +31,7 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
@@ -43,7 +45,7 @@ type ListerWatcher interface {
// ResourceVersion field will be used to start the watch in the right place.
List() (runtime.Object, error)
// Watch should begin a watch at the specified version.
-Watch(resourceVersion string) (watch.Interface, error)
+Watch(options api.ListOptions) (watch.Interface, error)
}
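With this change, a ListerWatcher implementation receives the full api.ListOptions (carrying the resource version and the new TimeoutSeconds) instead of a bare resource version string. A minimal sketch of an adapter satisfying the new interface - the funcListerWatcher type and its field names are illustrative, not part of this commit:

type funcListerWatcher struct {
	listFn  func() (runtime.Object, error)
	watchFn func(options api.ListOptions) (watch.Interface, error)
}

func (lw *funcListerWatcher) List() (runtime.Object, error) {
	return lw.listFn()
}

// Watch forwards the full options struct, so the timeout chosen by the
// Reflector reaches the underlying client and, ultimately, the server.
func (lw *funcListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) {
	return lw.watchFn(options)
}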
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
@@ -61,6 +63,8 @@ type Reflector struct {
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
// nextResync is the approximate time of the next resync (the zero Time if none is scheduled)
nextResync time.Time
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
@@ -69,6 +73,22 @@ type Reflector struct {
lastSyncResourceVersionMutex sync.RWMutex
}
var (
// We try to spread the load on the apiserver by setting timeouts for
// watch requests - the timeout is random in [minWatchTimeout, 2*minWatchTimeout].
// However, it may be shortened to keep a periodic resync from breaking
// the TCP connection.
minWatchTimeout = 5 * time.Minute
// now returns the current time; declared as a variable so it can be
// overridden (e.g. in tests).
now func() time.Time = time.Now
// If we are within 'forceResyncThreshold' of the next planned resync
// and are just about to issue a Watch(), the resync is forced now instead.
forceResyncThreshold = 3 * time.Second
// We try to set timeouts for Watch() so that the request finishes about
// 'timeoutThreshold' before the next planned periodic resync.
timeoutThreshold = 1 * time.Second
)
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
@@ -160,16 +180,47 @@ var (
// required, and a cleanup function.
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
if r.resyncPeriod == 0 {
r.nextResync = time.Time{}
return neverExitWatch, func() bool { return false }
}
// The cleanup function is required: imagine the scenario where watches
// always fail so we end up listing frequently. Then, if we don't
// manually stop the timer, we could end up with many timers active
// concurrently.
r.nextResync = now().Add(r.resyncPeriod)
t := time.NewTimer(r.resyncPeriod)
return t.C, t.Stop
}
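For illustration, a sketch of how the channel/cleanup pair is meant to be consumed; the surrounding helper is hypothetical and simplified from the real watch loop:

func (r *Reflector) waitForResyncOrStop(stopCh <-chan struct{}) bool {
	resyncCh, cleanup := r.resyncChan()
	// Always stop the timer, even on early return, so repeated short-lived
	// watch loops do not leave many timers running concurrently.
	defer cleanup()
	select {
	case <-resyncCh:
		return true // time to relist
	case <-stopCh:
		return false
	}
}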
// We want to avoid situations where periodic resyncing breaks the TCP
// connection.
// If a response's body is not read to completion before body.Close() is
// called, that TCP connection will not be reused in the future - see
// issue #15664 for more details.
// Thus, we set the timeout for watch requests to be smaller than the
// remaining time until the next periodic resync, and force the resync
// ourselves to avoid breaking the TCP connection.
//
// TODO: This should be parametrizable based on server load.
func (r *Reflector) timeoutForWatch() *int64 {
randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
timeout := r.nextResync.Sub(now()) - timeoutThreshold
if timeout < 0 || randTimeout < timeout {
timeout = randTimeout
}
timeoutSeconds := int64(timeout.Seconds())
return &timeoutSeconds
}
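A worked example of the arithmetic above as a self-contained program (the 90-second figure is chosen for illustration): the deadline-derived timeout of 89s undercuts any random draw from [5m, 10m), so the watch is asked to end about timeoutThreshold before the planned resync.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	minWatchTimeout := 5 * time.Minute
	timeoutThreshold := 1 * time.Second
	nextResync := time.Now().Add(90 * time.Second)

	// Random timeout in [minWatchTimeout, 2*minWatchTimeout) spreads watch
	// load on the apiserver across time.
	randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
	// ...but never let the watch outlive the next planned resync.
	timeout := nextResync.Sub(time.Now()) - timeoutThreshold
	if timeout < 0 || randTimeout < timeout {
		timeout = randTimeout
	}
	fmt.Printf("watch timeout: %d seconds\n", int64(timeout.Seconds())) // ~89
}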
// Returns true if we are close enough to the next planned periodic
// resync that we can force the resync now.
func (r *Reflector) canForceResyncNow() bool {
if r.nextResync.IsZero() {
return false
}
return now().Add(forceResyncThreshold).After(r.nextResync)
}
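For example, with the default forceResyncThreshold of 3 seconds, a watch that returns 2 seconds before the planned resync causes ListAndWatch to exit and relist immediately, rather than opening a new watch that would be torn down almost at once.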
// Returns an error if ListAndWatch didn't even try to initialize the watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
@@ -195,7 +246,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
r.setLastSyncResourceVersion(resourceVersion)
for {
-w, err := r.listerWatcher.Watch(resourceVersion)
+options := api.ListOptions{
+	ResourceVersion: resourceVersion,
+	// We want to avoid situations where resyncing breaks the TCP connection
+	// - see the comment on timeoutForWatch() for more details.
+	TimeoutSeconds: r.timeoutForWatch(),
+}
+w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:
@@ -225,6 +282,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
return nil
}
if r.canForceResyncNow() {
glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
return nil
}
}
}
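For context on the connection-reuse constraint behind this commit (issue #15664): Go's net/http returns a keep-alive connection to the Transport's idle pool only if the response body was read to EOF before being closed. Letting the server terminate the watch stream via TimeoutSeconds means the body reaches EOF naturally, so the connection survives across resyncs. A general, standalone illustration of that rule, independent of the Reflector code:

package main

import (
	"io"
	"io/ioutil"
	"net/http"
)

// fetchAndDrain reads the response body to EOF before closing it, which
// lets the underlying TCP connection return to the idle pool for reuse.
// Closing a partially-read body would drop the connection instead.
func fetchAndDrain(url string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	_, err = io.Copy(ioutil.Discard, resp.Body)
	return err
}

func main() {
	// Two sequential requests to the same host can share one TCP connection
	// because each body is fully drained before it is closed.
	_ = fetchAndDrain("http://example.com/")
	_ = fetchAndDrain("http://example.com/")
}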