Merge pull request #66906 from tnozicka/rename-until

Automatic merge from submit-queue (batch tested with PRs 67071, 66906, 66722, 67276, 67039). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

#50102 Task 1: Move apimachinery/pkg/watch.Until into client-go/tools/watch.UntilWithoutRetry

**What this PR does / why we need it**:
This is a split off from https://github.com/kubernetes/kubernetes/pull/50102 to go in smaller pieces.

Moves `apimachinery/pkg/watch.Until` into `client-go/tools/watch.UntilWithoutRetry` and adds context so it is cancelable.

**Release note**:
```release-note
NONE
```

**Dev release note**:
```dev-release-note
`apimachinery/pkg/watch.Until` has been moved to `client-go/tools/watch.UntilWithoutRetry`.
While switching please consider using the new `client-go/tools/watch.UntilWithSync` or `client-go/tools/watch.Until`.
```

/cc @smarterclayton @kubernetes/sig-api-machinery-pr-reviews
/milestone v1.12
/priority important-soon
/kind bug
(bug after the main PR which is this split from)

Kubernetes-commit: b6f0aed056ab94fef0b6f54e1ca1d66a5fc228b3
This commit is contained in:
Kubernetes Publisher 2018-08-14 22:43:19 -07:00
commit 744b11616f
4 changed files with 364 additions and 85 deletions

164
Godeps/Godeps.json generated
View File

@ -272,331 +272,331 @@
},
{
"ImportPath": "k8s.io/api/admissionregistration/v1alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/admissionregistration/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/apps/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/apps/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/apps/v1beta2",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/authentication/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/authentication/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/authorization/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/authorization/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/autoscaling/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/autoscaling/v2beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/batch/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/batch/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/batch/v2alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/certificates/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/coordination/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/core/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/events/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/extensions/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/imagepolicy/v1alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/networking/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/policy/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/rbac/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/rbac/v1alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/rbac/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/scheduling/v1alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/scheduling/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/settings/v1alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/storage/v1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/storage/v1alpha1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/api/storage/v1beta1",
"Rev": "91bfdbcf0c2cab32ec1236cee4c300793abea68a"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/apitesting",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/fuzzer",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/roundtrip",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/equality",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/errors",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/meta",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/api/resource",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/fuzzer",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/internalversion",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion/queryparams",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/fields",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/labels",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/schema",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/json",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/protobuf",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/recognizer",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/streaming",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/versioning",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/selection",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/types",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/cache",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/clock",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/diff",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/errors",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/framer",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/httpstream",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/httpstream/spdy",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/intstr",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/json",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/mergepatch",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/naming",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/net",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/remotecommand",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/runtime",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/sets",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/strategicpatch",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/validation",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/validation/field",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/wait",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/util/yaml",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/version",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/watch",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/json",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/netutil",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect",
"Rev": "720795f37ef04a6d63ba53b0b43233b3f851c149"
"Rev": "794ec3ffa49dd85eddf72ef4581cb9ba572a6b4c"
},
{
"ImportPath": "k8s.io/kube-openapi/pkg/util/proto",

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
watchtools "k8s.io/client-go/tools/watch"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
return nil, err
}
evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
if err == watch.ErrWatchClosed {
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
if err == watchtools.ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}

102
tools/watch/until.go Normal file
View File

@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"errors"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
)
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event watch.Event) (bool, error)
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// Waits until context deadline or until context is canceled.
//
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
// Warning: solving such issues.
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
var lastEvent *watch.Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
continue
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, ErrWatchClosed
}
lastEvent = &event
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-ctx.Done():
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 {
// This should be handled in validation
glog.Errorf("Timeout for context shall not be negative!")
timeout = 0
}
if timeout == 0 {
return context.WithCancel(parent)
}
return context.WithTimeout(parent, timeout)
}

174
tools/watch/until_test.go Normal file
View File

@ -0,0 +1,174 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"errors"
"strings"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
)
type fakePod struct {
name string
}
func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") }
func TestUntil(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *fakePod
fw.Add(obj)
fw.Modify(obj)
}()
conditions := []ConditionFunc{
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil },
}
ctx, _ := context.WithTimeout(context.Background(), time.Minute)
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilMultipleConditions(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *fakePod
fw.Add(obj)
}()
conditions := []ConditionFunc{
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
}
ctx, _ := context.WithTimeout(context.Background(), time.Minute)
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Added {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilMultipleConditionsFail(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *fakePod
fw.Add(obj)
}()
conditions := []ConditionFunc{
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil },
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Added {
t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilTimeout(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *fakePod
fw.Add(obj)
fw.Modify(obj)
}()
conditions := []ConditionFunc{
func(event watch.Event) (bool, error) {
return event.Type == watch.Added, nil
},
func(event watch.Event) (bool, error) {
return event.Type == watch.Modified, nil
},
}
lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilErrorCondition(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *fakePod
fw.Add(obj)
}()
expected := "something bad"
conditions := []ConditionFunc{
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
func(event watch.Event) (bool, error) { return false, errors.New(expected) },
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, err := UntilWithoutRetry(ctx, fw, conditions...)
if err == nil {
t.Fatal("expected an error")
}
if !strings.Contains(err.Error(), expected) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}