published by bot

(https://github.com/kubernetes/contrib/tree/master/mungegithub)

copied from https://github.com/kubernetes/kubernetes.git, branch master,
last commit is 1bc5b822cd566321c115d4ebac5d97cfd347d687
This commit is contained in:
Kubernetes Publisher
2016-11-11 03:39:04 +00:00
parent 24b73253cd
commit ae6775eeec
305 changed files with 13010 additions and 8421 deletions

View File

@@ -20,12 +20,22 @@ import (
"time"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/meta"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (watch.Interface, error)
}
// ListFunc knows how to list resources
type ListFunc func(options api.ListOptions) (runtime.Object, error)
@@ -84,3 +94,69 @@ func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watchInterface, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
return watch.Until(timeout, watchInterface, remainingConditions...)
}