From f89bde1415b40f7b53fb0579511258c59f7f6ac4 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 5 Apr 2016 13:33:47 -0400 Subject: [PATCH 1/2] Add watch.Until, a conditional watch mechanism Also add helpers for collecting the events that happen during a watch and a helper that makes it easy to start a watch from any object with ObjectMeta. --- pkg/api/helpers.go | 8 ++ pkg/client/unversioned/conditions.go | 55 ++++++++++++ pkg/watch/filter.go | 38 +++++++++ pkg/watch/until.go | 82 ++++++++++++++++++ pkg/watch/until_test.go | 120 +++++++++++++++++++++++++++ 5 files changed, 303 insertions(+) create mode 100644 pkg/watch/until.go create mode 100644 pkg/watch/until_test.go diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go index 8d4fadc5362..d1651af7995 100644 --- a/pkg/api/helpers.go +++ b/pkg/api/helpers.go @@ -238,6 +238,14 @@ func IsStandardFinalizerName(str string) bool { return standardFinalizers.Has(str) } +// SingleObject returns a ListOptions for watching a single object. +func SingleObject(meta ObjectMeta) ListOptions { + return ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", meta.Name), + ResourceVersion: meta.ResourceVersion, + } +} + // AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice, // only if they do not already exist func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) { diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index 5087baa8067..69e309c120e 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -17,12 +17,15 @@ limitations under the License. package unversioned import ( + "fmt" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" ) // DefaultRetry is the recommended retry for a conflict where multiple clients @@ -168,3 +171,55 @@ func DeploymentHasDesiredReplicas(c ExtensionsInterface, deployment *extensions. deployment.Status.UpdatedReplicas == deployment.Spec.Replicas, nil } } + +// ErrPodCompleted is returned by PodRunning or PodContainerRunning to indicate that +// the pod has already reached completed state. +var ErrPodCompleted = fmt.Errorf("pod ran to completion") + +// PodRunning returns true if the pod is running, false if the pod has not yet reached running state, +// returns ErrPodCompleted if the pod has run to completion, or an error in any other case. +func PodRunning(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodRunning: + return true, nil + case api.PodFailed, api.PodSucceeded: + return false, ErrPodCompleted + } + } + return false, nil +} + +// PodContainerRunning returns false until the named container has ContainerStatus running (at least once), +// and will return an error if the pod is deleted, runs to completion, or the container pod is not available. +func PodContainerRunning(containerName string) watch.ConditionFunc { + return func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodRunning, api.PodPending: + case api.PodFailed, api.PodSucceeded: + return false, ErrPodCompleted + default: + return false, nil + } + for _, s := range t.Status.ContainerStatuses { + if s.Name != containerName { + continue + } + return s.State.Running != nil, nil + } + return false, nil + } + return false, nil + } +} diff --git a/pkg/watch/filter.go b/pkg/watch/filter.go index 0acbaaf71a6..1eff5b94964 100644 --- a/pkg/watch/filter.go +++ b/pkg/watch/filter.go @@ -16,6 +16,10 @@ limitations under the License. package watch +import ( + "sync" +) + // FilterFunc should take an event, possibly modify it in some way, and return // the modified event. If the event should be ignored, then return keep=false. type FilterFunc func(in Event) (out Event, keep bool) @@ -69,3 +73,37 @@ func (fw *filteredWatch) loop() { } } } + +// Recorder records all events that are sent from the watch until it is closed. +type Recorder struct { + Interface + + lock sync.Mutex + events []Event +} + +var _ Interface = &Recorder{} + +// NewRecorder wraps an Interface and records any changes sent across it. +func NewRecorder(w Interface) *Recorder { + r := &Recorder{} + r.Interface = Filter(w, r.record) + return r +} + +// record is a FilterFunc and tracks each received event. +func (r *Recorder) record(in Event) (Event, bool) { + r.lock.Lock() + defer r.lock.Unlock() + r.events = append(r.events, in) + return in, true +} + +// Events returns a copy of the events sent across this recorder. +func (r *Recorder) Events() []Event { + r.lock.Lock() + defer r.lock.Unlock() + copied := make([]Event, len(r.events)) + copy(copied, r.events) + return copied +} diff --git a/pkg/watch/until.go b/pkg/watch/until.go new file mode 100644 index 00000000000..9f34f9d001b --- /dev/null +++ b/pkg/watch/until.go @@ -0,0 +1,82 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "time" + + "k8s.io/kubernetes/pkg/util/wait" +) + +// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition cannot be checked and should terminate. In general, it is better to define +// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed +// from false to true). +type ConditionFunc func(event Event) (bool, error) + +// Until reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var after <-chan time.Time + if timeout > 0 { + after = time.After(timeout) + } else { + ch := make(chan time.Time) + close(ch) + after = ch + } + var lastEvent *Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + break + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, wait.ErrWaitTimeout + } + lastEvent = &event + + // TODO: check for watch expired error and retry watch from latest point? + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-after: + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} diff --git a/pkg/watch/until_test.go b/pkg/watch/until_test.go new file mode 100644 index 00000000000..d411040523f --- /dev/null +++ b/pkg/watch/until_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "errors" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestUntil(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + fw.Modify(obj) + }() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return event.Type == Modified, nil }, + } + + timeout := time.Minute + lastEvent, err := Until(timeout, fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Modified { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilMultipleConditions(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + }() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return event.Type == Added, nil }, + } + + timeout := time.Minute + lastEvent, err := Until(timeout, fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Added { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilTimeout(t *testing.T) { + fw := NewFake() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + } + + timeout := time.Duration(0) + lastEvent, err := Until(timeout, fw, conditions...) + if err != wait.ErrWaitTimeout { + t.Fatalf("expected ErrWaitTimeout error, got %#v", err) + } + if lastEvent != nil { + t.Fatalf("expected nil event, got %#v", lastEvent) + } +} + +func TestUntilErrorCondition(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + }() + expected := "something bad" + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return false, errors.New(expected) }, + } + + timeout := time.Minute + _, err := Until(timeout, fw, conditions...) + if err == nil { + t.Fatal("expected an error") + } + if !strings.Contains(err.Error(), expected) { + t.Fatalf("expected %q in error string, got %q", expected, err.Error()) + } +} From 845e49657262a52d262aa60b18699ab24b466c6d Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 5 Apr 2016 22:06:24 -0400 Subject: [PATCH 2/2] Convert poll in e2e with watch.Until --- pkg/client/unversioned/conditions.go | 70 +++++++++++++++++++++++ test/e2e/framework/util.go | 83 ++++++++++------------------ 2 files changed, 100 insertions(+), 53 deletions(-) diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index 69e309c120e..20987c8abb3 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -195,6 +195,62 @@ func PodRunning(event watch.Event) (bool, error) { return false, nil } +// PodCompleted returns true if the pod has run to completion, false if the pod has not yet +// reached running state, or an error in any other case. +func PodCompleted(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodFailed, api.PodSucceeded: + return true, nil + } + } + return false, nil +} + +// PodRunningAndReady returns true if the pod is running and ready, false if the pod has not +// yet reached those states, returns ErrPodCompleted if the pod has run to completion, or +// an error in any other case. +func PodRunningAndReady(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodFailed, api.PodSucceeded: + return false, ErrPodCompleted + case api.PodRunning: + return api.IsPodReady(t), nil + } + } + return false, nil +} + +// PodNotPending returns true if the pod has left the pending state, false if it has not, +// or an error in any other case (such as if the pod was deleted). +func PodNotPending(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodPending: + return false, nil + default: + return true, nil + } + } + return false, nil +} + // PodContainerRunning returns false until the named container has ContainerStatus running (at least once), // and will return an error if the pod is deleted, runs to completion, or the container pod is not available. func PodContainerRunning(containerName string) watch.ConditionFunc { @@ -223,3 +279,17 @@ func PodContainerRunning(containerName string) watch.ConditionFunc { return false, nil } } + +// ServiceAccountHasSecrets returns true if the service account has at least one secret, +// false if it does not, or an error. +func ServiceAccountHasSecrets(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "serviceaccounts"}, "") + } + switch t := event.Object.(type) { + case *api.ServiceAccount: + return len(t.Secrets) > 0, nil + } + return false, nil +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 05b0d3287e0..3ff5315842b 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -68,7 +68,6 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/blang/semver" - "github.com/davecgh/go-spew/spew" "golang.org/x/crypto/ssh" "golang.org/x/net/websocket" @@ -329,6 +328,7 @@ var providersWithMasterSSH = []string{"gce", "gke", "kubemark", "aws"} type podCondition func(pod *api.Pod) (bool, error) // podReady returns whether pod has a condition of Ready with a status of true. +// TODO: should be replaced with api.IsPodReady func podReady(pod *api.Pod) bool { for _, cond := range pod.Status.Conditions { if cond.Type == api.PodReady && cond.Status == api.ConditionTrue { @@ -628,25 +628,12 @@ func WaitForNamespacesDeleted(c *client.Client, namespaces []string, timeout tim } func waitForServiceAccountInNamespace(c *client.Client, ns, serviceAccountName string, timeout time.Duration) error { - Logf("Waiting up to %v for service account %s to be provisioned in ns %s", timeout, serviceAccountName, ns) - for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) { - sa, err := c.ServiceAccounts(ns).Get(serviceAccountName) - if apierrs.IsNotFound(err) { - Logf("Get service account %s in ns %s failed, ignoring for %v: %v", serviceAccountName, ns, Poll, err) - continue - } - if err != nil { - Logf("Get service account %s in ns %s failed: %v", serviceAccountName, ns, err) - return err - } - if len(sa.Secrets) == 0 { - Logf("Service account %s in ns %s had 0 secrets, ignoring for %v: %v", serviceAccountName, ns, Poll, err) - continue - } - Logf("Service account %s in ns %s with secrets found. (%v)", serviceAccountName, ns, time.Since(start)) - return nil + w, err := c.ServiceAccounts(ns).Watch(api.SingleObject(api.ObjectMeta{Name: serviceAccountName})) + if err != nil { + return err } - return fmt.Errorf("Service account %s in namespace %s not ready within %v", serviceAccountName, ns, timeout) + _, err = watch.Until(timeout, w, client.ServiceAccountHasSecrets) + return err } func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout time.Duration, condition podCondition) error { @@ -902,16 +889,12 @@ func waitForPodRunningInNamespaceSlow(c *client.Client, podName string, namespac } func waitTimeoutForPodRunningInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error { - return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) { - if pod.Status.Phase == api.PodRunning { - Logf("Found pod '%s' on node '%s'", podName, pod.Spec.NodeName) - return true, nil - } - if pod.Status.Phase == api.PodFailed { - return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod)) - } - return false, nil - }) + w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName})) + if err != nil { + return err + } + _, err = watch.Until(timeout, w, client.PodRunning) + return err } // Waits default amount of time (podNoLongerRunningTimeout) for the specified pod to stop running. @@ -921,37 +904,31 @@ func WaitForPodNoLongerRunningInNamespace(c *client.Client, podName string, name } func waitTimeoutForPodNoLongerRunningInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error { - return waitForPodCondition(c, namespace, podName, "no longer running", timeout, func(pod *api.Pod) (bool, error) { - if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed { - Logf("Found pod '%s' with status '%s' on node '%s'", podName, pod.Status.Phase, pod.Spec.NodeName) - return true, nil - } - return false, nil - }) + w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName})) + if err != nil { + return err + } + _, err = watch.Until(timeout, w, client.PodCompleted) + return err } func waitTimeoutForPodReadyInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error { - return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) { - if pod.Status.Phase == api.PodRunning { - Logf("Found pod '%s' on node '%s'", podName, pod.Spec.NodeName) - return true, nil - } - if pod.Status.Phase == api.PodFailed { - return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod)) - } - return podReady(pod), nil - }) + w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName})) + if err != nil { + return err + } + _, err = watch.Until(timeout, w, client.PodRunningAndReady) + return err } // WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state. func WaitForPodNotPending(c *client.Client, ns, podName string) error { - return waitForPodCondition(c, ns, podName, "!pending", PodStartTimeout, func(pod *api.Pod) (bool, error) { - if pod.Status.Phase != api.PodPending { - Logf("Saw pod '%s' in namespace '%s' out of pending state (found '%q')", podName, ns, pod.Status.Phase) - return true, nil - } - return false, nil - }) + w, err := c.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: podName})) + if err != nil { + return err + } + _, err = watch.Until(PodStartTimeout, w, client.PodNotPending) + return err } // waitForPodTerminatedInNamespace returns an error if it took too long for the pod