Merge pull request #24142 from rrati/controller-sync-interval-23394

Automatic merge from submit-queue

Separated resync and relist functionality in reflector #23394

controller-manager #23394
This commit is contained in:
k8s-merge-robot 2016-05-19 07:10:00 -07:00
commit 044d55ed7d
11 changed files with 236 additions and 67 deletions

View File

@ -377,6 +377,12 @@ func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []E
return
}
// Resync will touch all objects to put them into the processing queue
func (f *HistoricalFIFO) Resync() error {
// Nothing to do
return nil
}
// NewHistorical returns a Store which can be used to queue up items to
// process. If a non-nil Mux is provided, then modifications to the
// the FIFO are delivered on a channel specific to this fifo.

View File

@ -306,6 +306,10 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
// Copy item's slice so operations on this slice (delta
@ -452,6 +456,27 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
f.lock.RLock()
defer f.lock.RUnlock()
for _, k := range f.knownObjects.ListKeys() {
obj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k)
continue
} else if !exists {
glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k)
continue
}
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
}
return nil
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister

View File

@ -146,6 +146,7 @@ func (c *ExpirationCache) ListKeys() []string {
func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
@ -191,6 +192,11 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
return nil
}
// Resync will touch all objects to put them into the processing queue
func (c *ExpirationCache) Resync() error {
return c.cacheStorage.Resync()
}
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
return &ExpirationCache{

102
pkg/client/cache/fake_custom_store.go vendored Normal file
View File

@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
// FakeStore lets you define custom functions for store operations
type FakeCustomStore struct {
AddFunc func(obj interface{}) error
UpdateFunc func(obj interface{}) error
DeleteFunc func(obj interface{}) error
ListFunc func() []interface{}
ListKeysFunc func() []string
GetFunc func(obj interface{}) (item interface{}, exists bool, err error)
GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
ReplaceFunc func(list []interface{}, resourceVerion string) error
ResyncFunc func() error
}
// Add calls the custom Add function if defined
func (f *FakeCustomStore) Add(obj interface{}) error {
if f.AddFunc != nil {
return f.AddFunc(obj)
}
return nil
}
// Update calls the custom Update function if defined
func (f *FakeCustomStore) Update(obj interface{}) error {
if f.UpdateFunc != nil {
return f.Update(obj)
}
return nil
}
// Delete calls the custom Delete function if defined
func (f *FakeCustomStore) Delete(obj interface{}) error {
if f.DeleteFunc != nil {
return f.DeleteFunc(obj)
}
return nil
}
// List calls the custom List function if defined
func (f *FakeCustomStore) List() []interface{} {
if f.ListFunc != nil {
return f.ListFunc()
}
return nil
}
// ListKeys calls the custom ListKeys function if defined
func (f *FakeCustomStore) ListKeys() []string {
if f.ListKeysFunc != nil {
return f.ListKeysFunc()
}
return nil
}
// Get calls the custom Get function if defined
func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
if f.GetFunc != nil {
return f.GetFunc(obj)
}
return nil, false, nil
}
// GetByKey calls the custom GetByKey function if defined
func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) {
if f.GetByKeyFunc != nil {
return f.GetByKeyFunc(key)
}
return nil, false, nil
}
// Replace calls the custom Replace function if defined
func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error {
if f.ReplaceFunc != nil {
return f.ReplaceFunc(list, resourceVersion)
}
return nil
}
// Resync calls the custom Resync function if defined
func (f *FakeCustomStore) Resync() error {
if f.ResyncFunc != nil {
return f.ResyncFunc()
}
return nil
}

View File

@ -18,6 +18,8 @@ package cache
import (
"sync"
"k8s.io/kubernetes/pkg/util/sets"
)
// Queue is exactly like a Store, but has a Pop() method too.
@ -241,6 +243,26 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync will touch all objects to put them into the processing queue
func (f *FIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
inQueue := sets.NewString()
for _, id := range f.queue {
inQueue.Insert(id)
}
for id := range f.items {
if !inQueue.Has(id) {
f.queue = append(f.queue, id)
}
}
if len(f.queue) > 0 {
f.cond.Broadcast()
}
return nil
}
// NewFIFO returns a Store which can be used to queue up items to
// process.
func NewFIFO(keyFunc KeyFunc) *FIFO {

View File

@ -337,15 +337,22 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
return nil
}
if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
if err != errorResyncRequested && err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
if err != errorResyncRequested {
return nil
}
}
if r.canForceResyncNow() {
glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
return nil
if err := r.store.Resync(); err != nil {
return err
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}
}

View File

