Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-23 19:56:01 +00:00

Merge pull request #12268 from wojtek-t/create_watch_cache

Implement watchCache structure.
Commit bb49fdabdc

pkg/client/cache/reflector.go (vendored): 14 changes
@@ -135,13 +135,13 @@ outer:
 // Run starts a watch and handles watch events. Will restart the watch if it is closed.
 // Run starts a goroutine and returns immediately.
 func (r *Reflector) Run() {
-    go util.Forever(func() { r.listAndWatch(util.NeverStop) }, r.period)
+    go util.Forever(func() { r.ListAndWatch(util.NeverStop) }, r.period)
 }
 
 // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
 // RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
 func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
-    go util.Until(func() { r.listAndWatch(stopCh) }, r.period, stopCh)
+    go util.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh)
 }
 
 var (
@@ -170,7 +170,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
     return t.C, t.Stop
 }
 
-func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
+func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) {
     var resourceVersion string
     resyncCh, cleanup := r.resyncChan()
     defer cleanup()
@@ -191,7 +191,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
         util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err))
         return
     }
-    if err := r.syncWith(items); err != nil {
+    if err := r.syncWith(items, resourceVersion); err != nil {
         util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err))
         return
     }
@@ -232,12 +232,16 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
 }
 
 // syncWith replaces the store's items with the given list.
-func (r *Reflector) syncWith(items []runtime.Object) error {
+func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
     found := make([]interface{}, 0, len(items))
     for _, item := range items {
         found = append(found, item)
     }
+    myStore, ok := r.store.(*WatchCache)
+    if ok {
+        return myStore.ReplaceWithVersion(found, resourceVersion)
+    }
     return r.store.Replace(found)
 }
 
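Note: the syncWith change above is what ties the Reflector to the new cache. When the Reflector's backing Store is a *WatchCache, a relist goes through ReplaceWithVersion, so the cache also records the resourceVersion the list was served at. A minimal sketch of that effect, using only the package-internal API shown in this commit (the capacity and pod values are illustrative, not taken from the commit):

    wc := NewWatchCache(100)

    // What syncWith does when the store is a *WatchCache: replace the contents
    // and record the resourceVersion of the list that produced them.
    pods := []interface{}{
        &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: "a", ResourceVersion: "9"}},
        &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: "b", ResourceVersion: "10"}},
    }
    if err := wc.ReplaceWithVersion(pods, "10"); err != nil {
        // handle the error
    }

    items, version := wc.ListWithVersion()
    // items holds both pods; version == 10, the resourceVersion of the relist.
    _, _ = items, version

ListWithVersion is what lets a consumer list the cached state and start watching exactly at the point where that state ends.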
pkg/client/cache/reflector_test.go (vendored): 41 changes
@@ -51,7 +51,7 @@ func TestCloseWatchChannelOnError(t *testing.T) {
             return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil
         },
     }
-    go r.listAndWatch(util.NeverStop)
+    go r.ListAndWatch(util.NeverStop)
     fw.Error(pod)
     select {
     case _, ok := <-fw.ResultChan():
@@ -214,7 +214,7 @@ func TestReflectorStopWatch(t *testing.T) {
     }
 }
 
-func TestReflector_listAndWatch(t *testing.T) {
+func TestReflector_ListAndWatch(t *testing.T) {
     createdFakes := make(chan *watch.FakeWatcher)
 
     // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
@@ -239,7 +239,7 @@ func TestReflector_listAndWatch(t *testing.T) {
     }
     s := NewFIFO(MetaNamespaceKeyFunc)
     r := NewReflector(lw, &api.Pod{}, s, 0)
-    go r.listAndWatch(util.NeverStop)
+    go r.ListAndWatch(util.NeverStop)
 
     ids := []string{"foo", "bar", "baz", "qux", "zoo"}
     var fw *watch.FakeWatcher
@@ -272,7 +272,7 @@ func TestReflector_listAndWatch(t *testing.T) {
     }
 }
 
-func TestReflector_listAndWatchWithErrors(t *testing.T) {
+func TestReflector_ListAndWatchWithErrors(t *testing.T) {
     mkPod := func(id string, rv string) *api.Pod {
         return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, ResourceVersion: rv}}
     }
@@ -356,6 +356,37 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
             },
         }
         r := NewReflector(lw, &api.Pod{}, s, 0)
-        r.listAndWatch(util.NeverStop)
+        r.ListAndWatch(util.NeverStop)
     }
 }
+
+func TestReflectorForWatchCache(t *testing.T) {
+    store := NewWatchCache(5)
+
+    {
+        _, version := store.ListWithVersion()
+        if version != 0 {
+            t.Errorf("unexpected resource version: %d", version)
+        }
+    }
+
+    lw := &testLW{
+        WatchFunc: func(rv string) (watch.Interface, error) {
+            fw := watch.NewFake()
+            go fw.Stop()
+            return fw, nil
+        },
+        ListFunc: func() (runtime.Object, error) {
+            return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil
+        },
+    }
+    r := NewReflector(lw, &api.Pod{}, store, 0)
+    r.ListAndWatch(util.NeverStop)
+
+    {
+        _, version := store.ListWithVersion()
+        if version != 10 {
+            t.Errorf("unexpected resource version: %d", version)
+        }
+    }
+}
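Note: because ListAndWatch (previously listAndWatch) is now exported, tests and other in-package callers can drive a single list-and-watch cycle directly, while RunUntil remains the way to keep it running under a stop channel. A sketch of the stoppable form, reusing the testLW fixture from the tests above (the fake responses and the capacity are illustrative):

    fw := watch.NewFake()
    lw := &testLW{
        WatchFunc: func(rv string) (watch.Interface, error) { return fw, nil },
        ListFunc: func() (runtime.Object, error) {
            return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil
        },
    }

    store := NewWatchCache(100)
    r := NewReflector(lw, &api.Pod{}, store, 0)

    stopCh := make(chan struct{})
    r.RunUntil(stopCh) // starts a goroutine; restarts ListAndWatch until stopCh is closed

    // ... later:
    close(stopCh) // per RunUntil's doc comment, the reflector loop exits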
pkg/client/cache/watch_cache.go (vendored, new file): 245 lines added
@@ -0,0 +1,245 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
    "fmt"
    "sort"
    "strconv"
    "sync"

    "k8s.io/kubernetes/pkg/api/meta"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/watch"
)

// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
type watchCacheElement struct {
    resourceVersion uint64
    event           watch.Event
}

// WatchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
// WatchCache is a "sliding window" (with a limited capacity) of objects
// observed from a watch.
type WatchCache struct {
    sync.RWMutex

    // Maximum size of history window.
    capacity int

    // cache is used as a cyclic buffer - its first element (with the smallest
    // resourceVersion) is defined by startIndex, its last element is defined
    // by endIndex (if cache is full it will be startIndex + capacity).
    // Both startIndex and endIndex can be greater than buffer capacity -
    // you should always apply modulo capacity to get an index in cache array.
    cache      []watchCacheElement
    startIndex int
    endIndex   int

    // store will effectively support LIST operation from the "end of cache
    // history" i.e. from the moment just after the newest cached watched event.
    // It is necessary to effectively allow clients to start watching at "now".
    store Store

    // ResourceVersion up to which the WatchCache is propagated.
    resourceVersion uint64

    // This handler is run at the end of every successful Replace() method.
    onReplace func()

    // This handler is run at the end of every Add/Update/Delete method.
    onEvent func(watch.Event)
}

func NewWatchCache(capacity int) *WatchCache {
    return &WatchCache{
        capacity:        capacity,
        cache:           make([]watchCacheElement, capacity),
        startIndex:      0,
        endIndex:        0,
        store:           NewStore(MetaNamespaceKeyFunc),
        resourceVersion: 0,
    }
}

func (w *WatchCache) Add(obj interface{}) error {
    object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
    if err != nil {
        return err
    }
    event := watch.Event{watch.Added, object}

    f := func(obj runtime.Object) error { return w.store.Add(obj) }
    return w.processEvent(event, resourceVersion, f)
}

func (w *WatchCache) Update(obj interface{}) error {
    object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
    if err != nil {
        return err
    }
    event := watch.Event{watch.Modified, object}

    f := func(obj runtime.Object) error { return w.store.Update(obj) }
    return w.processEvent(event, resourceVersion, f)
}

func (w *WatchCache) Delete(obj interface{}) error {
    object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
    if err != nil {
        return err
    }
    event := watch.Event{watch.Deleted, object}

    f := func(obj runtime.Object) error { return w.store.Delete(obj) }
    return w.processEvent(event, resourceVersion, f)
}

func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
    object, ok := obj.(runtime.Object)
    if !ok {
        return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
    }
    meta, err := meta.Accessor(object)
    if err != nil {
        return nil, 0, err
    }
    resourceVersion, err := strconv.ParseUint(meta.ResourceVersion(), 10, 64)
    if err != nil {
        return nil, 0, err
    }
    return object, resourceVersion, nil
}

func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
    w.Lock()
    defer w.Unlock()
    if w.onEvent != nil {
        w.onEvent(event)
    }
    w.updateCache(resourceVersion, event)
    w.resourceVersion = resourceVersion
    return updateFunc(event.Object)
}

// Assumes that lock is already held for write.
func (w *WatchCache) updateCache(resourceVersion uint64, event watch.Event) {
    if w.endIndex == w.startIndex+w.capacity {
        // Cache is full - remove the oldest element.
        w.startIndex++
    }
    w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
    w.endIndex++
}

func (w *WatchCache) List() []interface{} {
    w.RLock()
    defer w.RUnlock()
    return w.store.List()
}

func (w *WatchCache) ListWithVersion() ([]interface{}, uint64) {
    w.RLock()
    defer w.RUnlock()
    return w.store.List(), w.resourceVersion
}

func (w *WatchCache) ListKeys() []string {
    w.RLock()
    defer w.RUnlock()
    return w.store.ListKeys()
}

func (w *WatchCache) Get(obj interface{}) (interface{}, bool, error) {
    w.RLock()
    defer w.RUnlock()
    return w.store.Get(obj)
}

func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) {
    w.RLock()
    defer w.RUnlock()
    return w.store.GetByKey(key)
}

func (w *WatchCache) Replace(objs []interface{}) error {
    return w.ReplaceWithVersion(objs, "0")
}

func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error {
    version, err := strconv.ParseUint(resourceVersion, 10, 64)
    if err != nil {
        return err
    }

    w.Lock()
    defer w.Unlock()

    w.startIndex = 0
    w.endIndex = 0
    if err := w.store.Replace(objs); err != nil {
        return err
    }
    w.resourceVersion = version
    if w.onReplace != nil {
        w.onReplace()
    }
    return nil
}

func (w *WatchCache) SetOnReplace(onReplace func()) {
    w.Lock()
    defer w.Unlock()
    w.onReplace = onReplace
}

func (w *WatchCache) SetOnEvent(onEvent func(watch.Event)) {
    w.Lock()
    defer w.Unlock()
    w.onEvent = onEvent
}

func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, error) {
    w.RLock()
    defer w.RUnlock()

    size := w.endIndex - w.startIndex
    oldest := w.resourceVersion
    if size > 0 {
        oldest = w.cache[w.startIndex%w.capacity].resourceVersion
    }

    // Binary search for the smallest index at which resourceVersion is not smaller than
    // the given one.
    f := func(i int) bool {
        return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion
    }
    if size > 0 && resourceVersion < oldest {
        return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest)
    }
    first := sort.Search(size, f)
    result := make([]watch.Event, size-first)
    for i := 0; i < size-first; i++ {
        result[i] = w.cache[(w.startIndex+first+i)%w.capacity].event
    }
    return result, nil
}
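Note: the comments in watch_cache.go describe the event window as a cyclic buffer: updateCache writes each event at endIndex%capacity and, once endIndex == startIndex+capacity, drops the oldest entry by advancing startIndex; GetAllEventsSince binary-searches that window and rejects resource versions older than its first entry. A small worked example of that behavior (the capacity and versions are illustrative; makeTestPod is the helper defined in the test file below):

    wc := NewWatchCache(3)
    for i := 1; i <= 5; i++ {
        // Each Add appends a watch.Added event at the pod's resourceVersion;
        // with capacity 3, the events at versions 1 and 2 have been overwritten.
        wc.Add(makeTestPod("pod", uint64(i)))
    }

    events, err := wc.GetAllEventsSince(4)
    // err == nil, len(events) == 2: the events at versions 4 and 5.
    _, _ = events, err

    if _, err := wc.GetAllEventsSince(2); err != nil {
        // The window now starts at version 3, so 2 is rejected as
        // "too old resource version: 2 (3)".
    }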
pkg/client/cache/watch_cache_test.go (vendored, new file): 163 lines added
@@ -0,0 +1,163 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
    "strconv"
    "testing"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/util"
)

func makeTestPod(name string, resourceVersion uint64) *api.Pod {
    return &api.Pod{
        ObjectMeta: api.ObjectMeta{
            Namespace:       "ns",
            Name:            name,
            ResourceVersion: strconv.FormatUint(resourceVersion, 10),
        },
    }
}

func TestWatchCacheBasic(t *testing.T) {
    store := NewWatchCache(2)

    // Test Add/Update/Delete.
    pod1 := makeTestPod("pod", 1)
    if err := store.Add(pod1); err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    if item, ok, _ := store.Get(pod1); !ok {
        t.Errorf("didn't find pod")
    } else {
        if !api.Semantic.DeepEqual(pod1, item) {
            t.Errorf("expected %v, got %v", pod1, item)
        }
    }
    pod2 := makeTestPod("pod", 2)
    if err := store.Update(pod2); err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    if item, ok, _ := store.Get(pod2); !ok {
        t.Errorf("didn't find pod")
    } else {
        if !api.Semantic.DeepEqual(pod2, item) {
            t.Errorf("expected %v, got %v", pod2, item)
        }
    }
    pod3 := makeTestPod("pod", 3)
    if err := store.Delete(pod3); err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    if _, ok, _ := store.Get(pod3); ok {
        t.Errorf("found pod")
    }

    // Test List.
    store.Add(makeTestPod("pod1", 4))
    store.Add(makeTestPod("pod2", 5))
    store.Add(makeTestPod("pod3", 6))
    {
        podNames := util.StringSet{}
        for _, item := range store.List() {
            podNames.Insert(item.(*api.Pod).ObjectMeta.Name)
        }
        if !podNames.HasAll("pod1", "pod2", "pod3") {
            t.Errorf("missing pods, found %v", podNames)
        }
        if len(podNames) != 3 {
            t.Errorf("found missing/extra items")
        }
    }

    // Test Replace.
    store.Replace([]interface{}{
        makeTestPod("pod4", 7),
        makeTestPod("pod5", 8),
    })
    {
        podNames := util.StringSet{}
        for _, item := range store.List() {
            podNames.Insert(item.(*api.Pod).ObjectMeta.Name)
        }
        if !podNames.HasAll("pod4", "pod5") {
            t.Errorf("missing pods, found %v", podNames)
        }
        if len(podNames) != 2 {
            t.Errorf("found missing/extra items")
        }
    }
}

func TestEvents(t *testing.T) {
    store := NewWatchCache(5)

    store.Add(makeTestPod("pod", 2))
    store.Update(makeTestPod("pod", 3))
    store.Update(makeTestPod("pod", 4))

    // Test with not full cache.
    {
        _, err := store.GetAllEventsSince(1)
        if err == nil {
            t.Errorf("expected error too old")
        }
    }
    {
        result, err := store.GetAllEventsSince(3)
        if err != nil {
            t.Errorf("unexpected error: %v", err)
        }
        if len(result) != 2 {
            t.Fatalf("unexpected events: %v", result)
        }
        for i := 0; i < 2; i++ {
            pod := makeTestPod("pod", uint64(i+3))
            if !api.Semantic.DeepEqual(pod, result[i].Object) {
                t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
            }
        }
    }

    for i := 5; i < 9; i++ {
        store.Update(makeTestPod("pod", uint64(i)))
    }

    // Test with full cache - there should be elements from 4 to 8.
    {
        _, err := store.GetAllEventsSince(3)
        if err == nil {
            t.Errorf("expected error too old")
        }
    }
    {
        result, err := store.GetAllEventsSince(4)
        if err != nil {
            t.Errorf("unexpected error: %v", err)
        }
        if len(result) != 5 {
            t.Fatalf("unexpected events: %v", result)
        }
        for i := 0; i < 5; i++ {
            pod := makeTestPod("pod", uint64(i+4))
            if !api.Semantic.DeepEqual(pod, result[i].Object) {
                t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
            }
        }
    }
}
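Note: the tests above do not exercise SetOnEvent and SetOnReplace. A sketch of how a consumer might plug into them, for example to fan events out to watchers and to learn when the cache has been populated by a relist. Both handlers run while the cache's write lock is held (see processEvent and ReplaceWithVersion above), so they must not block; the channel, buffer size and drop policy below are illustrative, not part of this commit:

    wc := NewWatchCache(100)

    events := make(chan watch.Event, 100)
    wc.SetOnEvent(func(e watch.Event) {
        // Called from processEvent while the cache holds its write lock.
        select {
        case events <- e:
        default:
            // Illustrative policy only: drop the event if the consumer is slow.
        }
    })

    initialized := make(chan struct{})
    var once sync.Once
    wc.SetOnReplace(func() {
        // Called after every successful Replace/ReplaceWithVersion, i.e. after
        // a reflector relist - a natural "cache is ready" signal.
        once.Do(func() { close(initialized) })
    })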