diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index dc6e2953954..2d3849d6a44 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -144,7 +144,7 @@ func (s *CMServer) Run(_ []string) error { go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) - controllerManager.Run(10 * time.Second) + controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod) kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig) if err != nil { diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 307abbb85cf..66446920041 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -133,7 +133,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) controllerManager := controller.NewReplicationManager(cl) - controllerManager.Run(10 * time.Second) + controllerManager.Run(controller.DefaultSyncPeriod) } func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP, port int) { diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index ba9353c585f..190292e2abe 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -56,6 +56,9 @@ type RealPodControl struct { kubeClient client.Interface } +// Time period of main replication controller sync loop +const DefaultSyncPeriod = 10 * time.Second + func (r RealPodControl) createReplica(namespace string, controller api.ReplicationController) { desiredLabels := make(labels.Set) for k, v := range controller.Spec.Template.Labels { diff --git a/pkg/kubectl/cmd/resize.go b/pkg/kubectl/cmd/resize.go index c55b07b6751..91905c50e47 100644 --- 
a/pkg/kubectl/cmd/resize.go +++ b/pkg/kubectl/cmd/resize.go @@ -19,9 +19,12 @@ package cmd import ( "fmt" "io" + "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/spf13/cobra" ) @@ -37,6 +40,9 @@ $ kubectl resize --replicas=3 replicationcontrollers foo // If the replication controller named foo's current size is 2, resize foo to 3. $ kubectl resize --current-replicas=2 --replicas=3 replicationcontrollers foo` + + retryFrequency = controller.DefaultSyncPeriod / 100 + retryTimeout = 10 * time.Second ) func (f *Factory) NewCmdResize(out io.Writer) *cobra.Command { @@ -63,9 +69,15 @@ func (f *Factory) NewCmdResize(out io.Writer) *cobra.Command { resourceVersion := util.GetFlagString(cmd, "resource-version") currentSize := util.GetFlagInt(cmd, "current-replicas") - s, err := resizer.Resize(namespace, name, &kubectl.ResizePrecondition{currentSize, resourceVersion}, uint(count)) - checkErr(err) - fmt.Fprintf(out, "%s\n", s) + precondition := &kubectl.ResizePrecondition{currentSize, resourceVersion} + cond := kubectl.ResizeCondition(resizer, precondition, namespace, name, uint(count)) + + msg := "resized" + if err = wait.Poll(retryFrequency, retryTimeout, cond); err != nil { + msg = fmt.Sprintf("Failed to resize controller in spite of retrying for %s", retryTimeout) + checkErr(err) + } + fmt.Fprintf(out, "%s\n", msg) }, } cmd.Flags().String("resource-version", "", "Precondition for resource version. 
Requires that the current resource version match this value in order to resize.") diff --git a/pkg/kubectl/resize.go b/pkg/kubectl/resize.go index ff3f866a631..b10ebe9f663 100644 --- a/pkg/kubectl/resize.go +++ b/pkg/kubectl/resize.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" ) // ResizePrecondition describes a condition that must be true for the resize to take place @@ -33,23 +34,46 @@ type ResizePrecondition struct { ResourceVersion string } +// A PreconditionError is returned when a replication controller fails to match +// the resize preconditions passed to kubectl. type PreconditionError struct { Precondition string ExpectedValue string ActualValue string } -func (pe *PreconditionError) Error() string { +func (pe PreconditionError) Error() string { return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue) } +type ControllerResizeErrorType int + +const ( + ControllerResizeGetFailure ControllerResizeErrorType = iota + ControllerResizeUpdateFailure +) + +// A ControllerResizeError is returned when the resize request passes +// preconditions but fails to actually resize the controller. +type ControllerResizeError struct { + FailureType ControllerResizeErrorType + ResourceVersion string + ActualError error +} + +func (c ControllerResizeError) Error() string { + return fmt.Sprintf( + "Resizing the controller failed with: %s; Current resource version %s", + c.ActualError, c.ResourceVersion) +} + // Validate ensures that the preconditions match. 
Returns nil if they are valid, an error otherwise func (precondition *ResizePrecondition) Validate(controller *api.ReplicationController) error { if precondition.Size != -1 && controller.Spec.Replicas != precondition.Size { - return &PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(controller.Spec.Replicas)} + return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(controller.Spec.Replicas)} } if precondition.ResourceVersion != "" && controller.ResourceVersion != precondition.ResourceVersion { - return &PreconditionError{"resource version", precondition.ResourceVersion, controller.ResourceVersion} + return PreconditionError{"resource version", precondition.ResourceVersion, controller.ResourceVersion} } return nil } @@ -70,11 +94,27 @@ type ReplicationControllerResizer struct { client.Interface } +// ResizeCondition is a closure around Resize that facilitates retries via util.wait +func ResizeCondition(r Resizer, precondition *ResizePrecondition, namespace, name string, count uint) wait.ConditionFunc { + return func() (bool, error) { + _, err := r.Resize(namespace, name, precondition, count) + switch e, _ := err.(ControllerResizeError); err.(type) { + case nil: + return true, nil + case ControllerResizeError: + if e.FailureType == ControllerResizeUpdateFailure { + return false, nil + } + } + return false, err + } +} + func (resize *ReplicationControllerResizer) Resize(namespace, name string, preconditions *ResizePrecondition, newSize uint) (string, error) { rc := resize.ReplicationControllers(namespace) controller, err := rc.Get(name) if err != nil { - return "", err + return "", ControllerResizeError{ControllerResizeGetFailure, "Unknown", err} } if preconditions != nil { @@ -86,7 +126,7 @@ func (resize *ReplicationControllerResizer) Resize(namespace, name string, preco controller.Spec.Replicas = int(newSize) // TODO: do retry on 409 errors here? 
if _, err := rc.Update(controller); err != nil { - return "", err + return "", ControllerResizeError{ControllerResizeUpdateFailure, controller.ResourceVersion, err} } // TODO: do a better job of printing objects here. return "resized", nil diff --git a/pkg/kubectl/resize_test.go b/pkg/kubectl/resize_test.go index 4e1da6c59a5..f0ea6888eae 100644 --- a/pkg/kubectl/resize_test.go +++ b/pkg/kubectl/resize_test.go @@ -17,14 +17,53 @@ limitations under the License. package kubectl import ( - // "strings" + "errors" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - // "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +type ErrorReplicationControllers struct { + client.FakeReplicationControllers +} + +func (c *ErrorReplicationControllers) Update(controller *api.ReplicationController) (*api.ReplicationController, error) { + return nil, errors.New("Replication controller update failure") +} + +type ErrorReplicationControllerClient struct { + client.Fake +} + +func (c *ErrorReplicationControllerClient) ReplicationControllers(namespace string) client.ReplicationControllerInterface { + return &ErrorReplicationControllers{client.FakeReplicationControllers{&c.Fake, namespace}} +} + +func TestReplicationControllerResizeRetry(t *testing.T) { + fake := &ErrorReplicationControllerClient{Fake: client.Fake{}} + resizer := ReplicationControllerResizer{fake} + preconditions := ResizePrecondition{-1, ""} + count := uint(3) + name := "foo" + namespace := "default" + + resizeFunc := ResizeCondition(&resizer, &preconditions, namespace, name, count) + pass, err := resizeFunc() + if pass != false { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + if err != nil { + t.Errorf("Did not expect an error on update failure, got %v", err) + } + preconditions = ResizePrecondition{3, ""} + resizeFunc = ResizeCondition(&resizer, &preconditions, namespace, name, count) + pass, err = 
resizeFunc() + if err == nil { + t.Errorf("Expected error on precondition failure") + } +} + func TestReplicationControllerResize(t *testing.T) { fake := &client.Fake{} resizer := ReplicationControllerResizer{fake} diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index c54d5b267f6..1a41c85d3d8 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -68,6 +68,13 @@ func TestPoll(t *testing.T) { if invocations == 0 { t.Errorf("Expected at least one invocation, got zero") } + expectedError := errors.New("Expected error") + f = ConditionFunc(func() (bool, error) { + return false, expectedError + }) + if err := Poll(time.Microsecond, time.Microsecond, f); err == nil || err != expectedError { + t.Fatalf("Expected error %v, got none %v", expectedError, err) + } } func TestPollForever(t *testing.T) {