Add UntilWithSync (informer based)

Kubernetes-commit: 866cc1acab6c1c30a7550b2de7160c8052be884d
Tomas Nozicka 2018-08-03 16:45:41 +02:00 committed by Kubernetes Publisher
parent 6dac86f710
commit 4528feba43
4 changed files with 521 additions and 0 deletions

@@ -0,0 +1,114 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package watch

import (
	"sync"
	"sync/atomic"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

func newTicketer() *ticketer {
	return &ticketer{
		cond: sync.NewCond(&sync.Mutex{}),
	}
}

type ticketer struct {
	counter uint64

	cond    *sync.Cond
	current uint64
}

func (t *ticketer) GetTicket() uint64 {
	// -1 to start from 0
	return atomic.AddUint64(&t.counter, 1) - 1
}

func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
	t.cond.L.Lock()
	defer t.cond.L.Unlock()
	for ticket != t.current {
		t.cond.Wait()
	}

	f()
	t.current++
	t.cond.Broadcast()
}
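
// A minimal sketch of the ticketer in isolation (illustrative only, not part of this
// commit): tickets are taken synchronously, so they fix the order in which the
// concurrently spawned callbacks run, e.g.
//
//	t := newTicketer()
//	for i := 0; i < 3; i++ {
//		ticket := t.GetTicket()
//		go t.WaitForTicket(ticket, func() {
//			fmt.Println("ticket", ticket) // always prints tickets 0, 1, 2 in this order
//		})
//	}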

// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere you would use a regular Watcher returned from a Watch method.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) {
	ch := make(chan watch.Event)
	w := watch.NewProxyWatcher(ch)
	t := newTicketer()

	indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			go t.WaitForTicket(t.GetTicket(), func() {
				select {
				case ch <- watch.Event{
					Type:   watch.Added,
					Object: obj.(runtime.Object),
				}:
				case <-w.StopChan():
				}
			})
		},
		UpdateFunc: func(old, new interface{}) {
			go t.WaitForTicket(t.GetTicket(), func() {
				select {
				case ch <- watch.Event{
					Type:   watch.Modified,
					Object: new.(runtime.Object),
				}:
				case <-w.StopChan():
				}
			})
		},
		DeleteFunc: func(obj interface{}) {
			go t.WaitForTicket(t.GetTicket(), func() {
				staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
				if stale {
					// We have no means of passing this additional information down using the watch API
					// based on watch.Event, but the caller can filter such objects by checking whether
					// metadata.deletionTimestamp is set.
					obj = staleObj.Obj
				}

				select {
				case ch <- watch.Event{
					Type:   watch.Deleted,
					Object: obj.(runtime.Object),
				}:
				case <-w.StopChan():
				}
			})
		},
	}, cache.Indexers{})

	go func() {
		informer.Run(w.StopChan())
	}()

	return indexer, informer, w
}
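
For context, a minimal sketch (not part of the commit) of how the returned watcher might be consumed; the clientset wiring and names are illustrative, assuming the client-go API of this era where List and Watch take no context argument:

package watchexample

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// watchSecrets prints every event for Secrets in the "default" namespace until
// the watcher is stopped; the underlying informer re-lists on watch failures.
func watchSecrets(client kubernetes.Interface) {
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Secrets("default").List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Secrets("default").Watch(options)
		},
	}

	_, _, w := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
	defer w.Stop()

	for event := range w.ResultChan() {
		fmt.Println(event.Type, event.Object.(*corev1.Secret).Name)
	}
}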

@@ -0,0 +1,236 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package watch

import (
	"math/rand"
	"reflect"
	"sort"
	"testing"
	"time"

	"github.com/davecgh/go-spew/spew"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/diff"
	"k8s.io/apimachinery/pkg/watch"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	testcore "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
)

type byEventTypeAndName []watch.Event

func (a byEventTypeAndName) Len() int      { return len(a) }
func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byEventTypeAndName) Less(i, j int) bool {
	if a[i].Type < a[j].Type {
		return true
	}

	if a[i].Type > a[j].Type {
		return false
	}

	return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
}

func TestTicketer(t *testing.T) {
	tg := newTicketer()

	const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines
	var tickets []uint64
	for i := 0; i < numTickets; i++ {
		ticket := tg.GetTicket()
		tickets = append(tickets, ticket)

		exp, got := uint64(i), ticket
		if got != exp {
			t.Fatalf("expected ticket %d, got %d", exp, got)
		}
	}

	// shuffle tickets
	rand.Shuffle(len(tickets), func(i, j int) {
		tickets[i], tickets[j] = tickets[j], tickets[i]
	})

	res := make(chan uint64, len(tickets))
	for _, ticket := range tickets {
		go func(ticket uint64) {
			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
			tg.WaitForTicket(ticket, func() {
				res <- ticket
			})
		}(ticket)
	}

	for i := 0; i < numTickets; i++ {
		exp, got := uint64(i), <-res
		if got != exp {
			t.Fatalf("expected ticket %d, got %d", exp, got)
		}
	}
}

func TestNewInformerWatcher(t *testing.T) {
	// Make sure there are no two events of the same type for a secret with the same name,
	// or the test might be flaky.
	tt := []struct {
		name    string
		objects []runtime.Object
		events  []watch.Event
	}{
		{
			name: "basic test",
			objects: []runtime.Object{
				&corev1.Secret{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod-1",
					},
					StringData: map[string]string{
						"foo-1": "initial",
					},
				},
				&corev1.Secret{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod-2",
					},
					StringData: map[string]string{
						"foo-2": "initial",
					},
				},
				&corev1.Secret{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod-3",
					},
					StringData: map[string]string{
						"foo-3": "initial",
					},
				},
			},
			events: []watch.Event{
				{
					Type: watch.Added,
					Object: &corev1.Secret{
						ObjectMeta: metav1.ObjectMeta{
							Name: "pod-4",
						},
						StringData: map[string]string{
							"foo-4": "initial",
						},
					},
				},
				{
					Type: watch.Modified,
					Object: &corev1.Secret{
						ObjectMeta: metav1.ObjectMeta{
							Name: "pod-2",
						},
						StringData: map[string]string{
							"foo-2": "new",
						},
					},
				},
				{
					Type: watch.Deleted,
					Object: &corev1.Secret{
						ObjectMeta: metav1.ObjectMeta{
							Name: "pod-3",
						},
					},
				},
			},
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			var expected []watch.Event
			for _, o := range tc.objects {
				expected = append(expected, watch.Event{
					Type:   watch.Added,
					Object: o.DeepCopyObject(),
				})
			}
			for _, e := range tc.events {
				expected = append(expected, *e.DeepCopy())
			}

			fake := fakeclientset.NewSimpleClientset(tc.objects...)
			fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
			fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))

			for _, e := range tc.events {
				fakeWatch.Action(e.Type, e.Object)
			}

			lw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					return fake.CoreV1().Secrets("").List(options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return fake.CoreV1().Secrets("").Watch(options)
				},
			}
			_, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{})

			var result []watch.Event
		loop:
			for {
				var event watch.Event
				var ok bool
				select {
				case event, ok = <-w.ResultChan():
					if !ok {
						t.Errorf("Failed to read event: channel is already closed!")
						return
					}

					result = append(result, *event.DeepCopy())
				case <-time.After(time.Second * 1):
					// All the events are buffered -> this means we are done.
					// The one-second timeout also makes sure we would detect incorrect
					// behaviour after the last event.
					break loop
				}
			}

			// Informers don't guarantee event order so we need to sort these arrays to compare them.
			sort.Sort(byEventTypeAndName(expected))
			sort.Sort(byEventTypeAndName(result))

			if !reflect.DeepEqual(expected, result) {
				t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result)))
				return
			}

			// Fill in some data to test watch closing while there are some events to be read.
			for _, e := range tc.events {
				fakeWatch.Action(e.Type, e.Object)
			}

			// Stop before reading all the data to make sure the informer can deal with a closed channel.
			w.Stop()

			// Wait a bit to see if the informer won't panic.
			// TODO: Try to figure out a more reliable mechanism than time.Sleep
			// (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591)
			time.Sleep(1 * time.Second)
		})
	}
}

@@ -19,16 +19,22 @@ package watch

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition failed or detected an error state.
type PreconditionFunc func(store cache.Store) (bool, error)

// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
@@ -89,6 +95,42 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
	return lastEvent, nil
}

// UntilWithSync creates an informer from lw, optionally checks the precondition when the store is synced,
// and watches the output until each provided condition succeeds, in a way that is identical
// to function UntilWithoutRetry. (See above.)
//
// UntilWithSync can deal with errors like API timeouts, lost connections and 'Resource version too old'.
// It is the only function that can recover from 'Resource version too old'; Until and UntilWithoutRetry
// will just fail in that case. On the other hand, it can't provide you with guarantees as strong as using
// the simple Watch method with Until: it can skip some intermediate events if the watch function fails,
// but it will re-list to recover, and after recovery you always get an event if there has been a change.
// Also, with the current implementation based on DeltaFIFO, the order of events is guaranteed only for a
// particular object, not across objects, even when they belong to the same resource.
//
// The most frequent usage would be a command that needs to watch the "state of the world" and shouldn't
// fail, like waiting for an object to reach a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
	indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType)
	// The proxy watcher can be stopped multiple times, so it's fine to use defer here to cover
	// the alternative branches and to let UntilWithoutRetry stop it as well.
	defer watcher.Stop()

	if precondition != nil {
		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
			return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err())
		}

		done, err := precondition(indexer)
		if err != nil {
			return nil, err
		}

		if done {
			return nil, nil
		}
	}

	return UntilWithoutRetry(ctx, watcher, conditions...)
}
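
// A minimal usage sketch (illustrative, not part of this commit): waiting for a
// hypothetical Secret named "foo" to appear, assuming a pre-built clientset
// `client` and the client-go API of this era (List/Watch without a context
// argument).
//
//	lw := &cache.ListWatch{
//		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
//			return client.CoreV1().Secrets("default").List(options)
//		},
//		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
//			return client.CoreV1().Secrets("default").Watch(options)
//		},
//	}
//
//	ctx, cancel := ContextWithOptionalTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//
//	// Nil precondition: don't inspect the synced store first, just watch.
//	event, err := UntilWithSync(ctx, lw, &corev1.Secret{}, nil, func(e watch.Event) (bool, error) {
//		return e.Type == watch.Added && e.Object.(*corev1.Secret).Name == "foo", nil
//	})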

// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout < 0 {

@@ -19,14 +19,19 @@ package watch

import (
	"context"
	"errors"
	"reflect"
	"strings"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	fakeclient "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

type fakePod struct {
@@ -172,3 +177,127 @@ func TestUntilErrorCondition(t *testing.T) {
		t.Fatalf("expected %q in error string, got %q", expected, err.Error())
	}
}

func TestUntilWithSync(t *testing.T) {
	// FIXME: test preconditions
	tt := []struct {
		name             string
		lw               *cache.ListWatch
		preconditionFunc PreconditionFunc
		conditionFunc    ConditionFunc
		expectedErr      error
		expectedEvent    *watch.Event
	}{
		{
			name: "doesn't wait for sync with no precondition",
			lw: &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					select {}
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					select {}
				},
			},
			preconditionFunc: nil,
			conditionFunc: func(e watch.Event) (bool, error) {
				return true, nil
			},
			expectedErr:   errors.New("timed out waiting for the condition"),
			expectedEvent: nil,
		},
		{
			name: "waits indefinitely with precondition if it can't sync",
			lw: &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					select {}
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					select {}
				},
			},
			preconditionFunc: func(store cache.Store) (bool, error) {
				return true, nil
			},
			conditionFunc: func(e watch.Event) (bool, error) {
				return true, nil
			},
			expectedErr:   errors.New("UntilWithSync: unable to sync caches: context deadline exceeded"),
			expectedEvent: nil,
		},
		{
			name: "precondition can stop the loop",
			lw: func() *cache.ListWatch {
				fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
				return &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						return fakeclient.CoreV1().Secrets("").List(options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return fakeclient.CoreV1().Secrets("").Watch(options)
					},
				}
			}(),
			preconditionFunc: func(store cache.Store) (bool, error) {
				_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"})
				if err != nil {
					return true, err
				}
				if exists {
					return true, nil
				}
				return false, nil
			},
			conditionFunc: func(e watch.Event) (bool, error) {
				return true, errors.New("should never reach this")
			},
			expectedErr:   nil,
			expectedEvent: nil,
		},
		{
			name: "precondition lets it proceed to regular condition",
			lw: func() *cache.ListWatch {
				fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
				return &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						return fakeclient.CoreV1().Secrets("").List(options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return fakeclient.CoreV1().Secrets("").Watch(options)
					},
				}
			}(),
			preconditionFunc: func(store cache.Store) (bool, error) {
				return false, nil
			},
			conditionFunc: func(e watch.Event) (bool, error) {
				if e.Type == watch.Added {
					return true, nil
				}
				panic("no other events are expected")
			},
			expectedErr:   nil,
			expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			// The informer waits for caches to sync by polling in 100ms intervals,
			// so the timeout needs to be reasonably higher than that.
			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
			defer cancel()

			event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc)
			if !reflect.DeepEqual(err, tc.expectedErr) {
				t.Errorf("expected error %#v, got %#v", tc.expectedErr, err)
			}

			if !reflect.DeepEqual(event, tc.expectedEvent) {
				t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event)
			}
		})
	}
}