Merge pull request #943 from smarterclayton/only_wait_for_acceptance

Clients must wait for completion of actions
This commit is contained in:
brendandburns 2014-08-20 21:01:21 -07:00
commit 1bd4ae0c62
13 changed files with 328 additions and 70 deletions

View File

@ -37,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
@ -113,7 +114,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Kubelet (localhost)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd"))
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
@ -125,7 +126,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, cfg2.Channel("etcd"))
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() {
@ -135,7 +136,24 @@ func startComponents(manifestURL string) (apiServerURL string) {
return apiServer.URL
}
func runReplicationControllerTest(kubeClient *client.Client) {
// podsOnMinions returns true when all of the selected pods exist on a minion.
func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
podInfo := fakePodInfoGetter{}
return func() (bool, error) {
for i := range pods.Items {
host, id := pods.Items[i].CurrentState.Host, pods.Items[i].ID
if len(host) == 0 {
return false, nil
}
if _, err := podInfo.GetPodInfo(host, id); err != nil {
return false, nil
}
}
return true, nil
}
}
func runReplicationControllerTest(c *client.Client) {
data, err := ioutil.ReadFile("api/examples/controller.json")
if err != nil {
glog.Fatalf("Unexpected error: %#v", err)
@ -146,20 +164,26 @@ func runReplicationControllerTest(kubeClient *client.Client) {
}
glog.Infof("Creating replication controllers")
if _, err := kubeClient.CreateReplicationController(controllerRequest); err != nil {
if _, err := c.CreateReplicationController(controllerRequest); err != nil {
glog.Fatalf("Unexpected error: %#v", err)
}
glog.Infof("Done creating replication controllers")
// Give the controllers some time to actually create the pods
time.Sleep(time.Second * 10)
// Validate that they're truly up.
pods, err := kubeClient.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector())
if err != nil || len(pods.Items) != controllerRequest.DesiredState.Replicas {
glog.Fatalf("FAILED: %#v", pods.Items)
if err := wait.Poll(time.Second, 10, c.ControllerHasDesiredReplicas(controllerRequest)); err != nil {
glog.Fatalf("FAILED: pods never created %v", err)
}
glog.Infof("Replication controller produced:\n\n%#v\n\n", pods)
// wait for minions to indicate they have info about the desired pods
pods, err := c.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
glog.Fatalf("FAILED: unable to get pods to list: %v", err)
}
if err := wait.Poll(time.Second, 10, podsOnMinions(c, pods)); err != nil {
glog.Fatalf("FAILED: pods never started running %v", err)
}
glog.Infof("Pods created")
}
func runAtomicPutTest(c *client.Client) {

View File

@ -138,7 +138,7 @@ func main() {
if len(etcdServerList) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServerList)
etcdClient = etcd.NewClient(etcdServerList)
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, 30*time.Second, cfg.Channel("etcd"))
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, cfg.Channel("etcd"))
}
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop

View File

@ -49,7 +49,7 @@ Once that's up you can list the pods in the cluster, to verify that the master i
cluster/kubecfg.sh list pods
```
You'll see a single redis master pod. It will also display the machine that the pod is running on.
You'll see a single redis master pod. It will also display the machine that the pod is running on once it gets placed (may take up to thirty seconds).
```
Name Image(s) Host Labels
@ -230,7 +230,7 @@ Name Image(s) Selector Replicas
frontendController brendanburns/php-redis name=frontend 3
```
Once that's up you can list the pods in the cluster, to verify that the master, slaves and frontends are running:
Once that's up (it may take ten to thirty seconds to create the pods) you can list the pods in the cluster, to verify that the master, slaves and frontends are running:
```shell
$ cluster/kubecfg.sh list pods

35
pkg/client/conditions.go Normal file
View File

