Merge pull request #23912 from smarterclayton/watch_until

Automatic merge from submit-queue

Add watch.Until, a conditional watch mechanism

A more powerful tool than wait.Poll, allows a watch interface to drive conditionals to react to changes on a resource or resources. Provide a set of standard conditions that are in common use in the code, and updates e2e to use a few of these.

Extracted from #23567
This commit is contained in:
k8s-merge-robot 2016-04-16 21:05:40 -07:00
commit 822618afb5
6 changed files with 403 additions and 53 deletions

View File

@ -238,6 +238,14 @@ func IsStandardFinalizerName(str string) bool {
return standardFinalizers.Has(str)
}
// SingleObject returns a ListOptions for watching a single object.
func SingleObject(meta ObjectMeta) ListOptions {
return ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", meta.Name),
ResourceVersion: meta.ResourceVersion,
}
}
// AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice,
// only if they do not already exist
func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) {

View File

@ -17,12 +17,15 @@ limitations under the License.
package unversioned
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
// DefaultRetry is the recommended retry for a conflict where multiple clients
@ -168,3 +171,125 @@ func DeploymentHasDesiredReplicas(c ExtensionsInterface, deployment *extensions.
deployment.Status.UpdatedReplicas == deployment.Spec.Replicas, nil
}
}
// ErrPodCompleted is returned by PodRunning or PodContainerRunning to indicate that
// the pod has already reached completed state.
var ErrPodCompleted = fmt.Errorf("pod ran to completion")
// PodRunning returns true if the pod is running, false if the pod has not yet reached running state,
// returns ErrPodCompleted if the pod has run to completion, or an error in any other case.
func PodRunning(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodRunning:
return true, nil
case api.PodFailed, api.PodSucceeded:
return false, ErrPodCompleted
}
}
return false, nil
}
// PodCompleted returns true if the pod has run to completion, false if the pod has not yet
// reached running state, or an error in any other case.
func PodCompleted(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodFailed, api.PodSucceeded:
return true, nil
}
}
return false, nil
}
// PodRunningAndReady returns true if the pod is running and ready, false if the pod has not
// yet reached those states, returns ErrPodCompleted if the pod has run to completion, or
// an error in any other case.
func PodRunningAndReady(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodFailed, api.PodSucceeded:
return false, ErrPodCompleted
case api.PodRunning:
return api.IsPodReady(t), nil
}
}
return false, nil
}
// PodNotPending returns true if the pod has left the pending state, false if it has not,
// or an error in any other case (such as if the pod was deleted).
func PodNotPending(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodPending:
return false, nil
default:
return true, nil
}
}
return false, nil
}
// PodContainerRunning returns false until the named container has ContainerStatus running (at least once),
// and will return an error if the pod is deleted, runs to completion, or the container pod is not available.
func PodContainerRunning(containerName string) watch.ConditionFunc {
return func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodRunning, api.PodPending:
case api.PodFailed, api.PodSucceeded:
return false, ErrPodCompleted
default:
return false, nil
}
for _, s := range t.Status.ContainerStatuses {
if s.Name != containerName {
continue
}
return s.State.Running != nil, nil
}
return false, nil
}
return false, nil
}
}
// ServiceAccountHasSecrets returns true if the service account has at least one secret,
// false if it does not, or an error.
func ServiceAccountHasSecrets(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "serviceaccounts"}, "")
}
switch t := event.Object.(type) {
case *api.ServiceAccount:
return len(t.Secrets) > 0, nil
}
return false, nil
}

View File

@ -16,6 +16,10 @@ limitations under the License.
package watch
import (
"sync"
)
// FilterFunc should take an event, possibly modify it in some way, and return
// the modified event. If the event should be ignored, then return keep=false.
type FilterFunc func(in Event) (out Event, keep bool)
@ -69,3 +73,37 @@ func (fw *filteredWatch) loop() {
}
}
}
// Recorder records all events that are sent from the watch until it is closed.
type Recorder struct {
Interface
lock sync.Mutex
events []Event
}
var _ Interface = &Recorder{}
// NewRecorder wraps an Interface and records any changes sent across it.
func NewRecorder(w Interface) *Recorder {
r := &Recorder{}
r.Interface = Filter(w, r.record)
return r
}
// record is a FilterFunc and tracks each received event.
func (r *Recorder) record(in Event) (Event, bool) {
r.lock.Lock()
defer r.lock.Unlock()
r.events = append(r.events, in)
return in, true
}
// Events returns a copy of the events sent across this recorder.
func (r *Recorder) Events() []Event {
r.lock.Lock()
defer r.lock.Unlock()
copied := make([]Event, len(r.events))
copy(copied, r.events)
return copied
}

82
pkg/watch/until.go Normal file
View File

@ -0,0 +1,82 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"time"
"k8s.io/kubernetes/pkg/util/wait"
)
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event Event) (bool, error)
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
var after <-chan time.Time
if timeout > 0 {
after = time.After(timeout)
} else {
ch := make(chan time.Time)
close(ch)
after = ch
}
var lastEvent *Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
break
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, wait.ErrWaitTimeout
}
lastEvent = &event
// TODO: check for watch expired error and retry watch from latest point?
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-after:
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}

120
pkg/watch/until_test.go Normal file
View File

