Add watch.Until, a conditional watch mechanism

Also add helpers for collecting the events that happen during a watch
and a helper that makes it easy to start a watch from any object with
ObjectMeta.
This commit is contained in:
Clayton Coleman 2016-04-05 13:33:47 -04:00
parent ff1da7674d
commit f89bde1415
5 changed files with 303 additions and 0 deletions

View File

@ -238,6 +238,14 @@ func IsStandardFinalizerName(str string) bool {
return standardFinalizers.Has(str) return standardFinalizers.Has(str)
} }
// SingleObject returns a ListOptions for watching a single object.
func SingleObject(meta ObjectMeta) ListOptions {
return ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", meta.Name),
ResourceVersion: meta.ResourceVersion,
}
}
// AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice, // AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice,
// only if they do not already exist // only if they do not already exist
func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) { func AddToNodeAddresses(addresses *[]NodeAddress, addAddresses ...NodeAddress) {

View File

@ -17,12 +17,15 @@ limitations under the License.
package unversioned package unversioned
import ( import (
"fmt"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
) )
// DefaultRetry is the recommended retry for a conflict where multiple clients // DefaultRetry is the recommended retry for a conflict where multiple clients
@ -168,3 +171,55 @@ func DeploymentHasDesiredReplicas(c ExtensionsInterface, deployment *extensions.
deployment.Status.UpdatedReplicas == deployment.Spec.Replicas, nil deployment.Status.UpdatedReplicas == deployment.Spec.Replicas, nil
} }
} }
// ErrPodCompleted is returned by PodRunning or PodContainerRunning to indicate that
// the pod has already reached completed state.
var ErrPodCompleted = fmt.Errorf("pod ran to completion")
// PodRunning returns true if the pod is running, false if the pod has not yet reached running state,
// returns ErrPodCompleted if the pod has run to completion, or an error in any other case.
func PodRunning(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodRunning:
return true, nil
case api.PodFailed, api.PodSucceeded:
return false, ErrPodCompleted
}
}
return false, nil
}
// PodContainerRunning returns false until the named container has ContainerStatus running (at least once),
// and will return an error if the pod is deleted, runs to completion, or the container pod is not available.
func PodContainerRunning(containerName string) watch.ConditionFunc {
return func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodRunning, api.PodPending:
case api.PodFailed, api.PodSucceeded:
return false, ErrPodCompleted
default:
return false, nil
}
for _, s := range t.Status.ContainerStatuses {
if s.Name != containerName {
continue
}
return s.State.Running != nil, nil
}
return false, nil
}
return false, nil
}
}

View File

@ -16,6 +16,10 @@ limitations under the License.
package watch package watch
import (
"sync"
)
// FilterFunc should take an event, possibly modify it in some way, and return // FilterFunc should take an event, possibly modify it in some way, and return
// the modified event. If the event should be ignored, then return keep=false. // the modified event. If the event should be ignored, then return keep=false.
type FilterFunc func(in Event) (out Event, keep bool) type FilterFunc func(in Event) (out Event, keep bool)
@ -69,3 +73,37 @@ func (fw *filteredWatch) loop() {
} }
} }
} }
// Recorder records all events that are sent from the watch until it is closed.
type Recorder struct {
Interface
lock sync.Mutex
events []Event
}
var _ Interface = &Recorder{}
// NewRecorder wraps an Interface and records any changes sent across it.
func NewRecorder(w Interface) *Recorder {
r := &Recorder{}
r.Interface = Filter(w, r.record)
return r
}
// record is a FilterFunc and tracks each received event.
func (r *Recorder) record(in Event) (Event, bool) {
r.lock.Lock()
defer r.lock.Unlock()
r.events = append(r.events, in)
return in, true
}
// Events returns a copy of the events sent across this recorder.
func (r *Recorder) Events() []Event {
r.lock.Lock()
defer r.lock.Unlock()
copied := make([]Event, len(r.events))
copy(copied, r.events)
return copied
}

82
pkg/watch/until.go Normal file
View File

@ -0,0 +1,82 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"time"
"k8s.io/kubernetes/pkg/util/wait"
)
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event Event) (bool, error)
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
var after <-chan time.Time
if timeout > 0 {
after = time.After(timeout)
} else {
ch := make(chan time.Time)
close(ch)
after = ch
}
var lastEvent *Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
break
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, wait.ErrWaitTimeout
}
lastEvent = &event
// TODO: check for watch expired error and retry watch from latest point?
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-after:
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}

120
pkg/watch/until_test.go Normal file
View File

@ -0,0 +1,120 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"errors"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestUntil(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
fw.Modify(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Modified, nil },
}
timeout := time.Minute
lastEvent, err := Until(timeout, fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilMultipleConditions(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return event.Type == Added, nil },
}
timeout := time.Minute
lastEvent, err := Until(timeout, fw, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Added {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
func TestUntilTimeout(t *testing.T) {
fw := NewFake()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
}
timeout := time.Duration(0)
lastEvent, err := Until(timeout, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
}
if lastEvent != nil {
t.Fatalf("expected nil event, got %#v", lastEvent)
}
}
func TestUntilErrorCondition(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
}()
expected := "something bad"
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) { return false, errors.New(expected) },
}
timeout := time.Minute
_, err := Until(timeout, fw, conditions...)
if err == nil {
t.Fatal("expected an error")
}
if !strings.Contains(err.Error(), expected) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}