diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b97be971..79d15b0d 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -272,331 +272,331 @@ }, { "ImportPath": "k8s.io/api/admissionregistration/v1alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/admissionregistration/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/apps/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/apps/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/apps/v1beta2", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/authentication/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/authentication/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/authorization/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/authorization/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/autoscaling/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/autoscaling/v2beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/batch/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/batch/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/batch/v2alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/certificates/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/coordination/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/core/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/events/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/extensions/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/imagepolicy/v1alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/networking/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/policy/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/rbac/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/rbac/v1alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/rbac/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/scheduling/v1alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/scheduling/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/settings/v1alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/storage/v1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/storage/v1alpha1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/api/storage/v1beta1", - "Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/apitesting", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/fuzzer", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/roundtrip", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/equality", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/errors", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/meta", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/resource", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/fuzzer", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/internalversion", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/conversion", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/conversion/queryparams", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/fields", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/labels", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/schema", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/json", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/protobuf", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/recognizer", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/streaming", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/versioning", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/selection", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/types", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/cache", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/clock", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/diff", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/errors", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/framer", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/httpstream", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/httpstream/spdy", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/intstr", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/json", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/mergepatch", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/naming", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/net", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/remotecommand", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/runtime", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/sets", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/strategicpatch", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/validation", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/validation/field", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/wait", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/yaml", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/version", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/pkg/watch", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/json", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/netutil", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect", - "Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149" + "Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c" }, { "ImportPath": "k8s.io/kube-openapi/pkg/util/proto", diff --git a/tools/cache/listwatch.go b/tools/cache/listwatch.go index 8bf41f51..30463aea 100644 --- a/tools/cache/listwatch.go +++ b/tools/cache/listwatch.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/pager" + watchtools "k8s.io/client-go/tools/watch" ) // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. @@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) // ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout // if timeout is exceeded without all conditions returning true, or an error if an error occurs. // TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) { +func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { if len(conditions) == 0 { return nil, nil } @@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch return nil, err } - evt, err := watch.Until(timeout, watchInterface, remainingConditions...) - if err == watch.ErrWatchClosed { + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == watchtools.ErrWatchClosed { // present a consistent error interface to callers err = wait.ErrWaitTimeout } diff --git a/tools/watch/until.go b/tools/watch/until.go new file mode 100644 index 00000000..4a891b23 --- /dev/null +++ b/tools/watch/until.go @@ -0,0 +1,102 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" +) + +// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition cannot be checked and should terminate. In general, it is better to define +// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed +// from false to true). +type ConditionFunc func(event watch.Event) (bool, error) + +// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry. +var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout") + +// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// Waits until context deadline or until context is canceled. +// +// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!! +// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error. +// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below, +// Warning: solving such issues. +// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone. +func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var lastEvent *watch.Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, ErrWatchClosed + } + lastEvent = &event + + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-ctx.Done(): + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} + +// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. +func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout < 0 { + // This should be handled in validation + glog.Errorf("Timeout for context shall not be negative!") + timeout = 0 + } + + if timeout == 0 { + return context.WithCancel(parent) + } + + return context.WithTimeout(parent, timeout) +} diff --git a/tools/watch/until_test.go b/tools/watch/until_test.go new file mode 100644 index 00000000..e766acd7 --- /dev/null +++ b/tools/watch/until_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" +) + +type fakePod struct { + name string +} + +func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind } +func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") } + +func TestUntil(t *testing.T) { + fw := watch.NewFake() + go func() { + var obj *fakePod + fw.Add(obj) + fw.Modify(obj) + }() + conditions := []ConditionFunc{ + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil }, + } + + ctx, _ := context.WithTimeout(context.Background(), time.Minute) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != watch.Modified { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*fakePod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilMultipleConditions(t *testing.T) { + fw := watch.NewFake() + go func() { + var obj *fakePod + fw.Add(obj) + }() + conditions := []ConditionFunc{ + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + } + + ctx, _ := context.WithTimeout(context.Background(), time.Minute) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != watch.Added { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*fakePod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilMultipleConditionsFail(t *testing.T) { + fw := watch.NewFake() + go func() { + var obj *fakePod + fw.Add(obj) + }() + conditions := []ConditionFunc{ + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil }, + } + + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) + if err != wait.ErrWaitTimeout { + t.Fatalf("expected ErrWaitTimeout error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != watch.Added { + t.Fatalf("expected ADDED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*fakePod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilTimeout(t *testing.T) { + fw := watch.NewFake() + go func() { + var obj *fakePod + fw.Add(obj) + fw.Modify(obj) + }() + conditions := []ConditionFunc{ + func(event watch.Event) (bool, error) { + return event.Type == watch.Added, nil + }, + func(event watch.Event) (bool, error) { + return event.Type == watch.Modified, nil + }, + } + + lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != watch.Modified { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*fakePod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilErrorCondition(t *testing.T) { + fw := watch.NewFake() + go func() { + var obj *fakePod + fw.Add(obj) + }() + expected := "something bad" + conditions := []ConditionFunc{ + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return false, errors.New(expected) }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, err := UntilWithoutRetry(ctx, fw, conditions...) + if err == nil { + t.Fatal("expected an error") + } + if !strings.Contains(err.Error(), expected) { + t.Fatalf("expected %q in error string, got %q", expected, err.Error()) + } +}