@ -0,0 +1,120 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"errors"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestUntil(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
fw.Modify(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Modified, nil },
}
timeout := time.Minute
lastEvent, err := Until(timeout, fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilMultipleConditions(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Added, nil },
}
timeout := time.Minute
lastEvent, err := Until(timeout, fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Added {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilTimeout(t *testing.T) {
fw := NewFake()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
}
timeout := time.Duration(0)
lastEvent, err := Until(timeout, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
}
if lastEvent != nil {
t.Fatalf("expected nil event, got %#v", lastEvent)
}
}
func TestUntilErrorCondition(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
}()
expected := "something bad"
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return false, errors.New(expected) },
}
timeout := time.Minute
_, err := Until(timeout, fw, conditions...)
if err == nil {
t.Fatal("expected an error")
}
if !strings.Contains(err.Error(), expected) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}

View File

@ -68,7 +68,6 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/blang/semver"
"github.com/davecgh/go-spew/spew"
"golang.org/x/crypto/ssh"
"golang.org/x/net/websocket"
@ -329,6 +328,7 @@ var providersWithMasterSSH = []string{"gce", "gke", "kubemark", "aws"}
type podCondition func(pod *api.Pod) (bool, error)
// podReady returns whether pod has a condition of Ready with a status of true.
// TODO: should be replaced with api.IsPodReady
func podReady(pod *api.Pod) bool {
for _, cond := range pod.Status.Conditions {
if cond.Type == api.PodReady && cond.Status == api.ConditionTrue {
@ -628,25 +628,12 @@ func WaitForNamespacesDeleted(c *client.Client, namespaces []string, timeout tim
}
func waitForServiceAccountInNamespace(c *client.Client, ns, serviceAccountName string, timeout time.Duration) error {
Logf("Waiting up to %v for service account %s to be provisioned in ns %s", timeout, serviceAccountName, ns)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
sa, err := c.ServiceAccounts(ns).Get(serviceAccountName)
if apierrs.IsNotFound(err) {
Logf("Get service account %s in ns %s failed, ignoring for %v: %v", serviceAccountName, ns, Poll, err)
continue
}
if err != nil {
Logf("Get service account %s in ns %s failed: %v", serviceAccountName, ns, err)
return err
}
if len(sa.Secrets) == 0 {
Logf("Service account %s in ns %s had 0 secrets, ignoring for %v: %v", serviceAccountName, ns, Poll, err)
continue
}
Logf("Service account %s in ns %s with secrets found. (%v)", serviceAccountName, ns, time.Since(start))
return nil
w, err := c.ServiceAccounts(ns).Watch(api.SingleObject(api.ObjectMeta{Name: serviceAccountName}))
if err != nil {
return err
}
return fmt.Errorf("Service account %s in namespace %s not ready within %v", serviceAccountName, ns, timeout)
_, err = watch.Until(timeout, w, client.ServiceAccountHasSecrets)
return err
}
func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout time.Duration, condition podCondition) error {
@ -902,16 +889,12 @@ func waitForPodRunningInNamespaceSlow(c *client.Client, podName string, namespac
}
func waitTimeoutForPodRunningInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error {
return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) {
if pod.Status.Phase == api.PodRunning {
Logf("Found pod '%s' on node '%s'", podName, pod.Spec.NodeName)
return true, nil
}
if pod.Status.Phase == api.PodFailed {
return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod))
}
return false, nil
})
w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName}))
if err != nil {
return err
}
_, err = watch.Until(timeout, w, client.PodRunning)
return err
}
// Waits default amount of time (podNoLongerRunningTimeout) for the specified pod to stop running.
@ -921,37 +904,31 @@ func WaitForPodNoLongerRunningInNamespace(c *client.Client, podName string, name
}
func waitTimeoutForPodNoLongerRunningInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error {
return waitForPodCondition(c, namespace, podName, "no longer running", timeout, func(pod *api.Pod) (bool, error) {
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
Logf("Found pod '%s' with status '%s' on node '%s'", podName, pod.Status.Phase, pod.Spec.NodeName)
return true, nil
}
return false, nil
})
w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName}))
if err != nil {
return err
}
_, err = watch.Until(timeout, w, client.PodCompleted)
return err
}
func waitTimeoutForPodReadyInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error {
return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) {
if pod.Status.Phase == api.PodRunning {
Logf("Found pod '%s' on node '%s'", podName, pod.Spec.NodeName)
return true, nil
}
if pod.Status.Phase == api.PodFailed {
return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod))
}
return podReady(pod), nil
})
w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName}))
if err != nil {
return err
}
_, err = watch.Until(timeout, w, client.PodRunningAndReady)
return err
}
// WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
func WaitForPodNotPending(c *client.Client, ns, podName string) error {
return waitForPodCondition(c, ns, podName, "!pending", PodStartTimeout, func(pod *api.Pod) (bool, error) {
if pod.Status.Phase != api.PodPending {
Logf("Saw pod '%s' in namespace '%s' out of pending state (found '%q')", podName, ns, pod.Status.Phase)
return true, nil
}
return false, nil
})
w, err := c.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: podName}))
if err != nil {
return err
}
_, err = watch.Until(PodStartTimeout, w, client.PodNotPending)
return err
}
// waitForPodTerminatedInNamespace returns an error if it took too long for the pod