mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Implement Cacher for watch in apiserver
This commit is contained in:
parent
8dcbebae5e
commit
e424da7d0d
24
pkg/client/cache/reflector.go
vendored
24
pkg/client/cache/reflector.go
vendored
@ -170,30 +170,27 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
||||
return t.C, t.Stop
|
||||
}
|
||||
|
||||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) {
|
||||
// Returns error if ListAndWatch didn't even tried to initialize watch.
|
||||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
var resourceVersion string
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer cleanup()
|
||||
|
||||
list, err := r.listerWatcher.List()
|
||||
if err != nil {
|
||||
util.HandleError(fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err))
|
||||
return
|
||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
|
||||
}
|
||||
meta, err := meta.Accessor(list)
|
||||
if err != nil {
|
||||
util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v", r.name, list))
|
||||
return
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
|
||||
}
|
||||
resourceVersion = meta.ResourceVersion()
|
||||
items, err := runtime.ExtractList(list)
|
||||
if err != nil {
|
||||
util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err))
|
||||
return
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
|
||||
}
|
||||
if err := r.syncWith(items, resourceVersion); err != nil {
|
||||
util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err))
|
||||
return
|
||||
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
|
||||
}
|
||||
r.setLastSyncResourceVersion(resourceVersion)
|
||||
|
||||
@ -220,13 +217,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
|
||||
if err != errorResyncRequested && err != errorStopRequested {
|
||||
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -277,6 +274,7 @@ loop:
|
||||
util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
|
||||
continue
|
||||
}
|
||||
newResourceVersion := meta.ResourceVersion()
|
||||
switch event.Type {
|
||||
case watch.Added:
|
||||
r.store.Add(event.Object)
|
||||
@ -290,8 +288,8 @@ loop:
|
||||
default:
|
||||
util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
|
||||
}
|
||||
*resourceVersion = meta.ResourceVersion()
|
||||
r.setLastSyncResourceVersion(*resourceVersion)
|
||||
*resourceVersion = newResourceVersion
|
||||
r.setLastSyncResourceVersion(newResourceVersion)
|
||||
eventCount++
|
||||
}
|
||||
}
|
||||
|
17
pkg/client/cache/watch_cache.go
vendored
17
pkg/client/cache/watch_cache.go
vendored
@ -123,13 +123,20 @@ func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, er
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
resourceVersion, err := strconv.ParseUint(meta.ResourceVersion(), 10, 64)
|
||||
resourceVersion, err := parseResourceVersion(meta.ResourceVersion())
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return object, resourceVersion, nil
|
||||
}
|
||||
|
||||
func parseResourceVersion(resourceVersion string) (uint64, error) {
|
||||
if resourceVersion == "" {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||
}
|
||||
|
||||
func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
@ -186,7 +193,7 @@ func (w *WatchCache) Replace(objs []interface{}) error {
|
||||
}
|
||||
|
||||
func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error {
|
||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||
version, err := parseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -227,15 +234,15 @@ func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, e
|
||||
if size > 0 {
|
||||
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
|
||||
}
|
||||
if resourceVersion < oldest {
|
||||
return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest)
|
||||
}
|
||||
|
||||
// Binary seatch the smallest index at which resourceVersion is not smaller than
|
||||
// the given one.
|
||||
f := func(i int) bool {
|
||||
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion
|
||||
}
|
||||
if size > 0 && resourceVersion < oldest {
|
||||
return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest)
|
||||
}
|
||||
first := sort.Search(size, f)
|
||||
result := make([]watch.Event, size-first)
|
||||
for i := 0; i < size-first; i++ {
|
||||
|
334
pkg/storage/cacher.go
Normal file
334
pkg/storage/cacher.go
Normal file
@ -0,0 +1,334 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// CacherConfig contains the configuration for a given Cache.
|
||||
type CacherConfig struct {
|
||||
// Maximum size of the history cached in memory.
|
||||
CacheCapacity int
|
||||
|
||||
// An underlying storage.Interface.
|
||||
Storage Interface
|
||||
|
||||
// An underlying storage.Versioner.
|
||||
Versioner Versioner
|
||||
|
||||
// The Cache will be caching objects of a given Type and assumes that they
|
||||
// are all stored under ResourcePrefix directory in the underlying database.
|
||||
Type interface{}
|
||||
ResourcePrefix string
|
||||
|
||||
// KeyFunc is used to get a key in the underyling storage for a given object.
|
||||
KeyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// NewList is a function that creates new empty object storing a list of
|
||||
// objects of type Type.
|
||||
NewListFunc func() runtime.Object
|
||||
|
||||
// Cacher will be stopped when the StopChannel will be closed.
|
||||
StopChannel <-chan struct{}
|
||||
}
|
||||
|
||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||
// resource from its internal cache and updating its cache in the background
|
||||
// based on the underlying storage contents.
|
||||
type Cacher struct {
|
||||
sync.RWMutex
|
||||
|
||||
// Whether Cacher is initialized.
|
||||
initialized sync.WaitGroup
|
||||
initOnce sync.Once
|
||||
|
||||
// "sliding window" of recent changes of objects and the current state.
|
||||
watchCache *cache.WatchCache
|
||||
reflector *cache.Reflector
|
||||
|
||||
// Registered watchers.
|
||||
watcherIdx int
|
||||
watchers map[int]*cacheWatcher
|
||||
|
||||
// Versioner is used to handle resource versions.
|
||||
versioner Versioner
|
||||
|
||||
// keyFunc is used to get a key in the underyling storage for a given object.
|
||||
keyFunc func(runtime.Object) (string, error)
|
||||
}
|
||||
|
||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacher(config CacherConfig) *Cacher {
|
||||
watchCache := cache.NewWatchCache(config.CacheCapacity)
|
||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||
|
||||
cacher := &Cacher{
|
||||
initialized: sync.WaitGroup{},
|
||||
watchCache: watchCache,
|
||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||
watcherIdx: 0,
|
||||
watchers: make(map[int]*cacheWatcher),
|
||||
versioner: config.Versioner,
|
||||
keyFunc: config.KeyFunc,
|
||||
}
|
||||
cacher.initialized.Add(1)
|
||||
// See startCaching method for why explanation on it.
|
||||
watchCache.SetOnReplace(func() {
|
||||
cacher.initOnce.Do(func() { cacher.initialized.Done() })
|
||||
cacher.Unlock()
|
||||
})
|
||||
watchCache.SetOnEvent(cacher.processEvent)
|
||||
|
||||
stopCh := config.StopChannel
|
||||
go util.Until(func() { cacher.startCaching(stopCh) }, 0, stopCh)
|
||||
cacher.initialized.Wait()
|
||||
return cacher
|
||||
}
|
||||
|
||||
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
||||
c.Lock()
|
||||
c.terminateAllWatchers()
|
||||
// We explicitly do NOT Unlock() in this method.
|
||||
// This is because we do not want to allow any WATCH/LIST methods before
|
||||
// the cache is initialized. Once the underlying cache is propagated,
|
||||
// onReplace handler will be called, which will do the Unlock() as
|
||||
// configured in NewCacher().
|
||||
// Note: the same bahavior is also triggered every time we fall out of
|
||||
// backen storage (e.g. etcd's) watch event window.
|
||||
// Note that since onReplace may be not called due to errors, we explicitly
|
||||
// need to retry it on errors under lock.
|
||||
for {
|
||||
err := c.reflector.ListAndWatch(stopChannel)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implements Watch (signature from storage.Interface).
|
||||
func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
initEvents, err := c.watchCache.GetAllEventsSince(resourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
|
||||
c.watchers[c.watcherIdx] = watcher
|
||||
c.watcherIdx++
|
||||
return watcher, nil
|
||||
}
|
||||
|
||||
// Implements WatchList (signature from storage.Interface).
|
||||
func (c *Cacher) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
return c.Watch(key, resourceVersion, filter)
|
||||
}
|
||||
|
||||
// Implements List (signature from storage.Interface).
|
||||
func (c *Cacher) List(key string, listObj runtime.Object) error {
|
||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listVal, err := conversion.EnforcePtr(listPtr)
|
||||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filter := filterFunction(key, c.keyFunc, Everything)
|
||||
|
||||
objs, resourceVersion := c.watchCache.ListWithVersion()
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
|
||||
}
|
||||
if filter(object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||
}
|
||||
}
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, resourceVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cacher) processEvent(event watch.Event) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for _, watcher := range c.watchers {
|
||||
watcher.add(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cacher) terminateAllWatchers() {
|
||||
for key, watcher := range c.watchers {
|
||||
delete(c.watchers, key)
|
||||
watcher.stop()
|
||||
}
|
||||
}
|
||||
|
||||
func forgetWatcher(c *Cacher, index int) func() {
|
||||
return func() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// It's possible that the watcher is already not in the map (e.g. in case of
|
||||
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
|
||||
delete(c.watchers, index)
|
||||
}
|
||||
}
|
||||
|
||||
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc {
|
||||
return func(obj runtime.Object) bool {
|
||||
objKey, err := keyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Invalid object for filter: %v", obj)
|
||||
return false
|
||||
}
|
||||
if !strings.HasPrefix(objKey, key) {
|
||||
return false
|
||||
}
|
||||
return filter(obj)
|
||||
}
|
||||
}
|
||||
|
||||
// Returns resource version to which the underlying cache is synced.
|
||||
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
resourceVersion := c.reflector.LastSyncResourceVersion()
|
||||
if resourceVersion == "" {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||
}
|
||||
|
||||
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
|
||||
type cacherListerWatcher struct {
|
||||
storage Interface
|
||||
resourcePrefix string
|
||||
newListFunc func() runtime.Object
|
||||
}
|
||||
|
||||
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
|
||||
return &cacherListerWatcher{
|
||||
storage: storage,
|
||||
resourcePrefix: resourcePrefix,
|
||||
newListFunc: newListFunc,
|
||||
}
|
||||
}
|
||||
|
||||
// Implements cache.ListerWatcher interface.
|
||||
func (lw *cacherListerWatcher) List() (runtime.Object, error) {
|
||||
list := lw.newListFunc()
|
||||
if err := lw.storage.List(lw.resourcePrefix, list); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
// Implements cache.ListerWatcher interface.
|
||||
func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, error) {
|
||||
version, err := ParseWatchResourceVersion(resourceVersion, lw.resourcePrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return lw.storage.WatchList(lw.resourcePrefix, version, Everything)
|
||||
}
|
||||
|
||||
// cacherWatch implements watch.Interface
|
||||
type cacheWatcher struct {
|
||||
sync.Mutex
|
||||
input chan watch.Event
|
||||
result chan watch.Event
|
||||
filter FilterFunc
|
||||
stopped bool
|
||||
forget func()
|
||||
}
|
||||
|
||||
func newCacheWatcher(initEvents []watch.Event, filter FilterFunc, forget func()) *cacheWatcher {
|
||||
watcher := &cacheWatcher{
|
||||
input: make(chan watch.Event, 10),
|
||||
result: make(chan watch.Event, 10),
|
||||
filter: filter,
|
||||
stopped: false,
|
||||
forget: forget,
|
||||
}
|
||||
go watcher.process(initEvents)
|
||||
return watcher
|
||||
}
|
||||
|
||||
// Implements watch.Interface.
|
||||
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
|
||||
return c.result
|
||||
}
|
||||
|
||||
// Implements watch.Interface.
|
||||
func (c *cacheWatcher) Stop() {
|
||||
c.forget()
|
||||
c.stop()
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) stop() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if !c.stopped {
|
||||
c.stopped = true
|
||||
close(c.input)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) add(event watch.Event) {
|
||||
c.input <- event
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) process(initEvents []watch.Event) {
|
||||
for _, event := range initEvents {
|
||||
if c.filter(event.Object) {
|
||||
c.result <- event
|
||||
}
|
||||
}
|
||||
defer close(c.result)
|
||||
defer c.Stop()
|
||||
for {
|
||||
event, ok := <-c.input
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if c.filter(event.Object) {
|
||||
c.result <- event
|
||||
}
|
||||
}
|
||||
}
|
337
pkg/storage/cacher_test.go
Normal file
337
pkg/storage/cacher_test.go
Normal file
@ -0,0 +1,337 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func newTestCacher(client tools.EtcdClient) *storage.Cacher {
|
||||
prefix := "pods"
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 10,
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
Storage: etcdstorage.NewEtcdStorage(client, testapi.Codec(), etcdtest.PathPrefix()),
|
||||
Type: &api.Pod{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
StopChannel: util.NeverStop,
|
||||
}
|
||||
return storage.NewCacher(config)
|
||||
}
|
||||
|
||||
func makeTestPod(name string) *api.Pod {
|
||||
return &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: name},
|
||||
Spec: api.PodSpec{
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
RestartPolicy: api.RestartPolicyAlways,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error {
|
||||
ready := func() (bool, error) {
|
||||
result, err := cacher.LastSyncResourceVersion()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return result == resourceVersion, nil
|
||||
}
|
||||
return wait.Poll(10*time.Millisecond, 100*time.Millisecond, ready)
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
prefixedKey := etcdtest.AddPrefix("pods")
|
||||
fakeClient.ExpectNotFoundGet(prefixedKey)
|
||||
cacher := newTestCacher(fakeClient)
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
podFoo := makeTestPod("foo")
|
||||
podBar := makeTestPod("bar")
|
||||
podBaz := makeTestPod("baz")
|
||||
|
||||
podFooPrime := makeTestPod("foo")
|
||||
podFooPrime.Spec.NodeName = "fakeNode"
|
||||
|
||||
testCases := []*etcd.Response{
|
||||
{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podBar)),
|
||||
CreatedIndex: 2,
|
||||
ModifiedIndex: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podBaz)),
|
||||
CreatedIndex: 3,
|
||||
ModifiedIndex: 3,
|
||||
},
|
||||
},
|
||||
{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFooPrime)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 4,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Action: "delete",
|
||||
Node: &etcd.Node{
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 5,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podBar)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Propagate some data to etcd.
|
||||
for _, test := range testCases {
|
||||
fakeClient.WatchResponse <- test
|
||||
}
|
||||
if err := waitForUpToDateCache(cacher, 5); err != nil {
|
||||
t.Errorf("watch cache didn't propagated correctly: %v", err)
|
||||
}
|
||||
|
||||
result := &api.PodList{}
|
||||
if err := cacher.List("pods/ns", result); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if result.ListMeta.ResourceVersion != "5" {
|
||||
t.Errorf("incorrect resource version: %v", result.ListMeta.ResourceVersion)
|
||||
}
|
||||
if len(result.Items) != 2 {
|
||||
t.Errorf("unexpected list result: %d", len(result.Items))
|
||||
}
|
||||
keys := util.StringSet{}
|
||||
for _, item := range result.Items {
|
||||
keys.Insert(item.ObjectMeta.Name)
|
||||
}
|
||||
if !keys.HasAll("foo", "baz") {
|
||||
t.Errorf("unexpected list result: %#v", result)
|
||||
}
|
||||
for _, item := range result.Items {
|
||||
// unset fields that are set by the infrastructure
|
||||
item.ObjectMeta.ResourceVersion = ""
|
||||
item.ObjectMeta.CreationTimestamp = util.Time{}
|
||||
|
||||
var expected *api.Pod
|
||||
switch item.ObjectMeta.Name {
|
||||
case "foo":
|
||||
expected = podFooPrime
|
||||
case "baz":
|
||||
expected = podBaz
|
||||
default:
|
||||
t.Errorf("unexpected item: %v", item)
|
||||
}
|
||||
if e, a := *expected, item; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expected: %#v, got: %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
close(fakeClient.WatchResponse)
|
||||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
prefixedKey := etcdtest.AddPrefix("pods")
|
||||
fakeClient.ExpectNotFoundGet(prefixedKey)
|
||||
cacher := newTestCacher(fakeClient)
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
podFoo := makeTestPod("foo")
|
||||
podBar := makeTestPod("bar")
|
||||
|
||||
testCases := []struct {
|
||||
object *api.Pod
|
||||
etcdResponse *etcd.Response
|
||||
event watch.EventType
|
||||
filtered bool
|
||||
}{
|
||||
{
|
||||
object: podFoo,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
event: watch.Added,
|
||||
filtered: true,
|
||||
},
|
||||
{
|
||||
object: podBar,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podBar)),
|
||||
CreatedIndex: 2,
|
||||
ModifiedIndex: 2,
|
||||
},
|
||||
},
|
||||
event: watch.Added,
|
||||
filtered: false,
|
||||
},
|
||||
{
|
||||
object: podFoo,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 3,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
event: watch.Modified,
|
||||
filtered: true,
|
||||
},
|
||||
}
|
||||
|
||||
// Set up Watch for object "podFoo".
|
||||
watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
fakeClient.WatchResponse <- test.etcdResponse
|
||||
if test.filtered {
|
||||
event := <-watcher.ResultChan()
|
||||
if e, a := test.event, event.Type; e != a {
|
||||
t.Errorf("%v %v", e, a)
|
||||
}
|
||||
// unset fields that are set by the infrastructure
|
||||
obj := event.Object.(*api.Pod)
|
||||
obj.ObjectMeta.ResourceVersion = ""
|
||||
obj.ObjectMeta.CreationTimestamp = util.Time{}
|
||||
if e, a := test.object, obj; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expected: %#v, got: %#v", e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether we get too-old error.
|
||||
_, err = cacher.Watch("pods/ns/foo", 0, storage.Everything)
|
||||
if err == nil {
|
||||
t.Errorf("expected 'error too old' error")
|
||||
}
|
||||
|
||||
// Now test watch with initial state.
|
||||
initialWatcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
for _, test := range testCases {
|
||||
if test.filtered {
|
||||
event := <-initialWatcher.ResultChan()
|
||||
if e, a := test.event, event.Type; e != a {
|
||||
t.Errorf("%v %v", e, a)
|
||||
}
|
||||
// unset fields that are set by the infrastructure
|
||||
obj := event.Object.(*api.Pod)
|
||||
obj.ObjectMeta.ResourceVersion = ""
|
||||
obj.ObjectMeta.CreationTimestamp = util.Time{}
|
||||
if e, a := test.object, obj; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expected: %#v, got: %#v", e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(fakeClient.WatchResponse)
|
||||
}
|
||||
|
||||
func TestStorageError(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
prefixedKey := etcdtest.AddPrefix("pods")
|
||||
fakeClient.ExpectNotFoundGet(prefixedKey)
|
||||
cacher := newTestCacher(fakeClient)
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
podFoo := makeTestPod("foo")
|
||||
|
||||
// Set up Watch for object "podFoo".
|
||||
watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
}
|
||||
_ = <-watcher.ResultChan()
|
||||
|
||||
// Injecting error is simulating error from etcd.
|
||||
// This is almost the same what would happen e.g. in case of
|
||||
// "error too old" when reconnecting to etcd watch.
|
||||
fakeClient.WatchInjectError <- fmt.Errorf("fake error")
|
||||
|
||||
_, ok := <-watcher.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("unexpected event")
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/fielderrors"
|
||||
)
|
||||
@ -49,3 +50,11 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
||||
}
|
||||
return version + 1, nil
|
||||
}
|
||||
|
||||
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
|
||||
meta, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return prefix + "/" + meta.Namespace() + "/" + meta.Name(), nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user