@ -0,0 +1,35 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)
// ControllerHasDesiredReplicas returns a condition that will be true iff the desired replica count
// for a controller's ReplicaSelector equals the Replicas count.
func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc {
return func() (bool, error) {
pods, err := c.ListPods(labels.Set(controller.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
return false, err
}
return len(pods.Items) == controller.DesiredState.Replicas, nil
}
}

View File

@ -41,20 +41,21 @@ type SourceEtcd struct {
client tools.EtcdClient
updates chan<- interface{}
waitDuration time.Duration
interval time.Duration
timeout time.Duration
}
// NewSourceEtcd creates a config source that watches and pulls from a key in etcd
func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, updates chan<- interface{}) *SourceEtcd {
func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface{}) *SourceEtcd {
config := &SourceEtcd{
key: key,
client: client,
updates: updates,
waitDuration: period,
timeout: 1 * time.Minute,
}
glog.Infof("Watching etcd for %s", key)
go util.Forever(config.run, period)
go util.Forever(config.run, time.Second)
return config
}
@ -81,7 +82,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (nextIndex uint64, err err
if fromIndex == 0 {
response, err = s.client.Get(s.key, true, false)
} else {
response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.waitDuration))
response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.timeout))
if tools.IsEtcdWatchStoppedByUser(err) {
return fromIndex, nil
}
@ -125,9 +126,13 @@ func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) {
return pods, nil
}
// stopChannel creates a channel that is closed after a duration for use with etcd client API
// stopChannel creates a channel that is closed after a duration for use with etcd client API.
// If until is 0, the channel will never close.
func stopChannel(until time.Duration) chan bool {
stop := make(chan bool)
if until == 0 {
return stop
}
go func() {
select {
case <-time.After(until):

View File

@ -43,7 +43,7 @@ func TestGetEtcdData(t *testing.T) {
},
E: nil,
}
NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, time.Millisecond, ch)
NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, ch)
//TODO: update FakeEtcdClient.Watch to handle receiver=nil with a given index
//returns an infinite stream of updates
@ -63,7 +63,7 @@ func TestGetEtcdNoData(t *testing.T) {
R: &etcd.Response{},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
_, err := c.fetchNextState(0)
if err == nil {
t.Errorf("Expected error")
@ -86,7 +86,7 @@ func TestGetEtcd(t *testing.T) {
},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
lastIndex, err := c.fetchNextState(0)
if err != nil {
t.Errorf("Unexpected error: %v", err)
@ -118,7 +118,7 @@ func TestWatchEtcd(t *testing.T) {
},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
lastIndex, err := c.fetchNextState(1)
if err != nil {
t.Errorf("Unexpected error: %v", err)
@ -140,7 +140,7 @@ func TestGetEtcdNotFound(t *testing.T) {
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
_, err := c.fetchNextState(0)
if err == nil {
t.Errorf("Expected error")
@ -157,7 +157,7 @@ func TestGetEtcdError(t *testing.T) {
ErrorCode: 200, // non not found error
},
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
_, err := c.fetchNextState(0)
if err == nil {
t.Errorf("Expected error")

View File

@ -48,7 +48,7 @@ func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.R
}
}
// Create registers then given ReplicationController.
// Create registers the given ReplicationController.
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(*api.ReplicationController)
if !ok {
@ -70,7 +70,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
if err != nil {
return nil, err
}
return rs.waitForController(*controller)
return rs.registry.GetController(controller.ID)
}), nil
}
@ -124,7 +124,7 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
if err != nil {
return nil, err
}
return rs.waitForController(*controller)
return rs.registry.GetController(controller.ID)
}), nil
}

View File

@ -239,30 +239,10 @@ func TestCreateController(t *testing.T) {
}
select {
case <-channel:
// expected case
case <-time.After(time.Millisecond * 100):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")
}
mockPodRegistry.Lock()
mockPodRegistry.Pods = []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
Labels: map[string]string{"a": "b"},
},
{
JSONBase: api.JSONBase{ID: "bar"},
Labels: map[string]string{"a": "b"},
},
}
mockPodRegistry.Unlock()
select {
case <-time.After(time.Second * 1):
t.Error("Unexpected timeout")
case <-channel:
// Do nothing, this is expected
t.Error("Unexpected timeout from async channel")
}
}

View File

@ -85,7 +85,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
if err := rs.scheduleAndCreatePod(*pod); err != nil {
return nil, err
}
return rs.waitForPodRunning(*pod)
return rs.registry.GetPod(pod.ID)
}), nil
}
@ -153,7 +153,7 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
if err := rs.registry.UpdatePod(*pod); err != nil {
return nil, err
}
return rs.waitForPodRunning(*pod)
return rs.registry.GetPod(pod.ID)
}), nil
}

View File

