mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Treat super short watches with no items received the same as watches that error immediately
This commit is contained in:
parent
0d69393a43
commit
4d7c6e2657
22
pkg/client/cache/reflector.go
vendored
22
pkg/client/cache/reflector.go
vendored
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
@ -99,7 +100,10 @@ func (r *Reflector) listAndWatch() {
|
|||||||
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
|
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.watchHandler(w, &resourceVersion)
|
if err := r.watchHandler(w, &resourceVersion); err != nil {
|
||||||
|
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,12 +123,13 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error {
|
||||||
|
start := time.Now()
|
||||||
|
eventCount := 0
|
||||||
for {
|
for {
|
||||||
event, ok := <-w.ResultChan()
|
event, ok := <-w.ResultChan()
|
||||||
if !ok {
|
if !ok {
|
||||||
glog.Errorf("unexpected watch close")
|
break
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||||
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
||||||
@ -149,5 +154,14 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
|||||||
glog.Errorf("unable to understand watch event %#v", event)
|
glog.Errorf("unable to understand watch event %#v", event)
|
||||||
}
|
}
|
||||||
*resourceVersion = jsonBase.ResourceVersion() + 1
|
*resourceVersion = jsonBase.ResourceVersion() + 1
|
||||||
|
eventCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
watchDuration := time.Now().Sub(start)
|
||||||
|
if watchDuration < 1*time.Second && eventCount == 0 {
|
||||||
|
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
|
||||||
|
return errors.New("very short watch")
|
||||||
|
}
|
||||||
|
glog.Infof("unexpected watch close - %v total items received", eventCount)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
19
pkg/client/cache/reflector_test.go
vendored
19
pkg/client/cache/reflector_test.go
vendored
@ -35,6 +35,20 @@ func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) {
|
|||||||
return t.WatchFunc(resourceVersion)
|
return t.WatchFunc(resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflector_watchHandlerError(t *testing.T) {
|
||||||
|
s := NewStore()
|
||||||
|
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
go func() {
|
||||||
|
fw.Stop()
|
||||||
|
}()
|
||||||
|
var resumeRV uint64
|
||||||
|
err := g.watchHandler(fw, &resumeRV)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("unexpected non-error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflector_watchHandler(t *testing.T) {
|
func TestReflector_watchHandler(t *testing.T) {
|
||||||
s := NewStore()
|
s := NewStore()
|
||||||
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
||||||
@ -49,7 +63,10 @@ func TestReflector_watchHandler(t *testing.T) {
|
|||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV uint64
|
var resumeRV uint64
|
||||||
g.watchHandler(fw, &resumeRV)
|
err := g.watchHandler(fw, &resumeRV)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
ID string
|
ID string
|
||||||
|
Loading…
Reference in New Issue
Block a user