mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #32269 from wojtek-t/watcher_logs
Automatic merge from submit-queue Extend logging for scalability tests debugging Ref #32257
This commit is contained in:
commit
d877967c1a
@ -403,6 +403,12 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
|
|||||||
return result, len(result) > 0
|
return result, len(result) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Most probably splitting this method to a separate thread will visibily
|
||||||
|
// improve throughput of our watch machinery. So what we should do is to:
|
||||||
|
// - OnEvent handler simply put an element to channel
|
||||||
|
// - processEvent be another goroutine processing events from that channel
|
||||||
|
// Additionally, if we make this channel buffered, cacher will be more resistant
|
||||||
|
// to single watchers being slow - see cacheWatcher::add method.
|
||||||
func (c *Cacher) processEvent(event watchCacheEvent) {
|
func (c *Cacher) processEvent(event watchCacheEvent) {
|
||||||
triggerValues, supported := c.triggerValues(&event)
|
triggerValues, supported := c.triggerValues(&event)
|
||||||
|
|
||||||
@ -619,6 +625,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
|
|||||||
// OK, block sending, but only for up to 5 seconds.
|
// OK, block sending, but only for up to 5 seconds.
|
||||||
// cacheWatcher.add is called very often, so arrange
|
// cacheWatcher.add is called very often, so arrange
|
||||||
// to reuse timers instead of constantly allocating.
|
// to reuse timers instead of constantly allocating.
|
||||||
|
startTime := time.Now()
|
||||||
const timeout = 5 * time.Second
|
const timeout = 5 * time.Second
|
||||||
t, ok := timerPool.Get().(*time.Timer)
|
t, ok := timerPool.Get().(*time.Timer)
|
||||||
if ok {
|
if ok {
|
||||||
@ -643,6 +650,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
|
|||||||
c.forget(false)
|
c.forget(false)
|
||||||
c.stop()
|
c.stop()
|
||||||
}
|
}
|
||||||
|
glog.V(2).Infof("cacheWatcher add function blocked processing for %v", time.Since(startTime))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
||||||
|
@ -19,6 +19,7 @@ package etcd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -107,6 +108,10 @@ type etcdWatcher struct {
|
|||||||
// Injectable for testing. Send the event down the outgoing channel.
|
// Injectable for testing. Send the event down the outgoing channel.
|
||||||
emit func(watch.Event)
|
emit func(watch.Event)
|
||||||
|
|
||||||
|
// HighWaterMarks for performance debugging.
|
||||||
|
incomingHWM HighWaterMark
|
||||||
|
outgoingHWM HighWaterMark
|
||||||
|
|
||||||
cache etcdCache
|
cache etcdCache
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,6 +155,10 @@ func newEtcdWatcher(
|
|||||||
cancel: nil,
|
cancel: nil,
|
||||||
}
|
}
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
|
if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) {
|
||||||
|
// Monitor if this gets backed up, and how much.
|
||||||
|
glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen)
|
||||||
|
}
|
||||||
// Give up on user stop, without this we leak a lot of goroutines in tests.
|
// Give up on user stop, without this we leak a lot of goroutines in tests.
|
||||||
select {
|
select {
|
||||||
case w.outgoing <- e:
|
case w.outgoing <- e:
|
||||||
@ -262,10 +271,6 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
|
|||||||
incoming <- &copied
|
incoming <- &copied
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
watchChannelHWM HighWaterMark
|
|
||||||
)
|
|
||||||
|
|
||||||
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
|
||||||
// called as a goroutine.
|
// called as a goroutine.
|
||||||
func (w *etcdWatcher) translate() {
|
func (w *etcdWatcher) translate() {
|
||||||
@ -308,9 +313,9 @@ func (w *etcdWatcher) translate() {
|
|||||||
return
|
return
|
||||||
case res, ok := <-w.etcdIncoming:
|
case res, ok := <-w.etcdIncoming:
|
||||||
if ok {
|
if ok {
|
||||||
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) {
|
if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) {
|
||||||
// Monitor if this gets backed up, and how much.
|
// Monitor if this gets backed up, and how much.
|
||||||
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
|
glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen)
|
||||||
}
|
}
|
||||||
w.sendResult(res)
|
w.sendResult(res)
|
||||||
}
|
}
|
||||||
|
@ -190,6 +190,10 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||||||
if res == nil {
|
if res == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if len(wc.resultChan) == outgoingBufSize {
|
||||||
|
glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
|
||||||
|
"Probably caused by slow dispatching events to watchers", outgoingBufSize)
|
||||||
|
}
|
||||||
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
||||||
// Because storing events in local will cause more memory usage.
|
// Because storing events in local will cause more memory usage.
|
||||||
// The worst case would be closing the fast watcher.
|
// The worst case would be closing the fast watcher.
|
||||||
@ -300,7 +304,7 @@ func (wc *watchChan) sendError(err error) {
|
|||||||
|
|
||||||
func (wc *watchChan) sendEvent(e *event) {
|
func (wc *watchChan) sendEvent(e *event) {
|
||||||
if len(wc.incomingEventChan) == incomingBufSize {
|
if len(wc.incomingEventChan) == incomingBufSize {
|
||||||
glog.V(2).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
|
glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
|
||||||
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
|
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
|
||||||
incomingBufSize)
|
incomingBufSize)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user