@ -406,23 +406,10 @@ func TestCreatePod(t *testing.T) {
}
select {
case <-channel:
// Do nothing, this is expected.
case <-time.After(time.Millisecond * 100):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")
}
// TODO: Is the below actually testing something?
podRegistry.UpdatePod(api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Status: api.PodRunning,
},
})
select {
case <-time.After(time.Second * 1):
t.Error("Unexpected timeout")
case <-channel:
// Do nothing, this is expected.
t.Error("Unexpected timeout on async channel")
}
}

19
pkg/util/wait/doc.go Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package wait provides tools for polling or listening for changes
// to a condition.
package wait

83
pkg/util/wait/wait.go Normal file
View File

@ -0,0 +1,83 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wait
import (
"errors"
"time"
)
// ErrWaitTimeout is returned when the condition exited without success
var ErrWaitTimeout = errors.New("timed out waiting for the condition")
// ConditionFunc returns true if the condition is satisfied, or an error
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
// Poll tries a condition func until it returns true, an error, or the
// wait channel is closed. Will always poll at least once.
func Poll(interval time.Duration, cycles int, condition ConditionFunc) error {
return WaitFor(poller(interval, cycles), condition)
}
// WaitFunc creates a channel that receives an item every time a test
// should be executed and is closed when the last test should be invoked.
type WaitFunc func() <-chan struct{}
// WaitFor implements the looping for a wait.
func WaitFor(wait WaitFunc, c ConditionFunc) error {
w := wait()
for {
_, open := <-w
ok, err := c()
if err != nil {
return err
}
if ok {
return nil
}
if !open {
break
}
}
return ErrWaitTimeout
}
// poller returns a WaitFunc that will send to the channel every
// interval until at most cycles * interval has elapsed and then
// close the channel. Over very short intervals you may receive
// no ticks before being closed.
func poller(interval time.Duration, cycles int) WaitFunc {
return WaitFunc(func() <-chan struct{} {
ch := make(chan struct{})
go func() {
tick := time.NewTicker(interval)
defer tick.Stop()
after := time.After(interval * time.Duration(cycles))
for {
select {
case <-tick.C:
ch <- struct{}{}
case <-after:
close(ch)
return
}
}
}()
return ch
})
}

125
pkg/util/wait/wait_test.go Normal file
View File

@ -0,0 +1,125 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wait
import (
"errors"
"testing"
"time"
)
func TestPoller(t *testing.T) {
w := poller(time.Millisecond, 2)
ch := w()
count := 0
DRAIN:
for {
select {
case _, open := <-ch:
if !open {
break DRAIN
}
count++
case <-time.After(time.Millisecond * 5):
t.Errorf("unexpected timeout after poll")
}
}
if count > 3 {
t.Errorf("expected up to three values, got %d", count)
}
}
func fakeTicker(count int) WaitFunc {
return func() <-chan struct{} {
ch := make(chan struct{})
go func() {
for i := 0; i < count; i++ {
ch <- struct{}{}
}
close(ch)
}()
return ch
}
}
func TestPoll(t *testing.T) {
invocations := 0
f := ConditionFunc(func() (bool, error) {
invocations++
return true, nil
})
if err := Poll(time.Microsecond, 1, f); err != nil {
t.Fatalf("unexpected error %v", err)
}
if invocations == 0 {
t.Errorf("Expected at least one invocation, got zero")
}
}
func TestWaitFor(t *testing.T) {
var invocations int
testCases := map[string]struct {
F ConditionFunc
Ticks int
Invoked int
Err bool
}{
"invoked once": {
ConditionFunc(func() (bool, error) {
invocations++
return true, nil
}),
2,
1,
false,
},
"invoked and returns a timeout": {
ConditionFunc(func() (bool, error) {
invocations++
return false, nil
}),
2,
3,
true,
},
"returns immediately on error": {
ConditionFunc(func() (bool, error) {
invocations++
return false, errors.New("test")
}),
2,
1,
true,
},
}
for k, c := range testCases {
invocations = 0
ticker := fakeTicker(c.Ticks)
err := WaitFor(ticker, c.F)
switch {
case c.Err && err == nil:
t.Errorf("%s: Expected error, got nil", k)
continue
case !c.Err && err != nil:
t.Errorf("%s: Expected no error, got: %#v", k, err)
continue
}
if invocations != c.Invoked {
t.Errorf("%s: Expected %d invocations, called %d", k, c.Invoked, invocations)
}
}
}