mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 16:29:21 +00:00
Merge pull request #4923 from lavalamp/fix4
Allow reflector to do full resync periodically
This commit is contained in:
commit
cb1e044a60
4
pkg/client/cache/poller_test.go
vendored
4
pkg/client/cache/poller_test.go
vendored
@ -104,6 +104,8 @@ func TestPoller_sync(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoller_Run(t *testing.T) {
|
func TestPoller_Run(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
defer func() { stopCh <- struct{}{} }()
|
||||||
s := NewStore(testPairKeyFunc)
|
s := NewStore(testPairKeyFunc)
|
||||||
const count = 10
|
const count = 10
|
||||||
var called = 0
|
var called = 0
|
||||||
@ -118,7 +120,7 @@ func TestPoller_Run(t *testing.T) {
|
|||||||
return testEnumerator{}, nil
|
return testEnumerator{}, nil
|
||||||
}
|
}
|
||||||
return nil, errors.New("transient error")
|
return nil, errors.New("transient error")
|
||||||
}, time.Millisecond, s).Run()
|
}, time.Millisecond, s).RunUntil(stopCh)
|
||||||
|
|
||||||
// The test here is that we get called at least count times.
|
// The test here is that we get called at least count times.
|
||||||
<-done
|
<-done
|
||||||
|
100
pkg/client/cache/reflector.go
vendored
100
pkg/client/cache/reflector.go
vendored
@ -49,18 +49,23 @@ type Reflector struct {
|
|||||||
listerWatcher ListerWatcher
|
listerWatcher ListerWatcher
|
||||||
// period controls timing between one watch ending and
|
// period controls timing between one watch ending and
|
||||||
// the beginning of the next one.
|
// the beginning of the next one.
|
||||||
period time.Duration
|
period time.Duration
|
||||||
|
resyncPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReflector creates a new Reflector object which will keep the given store up to
|
// NewReflector creates a new Reflector object which will keep the given store up to
|
||||||
// date with the server's contents for the given resource. Reflector promises to
|
// date with the server's contents for the given resource. Reflector promises to
|
||||||
// only put things in the store that have the type of expectedType.
|
// only put things in the store that have the type of expectedType.
|
||||||
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector {
|
// If resyncPeriod is non-zero, then lists will be executed after every resyncPeriod,
|
||||||
|
// so that you can use reflectors to periodically process everything as well as
|
||||||
|
// incrementally processing the things that change.
|
||||||
|
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||||
r := &Reflector{
|
r := &Reflector{
|
||||||
listerWatcher: lw,
|
listerWatcher: lw,
|
||||||
store: store,
|
store: store,
|
||||||
expectedType: reflect.TypeOf(expectedType),
|
expectedType: reflect.TypeOf(expectedType),
|
||||||
period: time.Second,
|
period: time.Second,
|
||||||
|
resyncPeriod: resyncPeriod,
|
||||||
}
|
}
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
@ -77,8 +82,25 @@ func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
|
|||||||
go util.Until(func() { r.listAndWatch() }, r.period, stopCh)
|
go util.Until(func() { r.listAndWatch() }, r.period, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
// nothing will ever be sent down this channel
|
||||||
|
neverExitWatch <-chan time.Time = make(chan time.Time)
|
||||||
|
|
||||||
|
// Used to indicate that watching stopped so that a resync could happen.
|
||||||
|
errorResyncRequested = errors.New("resync channel fired")
|
||||||
|
)
|
||||||
|
|
||||||
|
// resyncChan returns a channel which will receive something when a resync is required.
|
||||||
|
func (r *Reflector) resyncChan() <-chan time.Time {
|
||||||
|
if r.resyncPeriod == 0 {
|
||||||
|
return neverExitWatch
|
||||||
|
}
|
||||||
|
return time.After(r.resyncPeriod)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Reflector) listAndWatch() {
|
func (r *Reflector) listAndWatch() {
|
||||||
var resourceVersion string
|
var resourceVersion string
|
||||||
|
exitWatch := r.resyncChan()
|
||||||
|
|
||||||
list, err := r.listerWatcher.List()
|
list, err := r.listerWatcher.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -114,8 +136,10 @@ func (r *Reflector) listAndWatch() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := r.watchHandler(w, &resourceVersion); err != nil {
|
if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil {
|
||||||
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
|
if err != errorResyncRequested {
|
||||||
|
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -132,41 +156,47 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) error {
|
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
eventCount := 0
|
eventCount := 0
|
||||||
|
loop:
|
||||||
for {
|
for {
|
||||||
event, ok := <-w.ResultChan()
|
select {
|
||||||
if !ok {
|
case <-exitWatch:
|
||||||
break
|
w.Stop()
|
||||||
|
return errorResyncRequested
|
||||||
|
case event, ok := <-w.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
if event.Type == watch.Error {
|
||||||
|
return apierrs.FromObject(event.Object)
|
||||||
|
}
|
||||||
|
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||||
|
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
meta, err := meta.Accessor(event.Object)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("unable to understand watch event %#v", event)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added:
|
||||||
|
r.store.Add(event.Object)
|
||||||
|
case watch.Modified:
|
||||||
|
r.store.Update(event.Object)
|
||||||
|
case watch.Deleted:
|
||||||
|
// TODO: Will any consumers need access to the "last known
|
||||||
|
// state", which is passed in event.Object? If so, may need
|
||||||
|
// to change this.
|
||||||
|
r.store.Delete(event.Object)
|
||||||
|
default:
|
||||||
|
glog.Errorf("unable to understand watch event %#v", event)
|
||||||
|
}
|
||||||
|
*resourceVersion = meta.ResourceVersion()
|
||||||
|
eventCount++
|
||||||
}
|
}
|
||||||
if event.Type == watch.Error {
|
|
||||||
return apierrs.FromObject(event.Object)
|
|
||||||
}
|
|
||||||
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
|
||||||
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
meta, err := meta.Accessor(event.Object)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("unable to understand watch event %#v", event)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added:
|
|
||||||
r.store.Add(event.Object)
|
|
||||||
case watch.Modified:
|
|
||||||
r.store.Update(event.Object)
|
|
||||||
case watch.Deleted:
|
|
||||||
// TODO: Will any consumers need access to the "last known
|
|
||||||
// state", which is passed in event.Object? If so, may need
|
|
||||||
// to change this.
|
|
||||||
r.store.Delete(event.Object)
|
|
||||||
default:
|
|
||||||
glog.Errorf("unable to understand watch event %#v", event)
|
|
||||||
}
|
|
||||||
*resourceVersion = meta.ResourceVersion()
|
|
||||||
eventCount++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
watchDuration := time.Now().Sub(start)
|
watchDuration := time.Now().Sub(start)
|
||||||
|
38
pkg/client/cache/reflector_test.go
vendored
38
pkg/client/cache/reflector_test.go
vendored
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
@ -36,15 +37,27 @@ func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) {
|
|||||||
return t.WatchFunc(resourceVersion)
|
return t.WatchFunc(resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflector_resyncChan(t *testing.T) {
|
||||||
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
|
||||||
|
a, b := g.resyncChan(), time.After(100*time.Millisecond)
|
||||||
|
select {
|
||||||
|
case <-a:
|
||||||
|
t.Logf("got timeout as expected")
|
||||||
|
case <-b:
|
||||||
|
t.Errorf("resyncChan() is at least 99 milliseconds late??")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflector_watchHandlerError(t *testing.T) {
|
func TestReflector_watchHandlerError(t *testing.T) {
|
||||||
s := NewStore(MetaNamespaceKeyFunc)
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
|
||||||
fw := watch.NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
err := g.watchHandler(fw, &resumeRV)
|
err := g.watchHandler(fw, &resumeRV, neverExitWatch)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("unexpected non-error")
|
t.Errorf("unexpected non-error")
|
||||||
}
|
}
|
||||||
@ -52,7 +65,7 @@ func TestReflector_watchHandlerError(t *testing.T) {
|
|||||||
|
|
||||||
func TestReflector_watchHandler(t *testing.T) {
|
func TestReflector_watchHandler(t *testing.T) {
|
||||||
s := NewStore(MetaNamespaceKeyFunc)
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
|
||||||
fw := watch.NewFake()
|
fw := watch.NewFake()
|
||||||
s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
|
s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
|
||||||
s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
|
s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
|
||||||
@ -64,7 +77,7 @@ func TestReflector_watchHandler(t *testing.T) {
|
|||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
err := g.watchHandler(fw, &resumeRV)
|
err := g.watchHandler(fw, &resumeRV, neverExitWatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -101,6 +114,19 @@ func TestReflector_watchHandler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflector_watchHandlerTimeout(t *testing.T) {
|
||||||
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
var resumeRV string
|
||||||
|
exit := make(chan time.Time, 1)
|
||||||
|
exit <- time.Now()
|
||||||
|
err := g.watchHandler(fw, &resumeRV, exit)
|
||||||
|
if err != errorResyncRequested {
|
||||||
|
t.Errorf("expected timeout error, but got %q", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflector_listAndWatch(t *testing.T) {
|
func TestReflector_listAndWatch(t *testing.T) {
|
||||||
createdFakes := make(chan *watch.FakeWatcher)
|
createdFakes := make(chan *watch.FakeWatcher)
|
||||||
|
|
||||||
@ -125,7 +151,7 @@ func TestReflector_listAndWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||||
r := NewReflector(lw, &api.Pod{}, s)
|
r := NewReflector(lw, &api.Pod{}, s, 0)
|
||||||
go r.listAndWatch()
|
go r.listAndWatch()
|
||||||
|
|
||||||
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
||||||
@ -242,7 +268,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
|
|||||||
return item.list, item.listErr
|
return item.list, item.listErr
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
r := NewReflector(lw, &api.Pod{}, s)
|
r := NewReflector(lw, &api.Pod{}, s, 0)
|
||||||
r.listAndWatch()
|
r.listAndWatch()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}
|
|||||||
}
|
}
|
||||||
updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource}
|
updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource}
|
||||||
}
|
}
|
||||||
cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)).Run()
|
cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
func getHostFieldLabel(apiVersion string) string {
|
func getHostFieldLabel(apiVersion string) string {
|
||||||
|
@ -102,7 +102,12 @@ func NewMainKubelet(
|
|||||||
|
|
||||||
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
if kubeClient != nil {
|
if kubeClient != nil {
|
||||||
cache.NewReflector(cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), &api.Service{}, serviceStore).Run()
|
cache.NewReflector(
|
||||||
|
cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()),
|
||||||
|
&api.Service{},
|
||||||
|
serviceStore,
|
||||||
|
0,
|
||||||
|
).Run()
|
||||||
}
|
}
|
||||||
serviceLister := &cache.StoreToServiceLister{serviceStore}
|
serviceLister := &cache.StoreToServiceLister{serviceStore}
|
||||||
|
|
||||||
|
@ -90,6 +90,7 @@ func NewProvision(c client.Interface) admission.Interface {
|
|||||||
},
|
},
|
||||||
&api.Namespace{},
|
&api.Namespace{},
|
||||||
store,
|
store,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
reflector.Run()
|
reflector.Run()
|
||||||
return &provision{
|
return &provision{
|
||||||
|
@ -94,6 +94,7 @@ func NewExists(c client.Interface) admission.Interface {
|
|||||||
},
|
},
|
||||||
&api.Namespace{},
|
&api.Namespace{},
|
||||||
store,
|
store,
|
||||||
|
0,
|
||||||
)
|
)
|
||||||
reflector.Run()
|
reflector.Run()
|
||||||
return &exists{
|
return &exists{
|
||||||
|
@ -114,18 +114,18 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch and queue pods that need scheduling.
|
// Watch and queue pods that need scheduling.
|
||||||
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue).Run()
|
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()
|
||||||
|
|
||||||
// Watch and cache all running pods. Scheduler needs to find all pods
|
// Watch and cache all running pods. Scheduler needs to find all pods
|
||||||
// so it knows where it's safe to place a pod. Cache this locally.
|
// so it knows where it's safe to place a pod. Cache this locally.
|
||||||
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store).Run()
|
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store, 0).Run()
|
||||||
|
|
||||||
// Watch minions.
|
// Watch minions.
|
||||||
// Minions may be listed frequently, so provide a local up-to-date cache.
|
// Minions may be listed frequently, so provide a local up-to-date cache.
|
||||||
if false {
|
if false {
|
||||||
// Disable this code until minions support watches. Note when this code is enabled,
|
// Disable this code until minions support watches. Note when this code is enabled,
|
||||||
// we need to make sure minion ListWatcher has proper FieldSelector.
|
// we need to make sure minion ListWatcher has proper FieldSelector.
|
||||||
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store).Run()
|
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store, 0).Run()
|
||||||
} else {
|
} else {
|
||||||
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
|
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
|
||||||
}
|
}
|
||||||
@ -133,7 +133,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
|
|||||||
// Watch and cache all service objects. Scheduler needs to find all pods
|
// Watch and cache all service objects. Scheduler needs to find all pods
|
||||||
// created by the same service, so that it can spread them correctly.
|
// created by the same service, so that it can spread them correctly.
|
||||||
// Cache this locally.
|
// Cache this locally.
|
||||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run()
|
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run()
|
||||||
|
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user