@ -32,14 +32,14 @@ import (
type testLW struct {
ListFunc func() (runtime.Object, error)
WatchFunc func(resourceVersion string) (watch.Interface, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
}
func (t *testLW) List(options api.ListOptions) (runtime.Object, error) {
return t.ListFunc()
}
func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options.ResourceVersion)
return t.WatchFunc(options)
}
func TestCloseWatchChannelOnError(t *testing.T) {
@ -47,7 +47,7 @@ func TestCloseWatchChannelOnError(t *testing.T) {
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
fw := watch.NewFake()
r.listerWatcher = &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
@ -73,7 +73,7 @@ func TestRunUntil(t *testing.T) {
r := NewReflector(&testLW{}, &api.Pod{}, store, 0)
fw := watch.NewFake()
r.listerWatcher = &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
@ -191,19 +191,6 @@ func TestReflectorWatchHandler(t *testing.T) {
}
}
func TestReflectorWatchHandlerTimeout(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
fw := watch.NewFake()
var resumeRV string
exit := make(chan time.Time, 1)
exit <- time.Now()
err := g.watchHandler(fw, &resumeRV, exit, wait.NeverStop)
if err != errorResyncRequested {
t.Errorf("expected timeout error, but got %q", err)
}
}
func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
@ -225,7 +212,8 @@ func TestReflectorListAndWatch(t *testing.T) {
// inject an error.
expectedRVs := []string{"1", "3"}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
rv := options.ResourceVersion
fw := watch.NewFake()
if e, a := expectedRVs[0], rv; e != a {
t.Errorf("Expected rv %v, but got %v", e, a)
@ -340,7 +328,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
}
watchRet, watchErr := item.events, item.watchErr
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
if watchErr != nil {
return nil, watchErr
}
@ -364,40 +352,30 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
}
func TestReflectorResync(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
currentTime := time.Time{}
iteration := 0
stopCh := make(chan struct{})
s := &FakeCustomStore{
ResyncFunc: func() error {
iteration++
if iteration == 2 {
close(stopCh)
}
return nil
},
}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
if iteration == 0 {
// Move time, but do not force resync.
currentTime = currentTime.Add(30 * time.Second)
} else if iteration == 1 {
// Move time to force resync.
currentTime = currentTime.Add(28 * time.Second)
} else if iteration >= 2 {
t.Fatalf("should have forced resync earlier")
}
iteration++
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
// Send something to the watcher to avoid "watch too short" errors.
go func() {
fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: strconv.Itoa(iteration)}})
fw.Stop()
}()
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "0"}}, nil
},
}
resyncPeriod := time.Minute
resyncPeriod := 1 * time.Millisecond
r := NewReflector(lw, &api.Pod{}, s, resyncPeriod)
r.now = func() time.Time { return currentTime }
r.ListAndWatch(wait.NeverStop)
r.ListAndWatch(stopCh)
if iteration != 2 {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
}

View File

@ -44,6 +44,7 @@ type Store interface {
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
Resync() error
}
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
@ -217,6 +218,11 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync touches all items in the store to force processing
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
}
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{

View File

@ -50,6 +50,7 @@ type ThreadSafeStore interface {
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
Resync() error
}
// threadSafeMap implements ThreadSafeStore
@ -272,6 +273,11 @@ func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
return nil
}
func (c *threadSafeMap) Resync() error {
// Nothing to do
return nil
}
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},

View File

@ -29,10 +29,23 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/google/gofuzz"
)
type testLW struct {
ListFunc func(options api.ListOptions) (runtime.Object, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
}
func (t *testLW) List(options api.ListOptions) (runtime.Object, error) {
return t.ListFunc(options)
}
func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options)
}
func Example() {
// source simulates an apiserver object endpoint.
source := framework.NewFakeControllerSource()
@ -295,18 +308,15 @@ func TestUpdate(t *testing.T) {
source := framework.NewFakeControllerSource()
const (
FROM = "from"
ADD_MISSED = "missed the add event"
TO = "to"
FROM = "from"
TO = "to"
)
// These are the transitions we expect to see; because this is
// asynchronous, there are a lot of valid possibilities.
type pair struct{ from, to string }
allowedTransitions := map[pair]bool{
pair{FROM, TO}: true,
pair{FROM, ADD_MISSED}: true,
pair{ADD_MISSED, TO}: true,
pair{FROM, TO}: true,
// Because a resync can happen when we've already observed one
// of the above but before the item is deleted.
@ -337,21 +347,6 @@ func TestUpdate(t *testing.T) {
source.Add(pod(name, FROM, false))
source.Modify(pod(name, TO, true))
},
func(name string) {
name = "b-" + name
source.Add(pod(name, FROM, false))
source.ModifyDropWatch(pod(name, TO, true))
},
func(name string) {
name = "c-" + name
source.AddDropWatch(pod(name, FROM, false))
source.Modify(pod(name, ADD_MISSED, false))
source.Modify(pod(name, TO, true))
},
func(name string) {
name = "d-" + name
source.Add(pod(name, FROM, true))
},
}
const threads = 3
@ -362,10 +357,20 @@ func TestUpdate(t *testing.T) {
// Make a controller that deletes things once it observes an update.
// It calls Done() on the wait group on deletions so we can tell when
// everything we've added has been deleted.
watchCh := make(chan struct{})
_, controller := framework.NewInformer(
source,
&testLW{
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
watch, err := source.Watch(options)
close(watchCh)
return watch, err
},
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return source.List(options)
},
},
&api.Pod{},
time.Millisecond*1,
0,
framework.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
@ -388,6 +393,7 @@ func TestUpdate(t *testing.T) {
// all testDoneWG.Add() calls must happen before this point
stop := make(chan struct{})
go controller.Run(stop)
<-watchCh
// run every test a few times, in parallel
var wg sync.WaitGroup

View File

@ -324,3 +324,8 @@ func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEven
defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
}
func (w *watchCache) Resync() error {
// Nothing to do
return nil
}