Add resource version to Store Replace params.

This commit is contained in:
Wojciech Tyczynski 2015-08-18 10:34:27 +02:00
parent 66a644b275
commit e202f9c797
20 changed files with 27 additions and 254 deletions

View File

@ -277,7 +277,7 @@ func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} {
}
}
func (f *HistoricalFIFO) Replace(objs []interface{}) error {
func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error {
notifications := make([]Entry, 0, len(objs))
defer func() {
for _, e := range notifications {

View File

@ -122,7 +122,7 @@ func TestFIFO_addUpdate(t *testing.T) {
func TestFIFO_addReplace(t *testing.T) {
f := NewHistorical(nil)
f.Add(&testObj{"foo", 10})
f.Replace([]interface{}{&testObj{"foo", 15}})
f.Replace([]interface{}{&testObj{"foo", 15}}, "0")
got := make(chan *testObj, 2)
go func() {
for {

View File

@ -893,11 +893,11 @@ func (psa *podStoreAdapter) Get(obj interface{}) (interface{}, bool, error) {
// Replace will delete the contents of the store, using instead the
// given map. This store implementation does NOT take ownership of the map.
func (psa *podStoreAdapter) Replace(objs []interface{}) error {
func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string) error {
newobjs := make([]interface{}, len(objs))
for i, v := range objs {
pod := v.(*api.Pod)
newobjs[i] = &Pod{Pod: pod}
}
return psa.FIFO.Replace(newobjs)
return psa.FIFO.Replace(newobjs, resourceVersion)
}

View File

@ -307,7 +307,7 @@ func (f *DeltaFIFO) Pop() interface{} {
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}) error {
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
for _, item := range list {

View File

@ -97,7 +97,7 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
)
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("foo", 12))
f.Replace([]interface{}{mkFifoObj("foo", 20)})
f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
f.Delete(mkFifoObj("foo", 15))
f.Delete(mkFifoObj("foo", 18)) // flush the last one out
expect := []DeltaType{Added, Updated, Sync, Deleted}
@ -165,7 +165,7 @@ func TestDeltaFIFO_enqueueing(t *testing.T) {
func TestDeltaFIFO_addReplace(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
f.Add(mkFifoObj("foo", 10))
f.Replace([]interface{}{mkFifoObj("foo", 15)})
f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0")
got := make(chan testFifoObject, 2)
go func() {
for {
@ -197,7 +197,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
}),
)
f.Delete(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)})
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList := []Deltas{
{{Deleted, mkFifoObj("baz", 10)}},

View File

@ -166,7 +166,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// Replace will convert all items in the given list to TimestampedEntries
// before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}) error {
func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
items := map[string]interface{}{}
ts := c.clock.Now()
for _, item := range list {
@ -176,7 +176,7 @@ func (c *ExpirationCache) Replace(list []interface{}) error {
}
items[key] = &timestampedEntry{item, ts}
}
c.cacheStorage.Replace(items)
c.cacheStorage.Replace(items, resourceVersion)
return nil
}

View File

@ -188,7 +188,7 @@ func (f *FIFO) Pop() interface{} {
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *FIFO) Replace(list []interface{}) error {
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
items := map[string]interface{}{}
for _, item := range list {
key, err := f.keyFunc(item)

View File

@ -107,7 +107,7 @@ func TestFIFO_addUpdate(t *testing.T) {
func TestFIFO_addReplace(t *testing.T) {
f := NewFIFO(testFifoObjectKeyFunc)
f.Add(mkFifoObj("foo", 10))
f.Replace([]interface{}{mkFifoObj("foo", 15)})
f.Replace([]interface{}{mkFifoObj("foo", 15)}, "15")
got := make(chan testFifoObject, 2)
go func() {
for {

View File

@ -1,86 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
)
// Enumerator should be able to return the list of objects to be synced with
// one object at a time.
type Enumerator interface {
Len() int
Get(index int) (object interface{})
}
// GetFunc should return an enumerator that you wish the Poller to process.
type GetFunc func() (Enumerator, error)
// Poller is like Reflector, but it periodically polls instead of watching.
// This is intended to be a workaround for api objects that don't yet support
// watching.
type Poller struct {
getFunc GetFunc
period time.Duration
store Store
}
// NewPoller constructs a new poller. Note that polling probably doesn't make much
// sense to use along with the FIFO queue. The returned Poller will call getFunc and
// sync the objects in 'store' with the returned Enumerator, waiting 'period' between
// each call. It probably only makes sense to use a poller if you're treating the
// store as read-only.
func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller {
return &Poller{
getFunc: getFunc,
period: period,
store: store,
}
}
// Run begins polling. It starts a goroutine and returns immediately.
func (p *Poller) Run() {
go util.Until(p.run, p.period, util.NeverStop)
}
// RunUntil begins polling. It starts a goroutine and returns immediately.
// It will stop when the stopCh is closed.
func (p *Poller) RunUntil(stopCh <-chan struct{}) {
go util.Until(p.run, p.period, stopCh)
}
func (p *Poller) run() {
e, err := p.getFunc()
if err != nil {
glog.Errorf("failed to list: %v", err)
return
}
p.sync(e)
}
func (p *Poller) sync(e Enumerator) {
items := []interface{}{}
for i := 0; i < e.Len(); i++ {
object := e.Get(i)
items = append(items, object)
}
p.store.Replace(items)
}

View File

@ -1,132 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"errors"
"reflect"
"testing"
"time"
)
func testPairKeyFunc(obj interface{}) (string, error) {
return obj.(testPair).id, nil
}
type testPair struct {
id string
obj interface{}
}
type testEnumerator []testPair
func (t testEnumerator) Len() int { return len(t) }
func (t testEnumerator) Get(i int) interface{} {
return t[i]
}
func TestPoller_sync(t *testing.T) {
table := []struct {
// each step simulates the list that a getFunc would receive.
steps [][]testPair
}{
{
steps: [][]testPair{
{
{"foo", "foo1"},
{"bar", "bar1"},
{"baz", "baz1"},
{"qux", "qux1"},
}, {
{"foo", "foo2"},
{"bar", "bar2"},
{"qux", "qux2"},
}, {
{"bar", "bar3"},
{"baz", "baz2"},
{"qux", "qux3"},
}, {
{"qux", "qux4"},
}, {
{"foo", "foo3"},
},
},
},
}
for testCase, item := range table {
s := NewStore(testPairKeyFunc)
// This is a unit test for the sync function, hence the nil getFunc.
p := NewPoller(nil, 0, s)
for line, pairs := range item.steps {
p.sync(testEnumerator(pairs))
list := s.List()
for _, pair := range pairs {
foundInList := false
for _, listItem := range list {
id, _ := testPairKeyFunc(listItem)
if pair.id == id {
foundInList = true
}
}
if !foundInList {
t.Errorf("%v, %v: expected to find list entry for %v, but did not.", testCase, line, pair.id)
continue
}
found, ok, _ := s.Get(pair)
if !ok {
t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id)
continue
}
if e, a := pair.obj, found.(testPair).obj; !reflect.DeepEqual(e, a) {
t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id)
}
}
if e, a := len(pairs), len(list); e != a {
t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a)
}
}
}
}
func TestPoller_Run(t *testing.T) {
stopCh := make(chan struct{})
defer func() { stopCh <- struct{}{} }()
s := NewStore(testPairKeyFunc)
const count = 10
var called = 0
done := make(chan struct{})
NewPoller(func() (Enumerator, error) {
called++
if called == count {
close(done)
}
// test both error and regular returns.
if called&1 == 0 {
return testEnumerator{}, nil
}
return nil, errors.New("transient error")
}, time.Millisecond, s).RunUntil(stopCh)
// The test here is that we get called at least count times.
<-done
// We never added anything, verify that.
if e, a := 0, len(s.List()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -234,12 +234,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
for _, item := range items {
found = append(found, item)
}
myStore, ok := r.store.(*WatchCache)
if ok {
return myStore.ReplaceWithVersion(found, resourceVersion)
}
return r.store.Replace(found)
return r.store.Replace(found, resourceVersion)
}
// watchHandler watches w and keeps *resourceVersion up to date.

View File

@ -43,7 +43,7 @@ type Store interface {
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}) error
Replace([]interface{}, string) error
}
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
@ -193,7 +193,7 @@ func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again
// after calling this function.
func (c *cache) Replace(list []interface{}) error {
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := map[string]interface{}{}
for _, item := range list {
key, err := c.keyFunc(item)
@ -202,7 +202,7 @@ func (c *cache) Replace(list []interface{}) error {
}
items[key] = item
}
c.cacheStorage.Replace(items)
c.cacheStorage.Replace(items, resourceVersion)
return nil
}

View File

@ -70,7 +70,7 @@ func doTestStore(t *testing.T, store Store) {
store.Replace([]interface{}{
mkObj("foo", "foo"),
mkObj("bar", "bar"),
})
}, "0")
{
found := util.StringSet{}

View File

@ -41,7 +41,7 @@ type ThreadSafeStore interface {
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{})
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
@ -112,7 +112,7 @@ func (c *threadSafeMap) ListKeys() []string {
return list
}
func (c *threadSafeMap) Replace(items map[string]interface{}) {
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items

View File

@ -66,8 +66,8 @@ func (u *UndeltaStore) Delete(obj interface{}) error {
return nil
}
func (u *UndeltaStore) Replace(list []interface{}) error {
if err := u.Store.Replace(list); err != nil {
func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
if err := u.Store.Replace(list, resourceVersion); err != nil {
return err
}
u.PushFunc(u.Store.List())

View File

@ -120,7 +120,7 @@ func TestReplaceCallsPush(t *testing.T) {
m := []interface{}{mkObj("a", 1)}
u.Replace(m)
u.Replace(m, "0")
if callcount != 1 {
t.Errorf("Expected 1 calls, got %d", callcount)
}

View File

@ -215,11 +215,7 @@ func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) {
return w.store.GetByKey(key)
}
func (w *WatchCache) Replace(objs []interface{}) error {
return w.ReplaceWithVersion(objs, "0")
}
func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error {
func (w *WatchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion)
if err != nil {
return err
@ -230,7 +226,7 @@ func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion stri
w.startIndex = 0
w.endIndex = 0
if err := w.store.Replace(objs); err != nil {
if err := w.store.Replace(objs, resourceVersion); err != nil {
return err
}
w.resourceVersion = version

View File

@ -90,7 +90,7 @@ func TestWatchCacheBasic(t *testing.T) {
store.Replace([]interface{}{
makeTestPod("pod4", 7),
makeTestPod("pod5", 8),
})
}, "8")
{
podNames := util.StringSet{}
for _, item := range store.List() {

View File

@ -908,7 +908,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should have no effect, since we've deleted the rc.
podExp.Seen(1, 0)
manager.podStore.Store.Replace(make([]interface{}, 0))
manager.podStore.Store.Replace(make([]interface{}, 0), "0")
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
}

View File

@ -131,7 +131,7 @@ func replacePods(pods []*api.Pod, store cache.Store) {
for i := range pods {
found = append(found, pods[i])
}
expectNoError(store.Replace(found))
expectNoError(store.Replace(found, "0"))
}
// getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector,