diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go index 8d4fadc5362..d1651af7995 100644 --- a/pkg/api/helpers.go +++ b/pkg/api/helpers.go @@ -238,6 +238,14 @@ func IsStandardFinalizerName(str string) bool { return standardFinalizers.Has(str) } +// SingleObject returns a ListOptions for watching a single object. +func SingleObject(meta ObjectMeta) ListOptions { + return ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", meta.Name), + ResourceVersion: meta.ResourceVersion, + } +} + // AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice, // only if they do not already exist func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) { diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index 5087baa8067..69e309c120e 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -17,12 +17,15 @@ limitations under the License. package unversioned import ( + "fmt" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" ) // DefaultRetry is the recommended retry for a conflict where multiple clients @@ -168,3 +171,55 @@ func DeploymentHasDesiredReplicas(c ExtensionsInterface, deployment *extensions. deployment.Status.UpdatedReplicas == deployment.Spec.Replicas, nil } } + +// ErrPodCompleted is returned by PodRunning or PodContainerRunning to indicate that +// the pod has already reached completed state. +var ErrPodCompleted = fmt.Errorf("pod ran to completion") + +// PodRunning returns true if the pod is running, false if the pod has not yet reached running state, +// returns ErrPodCompleted if the pod has run to completion, or an error in any other case. +func PodRunning(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodRunning: + return true, nil + case api.PodFailed, api.PodSucceeded: + return false, ErrPodCompleted + } + } + return false, nil +} + +// PodContainerRunning returns false until the named container has ContainerStatus running (at least once), +// and will return an error if the pod is deleted, runs to completion, or the container pod is not available. +func PodContainerRunning(containerName string) watch.ConditionFunc { + return func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "") + } + switch t := event.Object.(type) { + case *api.Pod: + switch t.Status.Phase { + case api.PodRunning, api.PodPending: + case api.PodFailed, api.PodSucceeded: + return false, ErrPodCompleted + default: + return false, nil + } + for _, s := range t.Status.ContainerStatuses { + if s.Name != containerName { + continue + } + return s.State.Running != nil, nil + } + return false, nil + } + return false, nil + } +} diff --git a/pkg/watch/filter.go b/pkg/watch/filter.go index 0acbaaf71a6..1eff5b94964 100644 --- a/pkg/watch/filter.go +++ b/pkg/watch/filter.go @@ -16,6 +16,10 @@ limitations under the License. package watch +import ( + "sync" +) + // FilterFunc should take an event, possibly modify it in some way, and return // the modified event. If the event should be ignored, then return keep=false. type FilterFunc func(in Event) (out Event, keep bool) @@ -69,3 +73,37 @@ func (fw *filteredWatch) loop() { } } } + +// Recorder records all events that are sent from the watch until it is closed. +type Recorder struct { + Interface + + lock sync.Mutex + events []Event +} + +var _ Interface = &Recorder{} + +// NewRecorder wraps an Interface and records any changes sent across it. +func NewRecorder(w Interface) *Recorder { + r := &Recorder{} + r.Interface = Filter(w, r.record) + return r +} + +// record is a FilterFunc and tracks each received event. +func (r *Recorder) record(in Event) (Event, bool) { + r.lock.Lock() + defer r.lock.Unlock() + r.events = append(r.events, in) + return in, true +} + +// Events returns a copy of the events sent across this recorder. +func (r *Recorder) Events() []Event { + r.lock.Lock() + defer r.lock.Unlock() + copied := make([]Event, len(r.events)) + copy(copied, r.events) + return copied +} diff --git a/pkg/watch/until.go b/pkg/watch/until.go new file mode 100644 index 00000000000..9f34f9d001b --- /dev/null +++ b/pkg/watch/until.go @@ -0,0 +1,82 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "time" + + "k8s.io/kubernetes/pkg/util/wait" +) + +// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition cannot be checked and should terminate. In general, it is better to define +// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed +// from false to true). +type ConditionFunc func(event Event) (bool, error) + +// Until reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var after <-chan time.Time + if timeout > 0 { + after = time.After(timeout) + } else { + ch := make(chan time.Time) + close(ch) + after = ch + } + var lastEvent *Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + break + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, wait.ErrWaitTimeout + } + lastEvent = &event + + // TODO: check for watch expired error and retry watch from latest point? + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-after: + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} diff --git a/pkg/watch/until_test.go b/pkg/watch/until_test.go new file mode 100644 index 00000000000..d411040523f --- /dev/null +++ b/pkg/watch/until_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "errors" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestUntil(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + fw.Modify(obj) + }() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return event.Type == Modified, nil }, + } + + timeout := time.Minute + lastEvent, err := Until(timeout, fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Modified { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilMultipleConditions(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + }() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return event.Type == Added, nil }, + } + + timeout := time.Minute + lastEvent, err := Until(timeout, fw, conditions...) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) + } + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Added { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) + } +} + +func TestUntilTimeout(t *testing.T) { + fw := NewFake() + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + } + + timeout := time.Duration(0) + lastEvent, err := Until(timeout, fw, conditions...) + if err != wait.ErrWaitTimeout { + t.Fatalf("expected ErrWaitTimeout error, got %#v", err) + } + if lastEvent != nil { + t.Fatalf("expected nil event, got %#v", lastEvent) + } +} + +func TestUntilErrorCondition(t *testing.T) { + fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + }() + expected := "something bad" + conditions := []ConditionFunc{ + func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { return false, errors.New(expected) }, + } + + timeout := time.Minute + _, err := Until(timeout, fw, conditions...) + if err == nil { + t.Fatal("expected an error") + } + if !strings.Contains(err.Error(), expected) { + t.Fatalf("expected %q in error string, got %q", expected, err.Error()) + } +}