mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-15 14:14:39 +00:00
@@ -168,10 +168,11 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding
|
|||||||
// object. Basically, we see a lot of "401 window exceeded"
|
// object. Basically, we see a lot of "401 window exceeded"
|
||||||
// errors from etcd, and that's due to the client not streaming
|
// errors from etcd, and that's due to the client not streaming
|
||||||
// results but rather getting them one at a time. So we really
|
// results but rather getting them one at a time. So we really
|
||||||
// want to never block the etcd client, if possible. The 50 is
|
// want to never block the etcd client, if possible. The 100 is
|
||||||
// arbitrary; there's a V(4) log message that prints the length
|
// mostly arbitrary--we know it goes as high as 50, though.
|
||||||
// so we can monitor how much of this buffer is actually used.
|
// There's a V(2) log message that prints the length so we can
|
||||||
etcdIncoming: make(chan *etcd.Response, 50),
|
// monitor how much of this buffer is actually used.
|
||||||
|
etcdIncoming: make(chan *etcd.Response, 100),
|
||||||
etcdError: make(chan error, 1),
|
etcdError: make(chan error, 1),
|
||||||
etcdStop: make(chan bool),
|
etcdStop: make(chan bool),
|
||||||
outgoing: make(chan watch.Event),
|
outgoing: make(chan watch.Event),
|
||||||
@@ -235,6 +236,10 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
|
|||||||
incoming <- &copied
|
incoming <- &copied
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
watchChannelHWM util.HighWaterMark
|
||||||
|
)
|
||||||
|
|
||||||
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
||||||
// called as a goroutine.
|
// called as a goroutine.
|
||||||
func (w *etcdWatcher) translate() {
|
func (w *etcdWatcher) translate() {
|
||||||
@@ -259,9 +264,9 @@ func (w *etcdWatcher) translate() {
|
|||||||
return
|
return
|
||||||
case res, ok := <-w.etcdIncoming:
|
case res, ok := <-w.etcdIncoming:
|
||||||
if ok {
|
if ok {
|
||||||
if curLen := len(w.etcdIncoming); curLen > 0 {
|
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) {
|
||||||
// Monitor if this gets backed up, and how much.
|
// Monitor if this gets backed up, and how much.
|
||||||
glog.V(4).Infof("watch: %v objects queued in channel.", curLen)
|
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
|
||||||
}
|
}
|
||||||
w.sendResult(res)
|
w.sendResult(res)
|
||||||
}
|
}
|
||||||
|
@@ -18,6 +18,7 @@ package util
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO(ArtfulCoder)
|
// TODO(ArtfulCoder)
|
||||||
@@ -40,3 +41,20 @@ func (at *AtomicValue) Load() interface{} {
|
|||||||
defer at.valueMutex.RUnlock()
|
defer at.valueMutex.RUnlock()
|
||||||
return at.value
|
return at.value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HighWaterMark is a thread-safe object for tracking the maximum value seen
|
||||||
|
// for some quantity.
|
||||||
|
type HighWaterMark int64
|
||||||
|
|
||||||
|
// Check returns true iff 'current' is the highest value ever seen.
|
||||||
|
func (hwm *HighWaterMark) Check(current int64) bool {
|
||||||
|
for {
|
||||||
|
old := atomic.LoadInt64((*int64)(hwm))
|
||||||
|
if current <= old {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -17,6 +17,8 @@ limitations under the License.
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -48,3 +50,34 @@ func TestAtomicValue(t *testing.T) {
|
|||||||
atomicValue.Store(10)
|
atomicValue.Store(10)
|
||||||
ExpectValue(t, atomicValue, 10)
|
ExpectValue(t, atomicValue, 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHighWaterMark(t *testing.T) {
|
||||||
|
var h HighWaterMark
|
||||||
|
|
||||||
|
for i := int64(10); i < 20; i++ {
|
||||||
|
if !h.Check(i) {
|
||||||
|
t.Errorf("unexpected false for %v", i)
|
||||||
|
}
|
||||||
|
if h.Check(i - 1) {
|
||||||
|
t.Errorf("unexpected true for %v", i-1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m := int64(0)
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for i := 0; i < 300; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
v := rand.Int63()
|
||||||
|
go func(v int64) {
|
||||||
|
defer wg.Done()
|
||||||
|
h.Check(v)
|
||||||
|
}(v)
|
||||||
|
if v > m {
|
||||||
|
m = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if m != int64(h) {
|
||||||
|
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user