Merge pull request #30383 from deads2k/fix-reaper

Automatic merge from submit-queue

speed up RC scaler

The RC scaler was waiting before starting the scale and then didn't use a watch to observe the result.  That led to longer than expected wait times.

@fabianofranz ptal.  You may want to sweep the rest of the file.  It could use some tidying with `RetryOnConflict` and `watch.Until`.
This commit is contained in:
Kubernetes Submit Queue 2016-08-13 01:05:00 -07:00 committed by GitHub
commit a44baa0a48
2 changed files with 28 additions and 6 deletions

View File

@ -27,7 +27,9 @@ import (
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
) )
// Scaler provides an interface for resources that can be scaled. // Scaler provides an interface for resources that can be scaled.
@ -171,15 +173,23 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
} }
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) cond := ScaleCondition(scaler, preconditions, namespace, name, newSize)
if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err return err
} }
if waitForReplicas != nil { if waitForReplicas != nil {
rc, err := scaler.c.ReplicationControllers(namespace).Get(name) watchOptions := api.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", name), ResourceVersion: "0"}
watcher, err := scaler.c.ReplicationControllers(namespace).Watch(watchOptions)
if err != nil { if err != nil {
return err return err
} }
err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.ControllerHasDesiredReplicas(scaler.c, rc)) _, err = watch.Until(waitForReplicas.Timeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Added && event.Type != watch.Modified {
return false, nil
}
rc := event.Object.(*api.ReplicationController)
return rc.Status.ObservedGeneration >= rc.Generation && rc.Status.Replicas == rc.Spec.Replicas, nil
})
if err == wait.ErrWaitTimeout { if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name) return fmt.Errorf("timed out waiting for %q to be synced", name)
} }

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/client/unversioned/testclient"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
) )
func TestReplicationControllerStop(t *testing.T) { func TestReplicationControllerStop(t *testing.T) {
@ -69,7 +70,7 @@ func TestReplicationControllerStop(t *testing.T) {
}, },
}, },
StopError: nil, StopError: nil,
ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"}, ExpectedActions: []string{"get", "list", "get", "update", "watch", "delete"},
}, },
{ {
Name: "NoOverlapping", Name: "NoOverlapping",
@ -107,7 +108,7 @@ func TestReplicationControllerStop(t *testing.T) {
}, },
}, },
StopError: nil, StopError: nil,
ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"}, ExpectedActions: []string{"get", "list", "get", "update", "watch", "delete"},
}, },
{ {
Name: "OverlappingError", Name: "OverlappingError",
@ -242,9 +243,20 @@ func TestReplicationControllerStop(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
copiedForWatch, err := api.Scheme.Copy(test.Objs[0])
if err != nil {
t.Fatalf("%s unexpected error: %v", test.Name, err)
}
fake := testclient.NewSimpleFake(test.Objs...) fake := testclient.NewSimpleFake(test.Objs...)
fakeWatch := watch.NewFake()
fake.PrependWatchReactor("replicationcontrollers", testclient.DefaultWatchReactor(fakeWatch, nil))
go func() {
fakeWatch.Add(copiedForWatch)
}()
reaper := ReplicationControllerReaper{fake, time.Millisecond, time.Millisecond} reaper := ReplicationControllerReaper{fake, time.Millisecond, time.Millisecond}
err := reaper.Stop(ns, name, 0, nil) err = reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) { if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err) t.Errorf("%s unexpected error: %v", test.Name, err)
continue continue