mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Add easy setup for simple controller
Also add tests; coverage up to 86.7%
This commit is contained in:
parent
065a8fa454
commit
880f922bb6
4
pkg/client/cache/store.go
vendored
4
pkg/client/cache/store.go
vendored
@ -59,6 +59,10 @@ func (k KeyError) Error() string {
|
|||||||
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
|
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
|
||||||
|
// the object but not the object itself.
|
||||||
|
type ExplicitKey string
|
||||||
|
|
||||||
// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
|
// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
|
||||||
// keys for API objects which implement meta.Interface.
|
// keys for API objects which implement meta.Interface.
|
||||||
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
|
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
|
||||||
|
@ -102,3 +102,126 @@ func (c *Controller) processLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResourceEventHandler can handle notifications for events that happen to a
|
||||||
|
// resource. The events are informational only, so you can't return an
|
||||||
|
// error.
|
||||||
|
// * OnAdd is called when an object is added.
|
||||||
|
// * OnUpdate is called when an object is modified. Note that oldObj is the
|
||||||
|
// last known state of the object-- it is possible that several changes
|
||||||
|
// were combined together, so you can't use this to see every single
|
||||||
|
// change. OnUpdate is also called when a re-list happens, and it will
|
||||||
|
// get called even if nothing changed. This is useful for periodically
|
||||||
|
// evaluating or syncing something.
|
||||||
|
// * OnDelete will get the final state of the item if it is known, otherwise
|
||||||
|
// it will get an object of type cache.DeletedFinalStateUnknown.
|
||||||
|
type ResourceEventHandler interface {
|
||||||
|
OnAdd(obj interface{})
|
||||||
|
OnUpdate(oldObj, newObj interface{})
|
||||||
|
OnDelete(obj interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
|
||||||
|
// as few of the notification functions as you want while still implementing
|
||||||
|
// ResourceEventHandler.
|
||||||
|
type ResourceEventHandlerFuncs struct {
|
||||||
|
AddFunc func(obj interface{})
|
||||||
|
UpdateFunc func(oldObj, newObj interface{})
|
||||||
|
DeleteFunc func(obj interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnAdd calls AddFunc if it's not nil.
|
||||||
|
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
|
||||||
|
if r.AddFunc != nil {
|
||||||
|
r.AddFunc(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnUpdate calls UpdateFunc if it's not nil.
|
||||||
|
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
|
||||||
|
if r.UpdateFunc != nil {
|
||||||
|
r.UpdateFunc(oldObj, newObj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnDelete calls DeleteFunc if it's not nil.
|
||||||
|
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
|
||||||
|
if r.DeleteFunc != nil {
|
||||||
|
r.DeleteFunc(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeletionHandlingMetaNamespaceKeyFunc checks for
|
||||||
|
// cache.DeletedFinalStateUnknown objects before calling
|
||||||
|
// cache.MetaNamespaceKeyFunc.
|
||||||
|
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
|
||||||
|
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||||
|
return d.Key, nil
|
||||||
|
}
|
||||||
|
return cache.MetaNamespaceKeyFunc(obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewInformer returns a cache.Store and a controller for populating the store
|
||||||
|
// while also providing event notifications. You should only used the returned
|
||||||
|
// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
|
||||||
|
// notifications to be faulty.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// * lw is list and watch functions for the source of the resource you want to
|
||||||
|
// be informed of.
|
||||||
|
// * objType is an object of the type that you expect to receieve.
|
||||||
|
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
|
||||||
|
// calls, even if nothing changed). Otherwise, re-list will be delayed as
|
||||||
|
// long as possible (until the upstream source closes the watch or times out,
|
||||||
|
// or you stop the controller).
|
||||||
|
// * h is the object you want notifications sent to.
|
||||||
|
//
|
||||||
|
func NewInformer(
|
||||||
|
lw cache.ListerWatcher,
|
||||||
|
objType runtime.Object,
|
||||||
|
resyncPeriod time.Duration,
|
||||||
|
h ResourceEventHandler,
|
||||||
|
) (cache.Store, *Controller) {
|
||||||
|
// This will hold the client state, as we know it.
|
||||||
|
clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)
|
||||||
|
|
||||||
|
// This will hold incoming changes. Note how we pass clientState in as a
|
||||||
|
// KeyLister, that way resync operations will result in the correct set
|
||||||
|
// of update/delete deltas.
|
||||||
|
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
|
||||||
|
|
||||||
|
cfg := &Config{
|
||||||
|
Queue: fifo,
|
||||||
|
ListerWatcher: lw,
|
||||||
|
ObjectType: objType,
|
||||||
|
FullResyncPeriod: resyncPeriod,
|
||||||
|
RetryOnError: false,
|
||||||
|
|
||||||
|
Process: func(obj interface{}) error {
|
||||||
|
// from oldest to newest
|
||||||
|
for _, d := range obj.(cache.Deltas) {
|
||||||
|
switch d.Type {
|
||||||
|
case cache.Sync, cache.Added, cache.Updated:
|
||||||
|
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
|
||||||
|
if err := clientState.Update(d.Object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.OnUpdate(old, d.Object)
|
||||||
|
} else {
|
||||||
|
if err := clientState.Add(d.Object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.OnAdd(d.Object)
|
||||||
|
}
|
||||||
|
case cache.Deleted:
|
||||||
|
if err := clientState.Delete(d.Object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.OnDelete(d.Object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return clientState, New(cfg)
|
||||||
|
}
|
||||||
|
@ -18,15 +18,18 @@ package framework_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
// "testing"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
|
||||||
|
"github.com/google/gofuzz"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Example() {
|
func Example() {
|
||||||
@ -34,7 +37,7 @@ func Example() {
|
|||||||
source := framework.NewFakeControllerSource()
|
source := framework.NewFakeControllerSource()
|
||||||
|
|
||||||
// This will hold the downstream state, as we know it.
|
// This will hold the downstream state, as we know it.
|
||||||
downstream := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)
|
||||||
|
|
||||||
// This will hold incoming changes. Note how we pass downstream in as a
|
// This will hold incoming changes. Note how we pass downstream in as a
|
||||||
// KeyLister, that way resync operations will result in the correct set
|
// KeyLister, that way resync operations will result in the correct set
|
||||||
@ -113,3 +116,272 @@ func Example() {
|
|||||||
// b-controller
|
// b-controller
|
||||||
// c-framework
|
// c-framework
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ExampleInformer() {
|
||||||
|
// source simulates an apiserver object endpoint.
|
||||||
|
source := framework.NewFakeControllerSource()
|
||||||
|
|
||||||
|
// Let's do threadsafe output to get predictable test results.
|
||||||
|
outputSetLock := sync.Mutex{}
|
||||||
|
outputSet := util.StringSet{}
|
||||||
|
|
||||||
|
// Make a controller that immediately deletes anything added to it, and
|
||||||
|
// logs anything deleted.
|
||||||
|
_, controller := framework.NewInformer(
|
||||||
|
source,
|
||||||
|
&api.Pod{},
|
||||||
|
time.Millisecond*100,
|
||||||
|
framework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) {
|
||||||
|
source.Delete(obj.(runtime.Object))
|
||||||
|
},
|
||||||
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
key = "oops something went wrong with the key"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record some output when items are deleted.
|
||||||
|
outputSetLock.Lock()
|
||||||
|
defer outputSetLock.Unlock()
|
||||||
|
outputSet.Insert(key)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Run the controller and run it until we close stop.
|
||||||
|
stop := make(chan struct{})
|
||||||
|
controller.Run(stop)
|
||||||
|
|
||||||
|
// Let's add a few objects to the source.
|
||||||
|
for _, name := range []string{"a-hello", "b-controller", "c-framework"} {
|
||||||
|
// Note that these pods are not valid-- the fake source doesn't
|
||||||
|
// call validation or perform any other checking.
|
||||||
|
source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let's wait for the controller to process the things we just added.
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
close(stop)
|
||||||
|
|
||||||
|
outputSetLock.Lock()
|
||||||
|
for _, key := range outputSet.List() {
|
||||||
|
fmt.Println(key)
|
||||||
|
}
|
||||||
|
// Output:
|
||||||
|
// a-hello
|
||||||
|
// b-controller
|
||||||
|
// c-framework
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHammerController(t *testing.T) {
|
||||||
|
// This test executes a bunch of requests through the fake source and
|
||||||
|
// controller framework to make sure there's no locking/threading
|
||||||
|
// errors. If an error happens, it should hang forever or trigger the
|
||||||
|
// race detector.
|
||||||
|
|
||||||
|
// source simulates an apiserver object endpoint.
|
||||||
|
source := framework.NewFakeControllerSource()
|
||||||
|
|
||||||
|
// Let's do threadsafe output to get predictable test results.
|
||||||
|
outputSetLock := sync.Mutex{}
|
||||||
|
// map of key to operations done on the key
|
||||||
|
outputSet := map[string][]string{}
|
||||||
|
|
||||||
|
recordFunc := func(eventType string, obj interface{}) {
|
||||||
|
key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("something wrong with key: %v", err)
|
||||||
|
key = "oops something went wrong with the key"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record some output when items are deleted.
|
||||||
|
outputSetLock.Lock()
|
||||||
|
defer outputSetLock.Unlock()
|
||||||
|
outputSet[key] = append(outputSet[key], eventType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a controller which just logs all the changes it gets.
|
||||||
|
_, controller := framework.NewInformer(
|
||||||
|
source,
|
||||||
|
&api.Pod{},
|
||||||
|
time.Millisecond*100,
|
||||||
|
framework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) { recordFunc("add", obj) },
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
|
||||||
|
DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Run the controller and run it until we close stop.
|
||||||
|
stop := make(chan struct{})
|
||||||
|
go controller.Run(stop)
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
// Let's add a few objects to the source.
|
||||||
|
currentNames := util.StringSet{}
|
||||||
|
rs := rand.NewSource(rand.Int63())
|
||||||
|
f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
|
||||||
|
r := rand.New(rs) // Mustn't use r and f concurrently!
|
||||||
|
for i := 0; i < 750; i++ {
|
||||||
|
var name string
|
||||||
|
var isNew bool
|
||||||
|
if currentNames.Len() == 0 || r.Intn(3) == 1 {
|
||||||
|
f.Fuzz(&name)
|
||||||
|
isNew = true
|
||||||
|
} else {
|
||||||
|
l := currentNames.List()
|
||||||
|
name = l[r.Intn(len(l))]
|
||||||
|
}
|
||||||
|
|
||||||
|
pod := &api.Pod{}
|
||||||
|
f.Fuzz(pod)
|
||||||
|
pod.ObjectMeta.Name = name
|
||||||
|
pod.ObjectMeta.Namespace = "default"
|
||||||
|
// Add, update, or delete randomly.
|
||||||
|
// Note that these pods are not valid-- the fake source doesn't
|
||||||
|
// call validation or perform any other checking.
|
||||||
|
if isNew {
|
||||||
|
currentNames.Insert(name)
|
||||||
|
source.Add(pod)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch r.Intn(2) {
|
||||||
|
case 0:
|
||||||
|
currentNames.Insert(name)
|
||||||
|
source.Modify(pod)
|
||||||
|
case 1:
|
||||||
|
currentNames.Delete(name)
|
||||||
|
source.Delete(pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Let's wait for the controller to finish processing the things we just added.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
close(stop)
|
||||||
|
|
||||||
|
outputSetLock.Lock()
|
||||||
|
t.Logf("got: %#v", outputSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdate(t *testing.T) {
|
||||||
|
// This test is going to exercise the various paths that result in a
|
||||||
|
// call to update.
|
||||||
|
|
||||||
|
// source simulates an apiserver object endpoint.
|
||||||
|
source := framework.NewFakeControllerSource()
|
||||||
|
|
||||||
|
const (
|
||||||
|
FROM = "from"
|
||||||
|
ADD_MISSED = "missed the add event"
|
||||||
|
TO = "to"
|
||||||
|
)
|
||||||
|
|
||||||
|
// These are the transitions we expect to see; because this is
|
||||||
|
// asynchronous, there are a lot of valid possibilities.
|
||||||
|
type pair struct{ from, to string }
|
||||||
|
allowedTransitions := map[pair]bool{
|
||||||
|
pair{FROM, TO}: true,
|
||||||
|
pair{FROM, ADD_MISSED}: true,
|
||||||
|
pair{ADD_MISSED, TO}: true,
|
||||||
|
|
||||||
|
// Because a resync can happen when we've already observed one
|
||||||
|
// of the above but before the item is deleted.
|
||||||
|
pair{TO, TO}: true,
|
||||||
|
// Because a resync could happen before we observe an update.
|
||||||
|
pair{FROM, FROM}: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var testDoneWG sync.WaitGroup
|
||||||
|
|
||||||
|
// Make a controller that immediately deletes anything added to it, and
|
||||||
|
// logs anything deleted.
|
||||||
|
_, controller := framework.NewInformer(
|
||||||
|
source,
|
||||||
|
&api.Pod{},
|
||||||
|
time.Millisecond*1,
|
||||||
|
framework.ResourceEventHandlerFuncs{
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
|
||||||
|
from, to := o.Labels["check"], n.Labels["check"]
|
||||||
|
if !allowedTransitions[pair{from, to}] {
|
||||||
|
t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
|
||||||
|
}
|
||||||
|
source.Delete(n)
|
||||||
|
},
|
||||||
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
testDoneWG.Done()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Run the controller and run it until we close stop.
|
||||||
|
stop := make(chan struct{})
|
||||||
|
go controller.Run(stop)
|
||||||
|
|
||||||
|
pod := func(name, check string) *api.Pod {
|
||||||
|
return &api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Labels: map[string]string{"check": check},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
tests := []func(string){
|
||||||
|
func(name string) {
|
||||||
|
defer wg.Done()
|
||||||
|
testDoneWG.Add(1)
|
||||||
|
name = "a-" + name
|
||||||
|
source.Add(pod(name, FROM))
|
||||||
|
source.Modify(pod(name, TO))
|
||||||
|
},
|
||||||
|
func(name string) {
|
||||||
|
defer wg.Done()
|
||||||
|
testDoneWG.Add(1)
|
||||||
|
name = "b-" + name
|
||||||
|
source.Add(pod(name, FROM))
|
||||||
|
source.ModifyDropWatch(pod(name, TO))
|
||||||
|
},
|
||||||
|
func(name string) {
|
||||||
|
defer wg.Done()
|
||||||
|
testDoneWG.Add(1)
|
||||||
|
name = "c-" + name
|
||||||
|
source.AddDropWatch(pod(name, FROM))
|
||||||
|
source.Modify(pod(name, ADD_MISSED))
|
||||||
|
source.Modify(pod(name, TO))
|
||||||
|
},
|
||||||
|
func(name string) {
|
||||||
|
defer wg.Done()
|
||||||
|
testDoneWG.Add(1)
|
||||||
|
name = "d-" + name
|
||||||
|
source.Add(pod(name, FROM))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// run every test a few times, in parallel
|
||||||
|
fuzzer := fuzz.New()
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
for _, f := range tests {
|
||||||
|
wg.Add(1)
|
||||||
|
var name string
|
||||||
|
for len(name) < 10 {
|
||||||
|
fuzzer.Fuzz(&name)
|
||||||
|
}
|
||||||
|
go f(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Let's wait for the controller to process the things we just added.
|
||||||
|
testDoneWG.Wait()
|
||||||
|
close(stop)
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ package framework
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -51,26 +52,49 @@ type nnu struct {
|
|||||||
// Add adds an object to the set and sends an add event to watchers.
|
// Add adds an object to the set and sends an add event to watchers.
|
||||||
// obj's ResourceVersion is set.
|
// obj's ResourceVersion is set.
|
||||||
func (f *FakeControllerSource) Add(obj runtime.Object) {
|
func (f *FakeControllerSource) Add(obj runtime.Object) {
|
||||||
f.change(watch.Event{watch.Added, obj})
|
f.Change(watch.Event{watch.Added, obj}, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Modify updates an object in the set and sends a modified event to watchers.
|
// Modify updates an object in the set and sends a modified event to watchers.
|
||||||
// obj's ResourceVersion is set.
|
// obj's ResourceVersion is set.
|
||||||
func (f *FakeControllerSource) Modify(obj runtime.Object) {
|
func (f *FakeControllerSource) Modify(obj runtime.Object) {
|
||||||
f.change(watch.Event{watch.Modified, obj})
|
f.Change(watch.Event{watch.Modified, obj}, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes an object from the set and sends a delete event to watchers.
|
// Delete deletes an object from the set and sends a delete event to watchers.
|
||||||
// obj's ResourceVersion is set.
|
// obj's ResourceVersion is set.
|
||||||
func (f *FakeControllerSource) Delete(lastValue runtime.Object) {
|
func (f *FakeControllerSource) Delete(lastValue runtime.Object) {
|
||||||
f.change(watch.Event{watch.Deleted, lastValue})
|
f.Change(watch.Event{watch.Deleted, lastValue}, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddDropWatch adds an object to the set but forgets to send an add event to
|
||||||
|
// watchers.
|
||||||
|
// obj's ResourceVersion is set.
|
||||||
|
func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) {
|
||||||
|
f.Change(watch.Event{watch.Added, obj}, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModifyDropWatch updates an object in the set but forgets to send a modify
|
||||||
|
// event to watchers.
|
||||||
|
// obj's ResourceVersion is set.
|
||||||
|
func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) {
|
||||||
|
f.Change(watch.Event{watch.Modified, obj}, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteDropWatch deletes an object from the set but forgets to send a delete
|
||||||
|
// event to watchers.
|
||||||
|
// obj's ResourceVersion is set.
|
||||||
|
func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) {
|
||||||
|
f.Change(watch.Event{watch.Deleted, lastValue}, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu {
|
func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu {
|
||||||
return nnu{meta.Namespace, meta.Name, meta.UID}
|
return nnu{meta.Namespace, meta.Name, meta.UID}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeControllerSource) change(e watch.Event) {
|
// Change records the given event (setting the object's resource version) and
|
||||||
|
// sends a watch event with the specified probability.
|
||||||
|
func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
@ -89,7 +113,10 @@ func (f *FakeControllerSource) change(e watch.Event) {
|
|||||||
case watch.Deleted:
|
case watch.Deleted:
|
||||||
delete(f.items, key)
|
delete(f.items, key)
|
||||||
}
|
}
|
||||||
f.broadcaster.Action(e.Type, e.Object)
|
|
||||||
|
if rand.Float64() < watchProbability {
|
||||||
|
f.broadcaster.Action(e.Type, e.Object)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list object, with its resource version set.
|
// List returns a list object, with its resource version set.
|
||||||
|
Loading…
Reference in New Issue
Block a user