From da5e4d7bd52d8babfccebf4da2b78de8a27d30a6 Mon Sep 17 00:00:00 2001 From: Dan Mace Date: Tue, 28 Jul 2015 14:43:48 -0400 Subject: [PATCH] Rolling updater availability enhancements Enhance the rolling updater to support maintaining minimum pod availability for the duration of the update process. --- hack/test-cmd.sh | 6 - .../unversioned/testclient/fake_pods.go | 9 +- pkg/kubectl/cmd/rollingupdate.go | 12 +- pkg/kubectl/rolling_updater.go | 781 +++++---- pkg/kubectl/rolling_updater_test.go | 1408 ++++++++++------- test/e2e/kubectl.go | 1 - 6 files changed, 1289 insertions(+), 928 deletions(-) diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 3edf81d2dee..7ae9c01a863 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -680,12 +680,6 @@ __EOF__ kubectl delete pod valid-pod "${kube_flags[@]}" kubectl delete service frontend{,-2,-3,-4,-5} "${kube_flags[@]}" - ### Perform a rolling update with --image - # Command - kubectl rolling-update frontend --image=kubernetes/pause --update-period=10ns --poll-interval=10ms "${kube_flags[@]}" - # Post-condition: current image IS kubernetes/pause - kube::test::get_object_assert 'rc frontend' '{{range \$c:=$rc_container_image_field}} {{\$c.image}} {{end}}' ' +kubernetes/pause +' - ### Delete replication controller with id # Pre-condition: frontend replication controller is running kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend:' diff --git a/pkg/client/unversioned/testclient/fake_pods.go b/pkg/client/unversioned/testclient/fake_pods.go index 7e72e50b81c..d8c9bbb4bf9 100644 --- a/pkg/client/unversioned/testclient/fake_pods.go +++ b/pkg/client/unversioned/testclient/fake_pods.go @@ -44,8 +44,13 @@ func (c *FakePods) List(label labels.Selector, field fields.Selector) (*api.PodL if obj == nil { return nil, err } - - return obj.(*api.PodList), err + list := &api.PodList{} + for _, pod := range obj.(*api.PodList).Items { + if label.Matches(labels.Set(pod.Labels)) { + list.Items = append(list.Items, pod) + } + } + return list, err } func (c *FakePods) Create(pod *api.Pod) (*api.Pod, error) { diff --git a/pkg/kubectl/cmd/rollingupdate.go b/pkg/kubectl/cmd/rollingupdate.go index d2e8ef0c23d..83a7fd3f297 100644 --- a/pkg/kubectl/cmd/rollingupdate.go +++ b/pkg/kubectl/cmd/rollingupdate.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/util" ) const ( @@ -141,8 +142,6 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg return err } - updaterClient := kubectl.NewRollingUpdaterClient(client) - var newRc *api.ReplicationController // fetch rc oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName) @@ -151,11 +150,11 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg return err } // We're in the middle of a rename, look for an RC with a source annotation of oldName - newRc, err := kubectl.FindSourceController(updaterClient, cmdNamespace, oldName) + newRc, err := kubectl.FindSourceController(client, cmdNamespace, oldName) if err != nil { return err } - return kubectl.Rename(kubectl.NewRollingUpdaterClient(client), newRc, oldName) + return kubectl.Rename(client, newRc, oldName) } var keepOldName bool @@ -235,7 +234,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg filename, oldName) } - updater := kubectl.NewRollingUpdater(newRc.Namespace, updaterClient) + updater := 
kubectl.NewRollingUpdater(newRc.Namespace, client) // To successfully pull off a rolling update the new and old rc have to differ // by at least one selector. Every new pod should have the selector and every @@ -279,7 +278,8 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg Interval: interval, Timeout: timeout, CleanupPolicy: updateCleanupPolicy, - UpdateAcceptor: kubectl.DefaultUpdateAcceptor, + MaxUnavailable: util.NewIntOrStringFromInt(0), + MaxSurge: util.NewIntOrStringFromInt(1), } if cmdutil.GetFlagBool(cmd, "rollback") { kubectl.AbortRollingUpdate(config) diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index ed8bd3f4693..448f34b8733 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -30,23 +30,16 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" ) -// RollingUpdater provides methods for updating replicated pods in a predictable, -// fault-tolerant way. -type RollingUpdater struct { - // Client interface for creating and updating controllers - c RollingUpdaterClient - // Namespace for resources - ns string - // scaleAndWait scales a controller and returns its updated state. - scaleAndWait scaleAndWait -} - -// scaleAndWait scales rc and returns its updated state. This typedef is to -// abstract away the use of a Scaler to ease testing. -type scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) +const ( + sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id" + desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas" + originalReplicasAnnotation = kubectlAnnotationPrefix + "original-replicas" + nextControllerAnnotation = kubectlAnnotationPrefix + "next-controller-id" +) // RollingUpdaterConfig is the configuration for a rolling deployment process. type RollingUpdaterConfig struct { @@ -67,16 +60,26 @@ type RollingUpdaterConfig struct { // CleanupPolicy defines the cleanup action to take after the deployment is // complete. CleanupPolicy RollingUpdaterCleanupPolicy - // UpdateAcceptor is given a chance to accept the new controller after each - // scale-up operation. If the controller is accepted, updates continue; if - // the controller is rejected, the update will fail immediately. - UpdateAcceptor UpdateAcceptor - // UpdatePercent is optional; if specified, the amount of replicas scaled up - // and down each interval will be computed as a percentage of the desired - // replicas for the new RC. If UpdatePercent is nil, one replica will be - // scaled up and down each interval. If UpdatePercent is negative, the order - // of scaling will be down/up instead of up/down. - UpdatePercent *int + // The maximum number of pods that can be unavailable during the update. + // Value can be an absolute number (ex: 5) or a percentage of total pods at + // the start of update (ex: 10%). Absolute number is calculated from + // percentage by rounding up. This can not be 0 if MaxSurge is 0. By + // default, a fixed value of 1 is used. Example: when this is set to 30%, + // the old RC can be scaled down by 30% immediately when the rolling update + // starts. Once new pods are ready, old RC can be scaled down further, + // followed by scaling up the new RC, ensuring that at least 70% of original + // number of pods are available at all times during the update. 
+ MaxUnavailable util.IntOrString + // The maximum number of pods that can be scheduled above the original + // number of pods. Value can be an absolute number (ex: 5) or a percentage of total + // pods at the start of the update (ex: 10%). This can not be 0 if + // MaxUnavailable is 0. Absolute number is calculated from percentage by + // rounding up. By default, a value of 1 is used. Example: when this is set + // to 30%, the new RC can be scaled up by 30% immediately when the rolling + // update starts. Once old pods have been killed, new RC can be scaled up + // further, ensuring that total number of pods running at any time during + // the update is atmost 130% of original pods. + MaxSurge util.IntOrString } // RollingUpdaterCleanupPolicy is a cleanup action to take after the @@ -93,27 +96,421 @@ const ( RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename" ) -// UpdateAcceptor is given a chance to accept or reject the new controller -// during a deployment each time the controller is scaled up. -// -// After the successful scale-up of the controller, the controller is given to -// the UpdateAcceptor. If the UpdateAcceptor rejects the controller, the -// deployment is stopped with an error. -type UpdateAcceptor interface { - // Accept returns nil if the controller is okay, otherwise returns an error. - Accept(*api.ReplicationController) error +// RollingUpdater provides methods for updating replicated pods in a predictable, +// fault-tolerant way. +type RollingUpdater struct { + // Client interface for creating and updating controllers + c client.Interface + // Namespace for resources + ns string + // scaleAndWait scales a controller and returns its updated state. + scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) + //getOrCreateTargetController gets and validates an existing controller or + //makes a new one. + getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) + // cleanup performs post deployment cleanup tasks for newRc and oldRc. + cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error + // waitForReadyPods should block until there are >0 total pods ready amongst + // the old and new controllers, and should return the amount of old and new + // ready. + waitForReadyPods func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) } -// AlwaysAccept is an UpdateAcceptor which always accepts the controller. -type AlwaysAccept struct{} +// NewRollingUpdater creates a RollingUpdater from a client. +func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdater { + updater := &RollingUpdater{ + c: client, + ns: namespace, + } + // Inject real implementations. + updater.scaleAndWait = updater.scaleAndWaitWithScaler + updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient + updater.waitForReadyPods = updater.pollForReadyPods + updater.cleanup = updater.cleanupWithClients + return updater +} -// Accept implements UpdateAcceptor. -func (a *AlwaysAccept) Accept(*api.ReplicationController) error { return nil } +// Update all pods for a ReplicationController (oldRc) by creating a new +// controller (newRc) with 0 replicas, and synchronously scaling oldRc and +// newRc until oldRc has 0 replicas and newRc has the original # of desired +// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy. 
+// +// Each interval, the updater will attempt to make progress however it can +// without violating any availability constraints defined by the config. This +// means the amount scaled up or down each interval will vary based on the +// timeliness of readiness and the updater will always try to make progress, +// even slowly. +// +// If an update from newRc to oldRc is already in progress, we attempt to +// drive it to completion. If an error occurs at any step of the update, the +// error will be returned. +// +// A scaling event (either up or down) is considered progress; if no progress +// is made within the config.Timeout, an error is returned. +// +// TODO: make this handle performing a rollback of a partially completed +// rollout. +func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { + out := config.Out + oldRc := config.OldRc + scaleRetryParams := NewRetryParams(config.Interval, config.Timeout) -// DefaultUpdaterAcceptor always accepts controllers. -var DefaultUpdateAcceptor UpdateAcceptor = &AlwaysAccept{} + // Find an existing controller (for continuing an interrupted update) or + // create a new one if necessary. + sourceId := fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID) + newRc, existed, err := r.getOrCreateTargetController(config.NewRc, sourceId) + if err != nil { + return err + } + if existed { + fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newRc.Name) + } else { + fmt.Fprintf(out, "Created %s\n", newRc.Name) + } + // Extract the desired replica count from the controller. + desired, err := strconv.Atoi(newRc.Annotations[desiredReplicasAnnotation]) + if err != nil { + return fmt.Errorf("Unable to parse annotation for %s: %s=%s", + newRc.Name, desiredReplicasAnnotation, newRc.Annotations[desiredReplicasAnnotation]) + } + // Extract the original replica count from the old controller, adding the + // annotation if it doesn't yet exist. + _, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation] + if !hasOriginalAnnotation { + existing, err := r.c.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name) + if err != nil { + return err + } + if existing.Annotations == nil { + existing.Annotations = map[string]string{} + } + existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(existing.Spec.Replicas) + updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing) + if err != nil { + return err + } + oldRc = updated + } + original, err := strconv.Atoi(oldRc.Annotations[originalReplicasAnnotation]) + if err != nil { + return fmt.Errorf("Unable to parse annotation for %s: %s=%s\n", + oldRc.Name, originalReplicasAnnotation, oldRc.Annotations[originalReplicasAnnotation]) + } + // The maximum pods which can go unavailable during the update. + maxUnavailable, err := extractMaxValue(config.MaxUnavailable, "maxUnavailable", original) + if err != nil { + return err + } + // The maximum scaling increment. + maxSurge, err := extractMaxValue(config.MaxSurge, "maxSurge", original) + if err != nil { + return err + } + // Further validation. + if maxUnavailable == 0 && maxSurge == 0 { + return fmt.Errorf("one of maxSurge or maxUnavailable must be specified") + } + // The minumum pods which must remain available througout the update + // calculated for internal convenience. 
+ minAvailable := original - maxUnavailable -func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) { + fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n", + newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, minAvailable, original+maxSurge) + + // Scale newRc and oldRc until newRc has the desired number of replicas and + // oldRc has 0 replicas. + progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds() + for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 { + // Store the existing replica counts for progress timeout tracking. + newReplicas := newRc.Spec.Replicas + oldReplicas := oldRc.Spec.Replicas + + // Scale up as much as possible. + scaledRc, err := r.scaleUp(newRc, oldRc, original, desired, maxSurge, maxUnavailable, scaleRetryParams, config) + if err != nil { + return err + } + newRc = scaledRc + + // Wait between scaling operations for things to settle. + time.Sleep(config.UpdatePeriod) + + // Scale down as much as possible. + scaledRc, err = r.scaleDown(newRc, oldRc, desired, minAvailable, maxUnavailable, maxSurge, config) + if err != nil { + return err + } + oldRc = scaledRc + + // If we are making progress, continue to advance the progress deadline. + // Otherwise, time out with an error. + progressMade := (newRc.Spec.Replicas != newReplicas) || (oldRc.Spec.Replicas != oldReplicas) + if progressMade { + progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds() + } else if time.Now().UnixNano() > progressDeadline { + return fmt.Errorf("timed out waiting for any update progress to be made") + } + } + + // Housekeeping and cleanup policy execution. + return r.cleanup(oldRc, newRc, config) +} + +// scaleUp scales up newRc to desired by whatever increment is possible given +// the configured surge threshold. scaleUp will safely no-op as necessary when +// it detects redundancy or other relevant conditions. +func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, original, desired, maxSurge, maxUnavailable int, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*api.ReplicationController, error) { + // If we're already at the desired, do nothing. + if newRc.Spec.Replicas == desired { + return newRc, nil + } + + // Scale up as far as we can based on the surge limit. + increment := (original + maxSurge) - (oldRc.Spec.Replicas + newRc.Spec.Replicas) + // If the old is already scaled down, go ahead and scale all the way up. + if oldRc.Spec.Replicas == 0 { + increment = desired - newRc.Spec.Replicas + } + // We can't scale up without violating the surge limit, so do nothing. + if increment <= 0 { + return newRc, nil + } + // Increase the replica count, and deal with fenceposts. + newRc.Spec.Replicas += increment + if newRc.Spec.Replicas > desired { + newRc.Spec.Replicas = desired + } + // Perform the scale-up. + fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas) + scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams) + if err != nil { + return nil, err + } + return scaledRc, nil +} + +// scaleDown scales down oldRc to 0 at whatever increment possible given the +// thresholds defined on the config. scaleDown will safely no-op as necessary +// when it detects redundancy or other relevant conditions. 
+func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int, config *RollingUpdaterConfig) (*api.ReplicationController, error) { + // Already scaled down; do nothing. + if oldRc.Spec.Replicas == 0 { + return oldRc, nil + } + // Block until there are any pods ready. + oldAvailable, newAvailable, err := r.waitForReadyPods(config.Interval, config.Timeout, oldRc, newRc) + if err != nil { + return nil, err + } + // The old controller is considered as part of the total because we want to + // maintain minimum availability even with a volatile old controller. + // Scale down as much as possible while maintaining minimum availability. + decrement := (oldAvailable + newAvailable) - minAvailable + // The decrement normally shouldn't drop below 0 because the available count + // always starts below the old replica count, but the old replica count can + // decrement due to externalities like pods death in the replica set. This + // will be considered a transient condition; do nothing and try again later + // with new readiness values. + // + // If the most we can scale is 0, it means we can't scale down without + // violating the minimum. Do nothing and try again later when conditions may + // have changed. + if decrement <= 0 { + return oldRc, nil + } + // Reduce the replica count, and deal with fenceposts. + oldRc.Spec.Replicas -= decrement + if oldRc.Spec.Replicas < 0 { + oldRc.Spec.Replicas = 0 + } + // If the new is already fully scaled and available up to the desired size, go + // ahead and scale old all the way down. + if newRc.Spec.Replicas == desired && newAvailable == desired { + oldRc.Spec.Replicas = 0 + } + // Perform the scale-down. + fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas) + retryWait := &RetryParams{config.Interval, config.Timeout} + scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait) + if err != nil { + return nil, err + } + return scaledRc, nil +} + +// scalerScaleAndWait scales a controller using a Scaler and a real client. +func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { + scalerClient := NewScalerClient(r.c) + scaler, err := ScalerFor("ReplicationController", scalerClient) + if err != nil { + return nil, fmt.Errorf("Couldn't make scaler: %s", err) + } + if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil { + return nil, err + } + return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name) +} + +// pollForReadyPods polls oldRc and newRc each interval and returns the old +// and new ready counts for their pods. If a pod is observed as being ready, +// it's considered ready even if it later becomes unready. 
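For reference, the per-interval arithmetic used by scaleUp and scaleDown above can be illustrated in isolation. The sketch below is a simplified, standalone model (plain ints, no client calls, not part of the patch); the names scaleUpIncrement and scaleDownDecrement are hypothetical and only mirror the constraints described in the comments: total pods never exceed original+maxSurge, and ready pods never drop below original-maxUnavailable.

package main

import "fmt"

// scaleUpIncrement mirrors the surge-limited scale-up step: the new RC may
// grow only until old+new replicas reach original+maxSurge, and never past
// the desired count. (Hypothetical standalone model of the logic above.)
func scaleUpIncrement(oldReplicas, newReplicas, original, desired, maxSurge int) int {
	inc := (original + maxSurge) - (oldReplicas + newReplicas)
	if oldReplicas == 0 {
		inc = desired - newReplicas // old RC is gone; scale the rest of the way up
	}
	if inc < 0 {
		inc = 0
	}
	if newReplicas+inc > desired {
		inc = desired - newReplicas
	}
	return inc
}

// scaleDownDecrement mirrors the availability-limited scale-down step: the
// old RC may shrink only while oldReady+newReady stays at or above
// minAvailable (original - maxUnavailable).
func scaleDownDecrement(oldReady, newReady, minAvailable int) int {
	dec := (oldReady + newReady) - minAvailable
	if dec < 0 {
		dec = 0
	}
	return dec
}

func main() {
	// 10 -> 10 update with maxUnavailable=30% (3 pods) and maxSurge=0%, as in
	// the "keep 7 pods available, don't exceed 10 pods" scenario below.
	fmt.Println(scaleDownDecrement(10, 0, 7))      // 3: old RC may drop 10 -> 7
	fmt.Println(scaleUpIncrement(7, 0, 10, 10, 0)) // 3: new RC may then grow 0 -> 3
}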
+func (r *RollingUpdater) pollForReadyPods(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { + controllers := []*api.ReplicationController{oldRc, newRc} + oldReady := 0 + newReady := 0 + err := wait.Poll(interval, timeout, func() (done bool, err error) { + anyReady := false + for _, controller := range controllers { + selector := labels.Set(controller.Spec.Selector).AsSelector() + pods, err := r.c.Pods(controller.Namespace).List(selector, fields.Everything()) + if err != nil { + return false, err + } + for _, pod := range pods.Items { + if api.IsPodReady(&pod) { + switch controller.Name { + case oldRc.Name: + oldReady++ + case newRc.Name: + newReady++ + } + anyReady = true + } + } + } + if anyReady { + return true, nil + } + return false, nil + }) + return oldReady, newReady, err +} + +// getOrCreateTargetControllerWithClient looks for an existing controller with +// sourceId. If found, the existing controller is returned with true +// indicating that the controller already exists. If the controller isn't +// found, a new one is created and returned along with false indicating the +// controller was created. +// +// Existing controllers are validated to ensure their sourceIdAnnotation +// matches sourceId; if there's a mismatch, an error is returned. +func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) { + existing, err := r.c.ReplicationControllers(controller.Namespace).Get(controller.Name) + if err != nil { + if !errors.IsNotFound(err) { + // There was an error trying to find the controller; don't assume we + // should create it. + return nil, false, err + } + if controller.Spec.Replicas <= 0 { + return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", controller.Name, controller.Spec) + } + // The controller wasn't found, so create it. + if controller.Annotations == nil { + controller.Annotations = map[string]string{} + } + controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", controller.Spec.Replicas) + controller.Annotations[sourceIdAnnotation] = sourceId + controller.Spec.Replicas = 0 + newRc, err := r.c.ReplicationControllers(r.ns).Create(controller) + return newRc, false, err + } + // Validate and use the existing controller. + annotations := existing.Annotations + source := annotations[sourceIdAnnotation] + _, ok := annotations[desiredReplicasAnnotation] + if source != sourceId || !ok { + return nil, false, fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", controller.Name, sourceId, annotations) + } + return existing, true, nil +} + +// cleanupWithClients performs cleanup tasks after the rolling update. Update +// process related annotations are removed from oldRc and newRc. The +// CleanupPolicy on config is executed. 
+func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error { + // Clean up annotations + var err error + newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name) + if err != nil { + return err + } + delete(newRc.Annotations, sourceIdAnnotation) + delete(newRc.Annotations, desiredReplicasAnnotation) + + newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc) + if err != nil { + return err + } + scalerClient := NewScalerClient(r.c) + if err = wait.Poll(config.Interval, config.Timeout, scalerClient.ControllerHasDesiredReplicas(newRc)); err != nil { + return err + } + newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name) + if err != nil { + return err + } + + switch config.CleanupPolicy { + case DeleteRollingUpdateCleanupPolicy: + // delete old rc + fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name) + return r.c.ReplicationControllers(r.ns).Delete(oldRc.Name) + case RenameRollingUpdateCleanupPolicy: + // delete old rc + fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name) + if err := r.c.ReplicationControllers(r.ns).Delete(oldRc.Name); err != nil { + return err + } + fmt.Fprintf(config.Out, "Renaming %s to %s\n", newRc.Name, oldRc.Name) + return Rename(r.c, newRc, oldRc.Name) + case PreserveRollingUpdateCleanupPolicy: + return nil + default: + return nil + } +} + +// func extractMaxValue is a helper to extract config max values as either +// absolute numbers or based on percentages of the original rc. +func extractMaxValue(field util.IntOrString, name string, original int) (int, error) { + switch field.Kind { + case util.IntstrInt: + if field.IntVal < 0 { + return 0, fmt.Errorf("%s must be >= 0", name) + } + return field.IntVal, nil + case util.IntstrString: + s := strings.Replace(field.StrVal, "%", "", -1) + v, err := strconv.Atoi(s) + if err != nil { + return 0, fmt.Errorf("invalid %s value %q: %v", name, field.StrVal, err) + } + if v < 0 { + return 0, fmt.Errorf("%s must be >= 0", name) + } + return int(math.Ceil(float64(original) * (float64(v)) / 100)), nil + } + return 0, fmt.Errorf("invalid kind %q for %s", field.Kind, name) +} + +func Rename(c client.ReplicationControllersNamespacer, rc *api.ReplicationController, newName string) error { + oldName := rc.Name + rc.Name = newName + rc.ResourceVersion = "" + + _, err := c.ReplicationControllers(rc.Namespace).Create(rc) + if err != nil { + return err + } + err = c.ReplicationControllers(rc.Namespace).Delete(oldName) + if err != nil && !errors.IsNotFound(err) { + return err + } + return nil +} + +func LoadExistingNextReplicationController(c client.ReplicationControllersNamespacer, namespace, newName string) (*api.ReplicationController, error) { if len(newName) == 0 { return nil, nil } @@ -157,22 +554,6 @@ func CreateNewControllerFromCurrentController(c *client.Client, namespace, oldNa return newRc, nil } -// NewRollingUpdater creates a RollingUpdater from a client -func NewRollingUpdater(namespace string, client RollingUpdaterClient) *RollingUpdater { - return &RollingUpdater{ - c: client, - ns: namespace, - // Use a real scaleAndWait implementation. 
- scaleAndWait: scalerScaleAndWait(client, namespace), - } -} - -const ( - sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id" - desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas" - nextControllerAnnotation = kubectlAnnotationPrefix + "next-controller-id" -) - func AbortRollingUpdate(c *RollingUpdaterConfig) { // Swap the controllers tmp := c.OldRc @@ -214,21 +595,6 @@ func UpdateExistingReplicationController(c client.Interface, oldRc *api.Replicat } } -// scalerScaleAndWait returns a scaleAndWait function which scales a -// controller using a Scaler and a real client. -func scalerScaleAndWait(client RollingUpdaterClient, namespace string) scaleAndWait { - scaler, err := ScalerFor("ReplicationController", client) - return func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { - if err != nil { - return nil, fmt.Errorf("Couldn't make scaler: %s", err) - } - if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil { - return nil, err - } - return client.GetReplicationController(namespace, rc.ObjectMeta.Name) - } -} - const MaxRetries = 3 func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) { @@ -335,8 +701,8 @@ func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.R return rc, err } -func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.ReplicationController, error) { - list, err := r.ListReplicationControllers(namespace, labels.Everything()) +func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) { + list, err := r.ReplicationControllers(namespace).List(labels.Everything()) if err != nil { return nil, err } @@ -348,272 +714,3 @@ func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api. } return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name) } - -// Update all pods for a ReplicationController (oldRc) by creating a new -// controller (newRc) with 0 replicas, and synchronously scaling oldRc and -// newRc until oldRc has 0 replicas and newRc has the original # of desired -// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy. -// -// The scaling amount each interval is either 1 or based on a percent of the -// desired replicas. If a percentage is used and the percentage is negative, -// the scaling order is inverted to down/up instead of the default up/down. -// -// If an update from newRc to oldRc is already in progress, we attempt to -// drive it to completion. If an error occurs at any step of the update, the -// error will be returned. -// -// TODO: make this handle performing a rollback of a partially completed -// rollout. 
-func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { - out := config.Out - oldRc := config.OldRc - newRc := config.NewRc - updatePeriod := config.UpdatePeriod - interval := config.Interval - timeout := config.Timeout - - oldName := oldRc.ObjectMeta.Name - newName := newRc.ObjectMeta.Name - retry := &RetryParams{interval, timeout} - waitForReplicas := &RetryParams{interval, timeout} - if newRc.Spec.Replicas <= 0 { - return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", newName, newRc.Spec.Replicas) - } - desired := newRc.Spec.Replicas - sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID) - - // look for existing newRc, incase this update was previously started but interrupted - rc, existing, err := r.getExistingNewRc(sourceId, newName) - if existing { - fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName) - if err != nil { - return err - } - replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation] - desired, err = strconv.Atoi(replicas) - if err != nil { - return fmt.Errorf("Unable to parse annotation for %s: %s=%s", - newName, desiredReplicasAnnotation, replicas) - } - newRc = rc - } else { - fmt.Fprintf(out, "Creating %s\n", newName) - if newRc.ObjectMeta.Annotations == nil { - newRc.ObjectMeta.Annotations = map[string]string{} - } - newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired) - newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId - newRc.Spec.Replicas = 0 - newRc, err = r.c.CreateReplicationController(r.ns, newRc) - if err != nil { - return err - } - } - - // Compute the scale amount based on a percentage of the new desired count. - // A negative percentage indicates our scale direction should be down-first. - scaleAmount := 1 - skipFirstUp := false - if config.UpdatePercent != nil { - scaleAmount = int(math.Ceil(float64(desired) * (math.Abs(float64(*config.UpdatePercent)) / 100))) - if *config.UpdatePercent < 0 { - skipFirstUp = true - } - } - // Helpful output about what we're about to do. - direction := "up" - if skipFirstUp { - direction = "down" - } - fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (scale %s first by %d each interval)\n", - newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, direction, scaleAmount) - - // Scale newRc and oldRc until newRc has the desired number of replicas and - // oldRc has 0 replicas. - for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 { - // Choose up/down vs. down/up scaling direction. 
- if !skipFirstUp { - scaledRc, err := r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config) - if err != nil { - return err - } - newRc = scaledRc - time.Sleep(updatePeriod) - skipFirstUp = true - } - scaledRc, err := r.scaleDown(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config) - if err != nil { - return err - } - rc = scaledRc - time.Sleep(updatePeriod) - scaledRc, err = r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config) - if err != nil { - return err - } - newRc = scaledRc - } - - // Clean up annotations - if newRc, err = r.c.GetReplicationController(r.ns, newName); err != nil { - return err - } - delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation) - delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation) - newRc, err = r.updateAndWait(newRc, interval, timeout) - if err != nil { - return err - } - - switch config.CleanupPolicy { - case DeleteRollingUpdateCleanupPolicy: - // delete old rc - fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName) - return r.c.DeleteReplicationController(r.ns, oldName) - case RenameRollingUpdateCleanupPolicy: - // delete old rc - fmt.Fprintf(out, "Update succeeded. Deleting old controller: %s\n", oldName) - if err := r.c.DeleteReplicationController(r.ns, oldName); err != nil { - return err - } - fmt.Fprintf(out, "Renaming %s to %s\n", newRc.Name, oldName) - return r.rename(newRc, oldName) - case PreserveRollingUpdateCleanupPolicy: - return nil - default: - return nil - } -} - -// scaleUp scales up newRc to desired by scaleAmount. It accounts for -// fencepost conditions. If newRc is already scaled to desired, scaleUp does -// nothing. If the oldRc is already scaled to 0, newRc is scaled to desired -// immediately regardless of scale count. -func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) { - if newRc.Spec.Replicas == desired { - return newRc, nil - } - newRc.Spec.Replicas += scaleAmount - if newRc.Spec.Replicas > desired || oldRc.Spec.Replicas == 0 { - newRc.Spec.Replicas = desired - } - fmt.Fprintf(out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas) - scaledRc, err := r.scaleAndWait(newRc, retry, wait) - if err != nil { - return nil, err - } - err = config.UpdateAcceptor.Accept(scaledRc) - if err != nil { - return nil, fmt.Errorf("update rejected for %s: %v", scaledRc.Name, err) - } - return scaledRc, nil -} - -// scaleDown scales down oldRc to 0 by scaleAmount. It accounts for fencepost -// conditions. If oldRc is already scaled to 0, scaleDown does nothing. If -// newRc is already scaled to desired, oldRc is scaled to 0 immediately -// regardless of scaleAmount. 
-func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) { - if oldRc.Spec.Replicas == 0 { - return oldRc, nil - } - oldRc.Spec.Replicas -= scaleAmount - if oldRc.Spec.Replicas < 0 || newRc.Spec.Replicas == desired { - oldRc.Spec.Replicas = 0 - } - fmt.Fprintf(out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas) - scaledRc, err := r.scaleAndWait(oldRc, retry, wait) - if err != nil { - return nil, err - } - return scaledRc, nil -} - -func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) { - if rc, err = r.c.GetReplicationController(r.ns, name); err == nil { - existing = true - annotations := rc.ObjectMeta.Annotations - source := annotations[sourceIdAnnotation] - _, ok := annotations[desiredReplicasAnnotation] - if source != sourceId || !ok { - err = fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", name, sourceId, annotations) - } - return - } - err = nil - return -} - -func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) { - rc, err := r.c.UpdateReplicationController(r.ns, rc) - if err != nil { - return nil, err - } - if err = wait.Poll(interval, timeout, r.c.ControllerHasDesiredReplicas(rc)); err != nil { - return nil, err - } - return r.c.GetReplicationController(r.ns, rc.ObjectMeta.Name) -} - -func (r *RollingUpdater) rename(rc *api.ReplicationController, newName string) error { - return Rename(r.c, rc, newName) -} - -func Rename(c RollingUpdaterClient, rc *api.ReplicationController, newName string) error { - oldName := rc.Name - rc.Name = newName - rc.ResourceVersion = "" - - _, err := c.CreateReplicationController(rc.Namespace, rc) - if err != nil { - return err - } - err = c.DeleteReplicationController(rc.Namespace, oldName) - if err != nil && !errors.IsNotFound(err) { - return err - } - return nil -} - -// RollingUpdaterClient abstracts access to ReplicationControllers. -type RollingUpdaterClient interface { - ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) - GetReplicationController(namespace, name string) (*api.ReplicationController, error) - UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) - CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) - DeleteReplicationController(namespace, name string) error - ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc -} - -func NewRollingUpdaterClient(c client.Interface) RollingUpdaterClient { - return &realRollingUpdaterClient{c} -} - -// realRollingUpdaterClient is a RollingUpdaterClient which uses a Kube client. 
-type realRollingUpdaterClient struct { - client client.Interface -} - -func (c *realRollingUpdaterClient) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) { - return c.client.ReplicationControllers(namespace).List(selector) -} - -func (c *realRollingUpdaterClient) GetReplicationController(namespace, name string) (*api.ReplicationController, error) { - return c.client.ReplicationControllers(namespace).Get(name) -} - -func (c *realRollingUpdaterClient) UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { - return c.client.ReplicationControllers(namespace).Update(rc) -} - -func (c *realRollingUpdaterClient) CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { - return c.client.ReplicationControllers(namespace).Create(rc) -} - -func (c *realRollingUpdaterClient) DeleteReplicationController(namespace, name string) error { - return c.client.ReplicationControllers(namespace).Delete(name) -} - -func (c *realRollingUpdaterClient) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc { - return client.ControllerHasDesiredReplicas(c.client, rc) -} diff --git a/pkg/kubectl/rolling_updater_test.go b/pkg/kubectl/rolling_updater_test.go index 7bae929c93e..2c4907f9385 100644 --- a/pkg/kubectl/rolling_updater_test.go +++ b/pkg/kubectl/rolling_updater_test.go @@ -30,71 +30,18 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" - "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/wait" ) -type updaterFake struct { - *testclient.Fake - ctrl client.ReplicationControllerInterface -} - -func (c *updaterFake) ReplicationControllers(namespace string) client.ReplicationControllerInterface { - return c.ctrl -} - -func fakeClientFor(namespace string, responses []fakeResponse) client.Interface { - fake := testclient.Fake{} - return &updaterFake{ - &fake, - &fakeRc{ - &testclient.FakeReplicationControllers{ - Fake: &fake, - Namespace: namespace, - }, - responses, - }, - } -} - -type fakeResponse struct { - controller *api.ReplicationController - err error -} - -type fakeRc struct { - *testclient.FakeReplicationControllers - responses []fakeResponse -} - -func (c *fakeRc) Get(name string) (*api.ReplicationController, error) { - action := testclient.NewGetAction("replicationcontrollers", "", name) - if len(c.responses) == 0 { - return nil, fmt.Errorf("Unexpected Action: %s", action) - } - c.Fake.Invokes(action, nil) - result := c.responses[0] - c.responses = c.responses[1:] - return result.controller, result.err -} - -func (c *fakeRc) Create(controller *api.ReplicationController) (*api.ReplicationController, error) { - c.Fake.Invokes(testclient.NewCreateAction("replicationcontrollers", controller.Namespace, controller), nil) - return controller, nil -} - -func (c *fakeRc) Update(controller *api.ReplicationController) (*api.ReplicationController, error) { - c.Fake.Invokes(testclient.NewUpdateAction("replicationcontrollers", controller.Namespace, controller), nil) - return controller, nil -} - -func oldRc(replicas int) *api.ReplicationController { +func oldRc(replicas int, original int) *api.ReplicationController { return &api.ReplicationController{ ObjectMeta: api.ObjectMeta{ Name: "foo-v1", UID: "7764ae47-9092-11e4-8393-42010af018ff", + 
Annotations: map[string]string{ + originalReplicasAnnotation: fmt.Sprintf("%d", original), + }, }, Spec: api.ReplicationControllerSpec{ Replicas: replicas, @@ -113,7 +60,7 @@ func oldRc(replicas int) *api.ReplicationController { } func newRc(replicas int, desired int) *api.ReplicationController { - rc := oldRc(replicas) + rc := oldRc(replicas, replicas) rc.Spec.Template = &api.PodTemplateSpec{ ObjectMeta: api.ObjectMeta{ Name: "foo-v2", @@ -131,379 +78,553 @@ func newRc(replicas int, desired int) *api.ReplicationController { return rc } +// TestUpdate performs complex scenario testing for rolling updates. It +// provides fine grained control over the states for each update interval to +// allow the expression of as many edge cases as possible. func TestUpdate(t *testing.T) { - // Helpers - Percent := func(p int) *int { - return &p + // up represents a simulated scale up event and expectation + type up struct { + // to is the expected replica count for a scale-up + to int } - var NilPercent *int - // Scenarios + // down represents a simulated scale down event and expectation + type down struct { + // oldReady is the number of oldRc replicas which will be seen + // as ready during the scale down attempt + oldReady int + // newReady is the number of newRc replicas which will be seen + // as ready during the scale up attempt + newReady int + // to is the expected replica count for the scale down + to int + // noop and to are mutually exclusive; if noop is true, that means for + // this down event, no scaling attempt should be made (for example, if + // by scaling down, the readiness minimum would be crossed.) + noop bool + } + tests := []struct { - oldRc, newRc *api.ReplicationController - accepted bool - percent *int - responses []fakeResponse - output string + name string + // oldRc is the "from" deployment + oldRc *api.ReplicationController + // newRc is the "to" deployment + newRc *api.ReplicationController + // whether newRc existed (false means it was created) + newRcExists bool + maxUnavail util.IntOrString + maxSurge util.IntOrString + // expected is the sequence of up/down events that will be simulated and + // verified + expected []interface{} + // output is the expected textual output written + output string }{ { - oldRc: oldRc(1), - newRc: newRc(1, 1), - accepted: true, - percent: NilPercent, - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration - {newRc(1, 1), nil}, - {oldRc(0), nil}, - // cleanup annotations - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, + name: "10->10 30/0 fast readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("30%"), + maxSurge: util.NewIntOrStringFromString("0%"), + expected: []interface{}{ + down{oldReady: 10, newReady: 0, to: 7}, + up{3}, + down{oldReady: 7, newReady: 3, to: 4}, + up{6}, + down{oldReady: 4, newReady: 6, to: 1}, + up{9}, + down{oldReady: 1, newReady: 9, to: 0}, + up{10}, }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (scale up first by 1 each interval) -Scaling foo-v2 up to 1 -Scaling foo-v1 down to 0 -Update succeeded. 
Deleting foo-v1 -`, - }, { - oldRc: oldRc(1), - newRc: newRc(1, 1), - accepted: true, - percent: NilPercent, - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration - {newRc(1, 1), nil}, - {oldRc(0), nil}, - // cleanup annotations - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, - {newRc(1, 1), nil}, - }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (scale up first by 1 each interval) -Scaling foo-v2 up to 1 -Scaling foo-v1 down to 0 -Update succeeded. Deleting foo-v1 -`, - }, { - oldRc: oldRc(2), - newRc: newRc(2, 2), - accepted: true, - percent: NilPercent, - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration - {newRc(1, 2), nil}, - {oldRc(1), nil}, - // scaling iteration - {newRc(2, 2), nil}, - {oldRc(0), nil}, - // cleanup annotations - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(1, 1), nil}, - }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval) -Scaling foo-v2 up to 1 + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 7 pods available, don't exceed 10 pods) +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 3 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 6 Scaling foo-v1 down to 1 -Scaling foo-v2 up to 2 +Scaling foo-v2 up to 9 Scaling foo-v1 down to 0 -Update succeeded. Deleting foo-v1 +Scaling foo-v2 up to 10 `, - }, { - oldRc: oldRc(2), - newRc: newRc(7, 7), - accepted: true, - percent: NilPercent, - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration - {newRc(1, 7), nil}, - {oldRc(1), nil}, - // scaling iteration - {newRc(2, 7), nil}, - {oldRc(0), nil}, - // final scale on newRc - {newRc(7, 7), nil}, - // cleanup annotations - {newRc(7, 7), nil}, - {newRc(7, 7), nil}, - {newRc(7, 7), nil}, + }, + { + name: "10->10 30/0 delayed readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("30%"), + maxSurge: util.NewIntOrStringFromString("0%"), + expected: []interface{}{ + down{oldReady: 10, newReady: 0, to: 7}, + up{3}, + down{oldReady: 7, newReady: 0, noop: true}, + down{oldReady: 7, newReady: 1, to: 6}, + up{4}, + down{oldReady: 6, newReady: 4, to: 3}, + up{7}, + down{oldReady: 3, newReady: 7, to: 0}, + up{10}, }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 7, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval) -Scaling foo-v2 up to 1 -Scaling foo-v1 down to 1 -Scaling foo-v2 up to 2 -Scaling foo-v1 down to 0 -Scaling foo-v2 up to 7 -Update succeeded. 
Deleting foo-v1 -`, - }, { - oldRc: oldRc(7), - newRc: newRc(2, 2), - accepted: true, - percent: NilPercent, - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration - {newRc(1, 2), nil}, - {oldRc(6), nil}, - // scaling iteration - {newRc(2, 2), nil}, - {oldRc(0), nil}, - // cleanup annotations - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - {newRc(2, 2), nil}, - }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 7 to 0 (scale up first by 1 each interval) -Scaling foo-v2 up to 1 + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 7 pods available, don't exceed 10 pods) +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 3 Scaling foo-v1 down to 6 -Scaling foo-v2 up to 2 -Scaling foo-v1 down to 0 -Update succeeded. Deleting foo-v1 -`, - }, { - oldRc: oldRc(7), - newRc: newRc(2, 2), - accepted: false, - percent: NilPercent, - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration (only up occurs since the update is rejected) - {newRc(1, 2), nil}, - }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 7 to 0 (scale up first by 1 each interval) -Scaling foo-v2 up to 1 -`, - }, { - oldRc: oldRc(10), - newRc: newRc(10, 10), - accepted: true, - percent: Percent(20), - responses: []fakeResponse{ - // no existing newRc - {nil, fmt.Errorf("not found")}, - // scaling iteration - {newRc(2, 10), nil}, - {oldRc(8), nil}, - // scaling iteration - {newRc(4, 10), nil}, - {oldRc(6), nil}, - // scaling iteration - {newRc(6, 10), nil}, - {oldRc(4), nil}, - // scaling iteration - {newRc(8, 10), nil}, - {oldRc(2), nil}, - // scaling iteration - {newRc(10, 10), nil}, - {oldRc(0), nil}, - // cleanup annotations - {newRc(10, 10), nil}, - {newRc(10, 10), nil}, - {newRc(10, 10), nil}, - }, - output: `Creating foo-v2 -Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (scale up first by 2 each interval) -Scaling foo-v2 up to 2 -Scaling foo-v1 down to 8 Scaling foo-v2 up to 4 -Scaling foo-v1 down to 6 +Scaling foo-v1 down to 3 +Scaling foo-v2 up to 7 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 10 +`, + }, { + name: "10->10 30/0 fast readiness, continuation", + oldRc: oldRc(7, 10), + newRc: newRc(3, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("30%"), + maxSurge: util.NewIntOrStringFromString("0%"), + expected: []interface{}{ + down{oldReady: 7, newReady: 3, to: 4}, + up{6}, + down{oldReady: 4, newReady: 6, to: 1}, + up{9}, + down{oldReady: 1, newReady: 9, to: 0}, + up{10}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 3 to 10, scaling down foo-v1 from 7 to 0 (keep 7 pods available, don't exceed 10 pods) +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 6 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 9 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 10 +`, + }, { + name: "10->10 30/0 fast readiness, continued after restart which prevented first scale-up", + oldRc: oldRc(7, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("30%"), + maxSurge: util.NewIntOrStringFromString("0%"), + expected: []interface{}{ + down{oldReady: 7, newReady: 0, noop: true}, + up{3}, + down{oldReady: 7, newReady: 3, to: 4}, + up{6}, + down{oldReady: 4, newReady: 6, to: 1}, + up{9}, + down{oldReady: 1, newReady: 9, to: 0}, + up{10}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 7 to 0 (keep 7 pods 
available, don't exceed 10 pods) +Scaling foo-v2 up to 3 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 6 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 9 +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 10 +`, + }, { + name: "10->10 0/30 fast readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("0%"), + maxSurge: util.NewIntOrStringFromString("30%"), + expected: []interface{}{ + up{3}, + down{oldReady: 10, newReady: 3, to: 7}, + up{6}, + down{oldReady: 7, newReady: 6, to: 4}, + up{9}, + down{oldReady: 4, newReady: 9, to: 1}, + up{10}, + down{oldReady: 1, newReady: 10, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 10 pods available, don't exceed 13 pods) +Scaling foo-v2 up to 3 +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 6 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 9 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 0/30 delayed readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("0%"), + maxSurge: util.NewIntOrStringFromString("30%"), + expected: []interface{}{ + up{3}, + down{oldReady: 10, newReady: 0, noop: true}, + down{oldReady: 10, newReady: 1, to: 9}, + up{4}, + down{oldReady: 9, newReady: 3, to: 7}, + up{6}, + down{oldReady: 7, newReady: 6, to: 4}, + up{9}, + down{oldReady: 4, newReady: 9, to: 1}, + up{10}, + down{oldReady: 1, newReady: 9, noop: true}, + down{oldReady: 1, newReady: 10, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 10 pods available, don't exceed 13 pods) +Scaling foo-v2 up to 3 +Scaling foo-v1 down to 9 +Scaling foo-v2 up to 4 +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 6 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 9 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 10/20 fast readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("10%"), + maxSurge: util.NewIntOrStringFromString("20%"), + expected: []interface{}{ + up{2}, + down{oldReady: 10, newReady: 2, to: 7}, + up{5}, + down{oldReady: 7, newReady: 5, to: 4}, + up{8}, + down{oldReady: 4, newReady: 8, to: 1}, + up{10}, + down{oldReady: 10, newReady: 1, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 9 pods available, don't exceed 12 pods) +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 5 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 8 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 10/20 delayed readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("10%"), + maxSurge: util.NewIntOrStringFromString("20%"), + expected: []interface{}{ + up{2}, + down{oldReady: 10, newReady: 2, to: 7}, + up{5}, + down{oldReady: 7, newReady: 4, to: 5}, + up{7}, + down{oldReady: 5, newReady: 4, noop: true}, + down{oldReady: 5, newReady: 7, to: 2}, + up{10}, + down{oldReady: 2, newReady: 9, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 9 pods available, don't exceed 12 pods) +Scaling foo-v2 up to 2 +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 5 +Scaling foo-v1 down to 5 +Scaling foo-v2 up to 7 
+Scaling foo-v1 down to 2 +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 10/20 fast readiness continued after restart which prevented first scale-down", + oldRc: oldRc(10, 10), + newRc: newRc(2, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("10%"), + maxSurge: util.NewIntOrStringFromString("20%"), + expected: []interface{}{ + down{oldReady: 10, newReady: 2, to: 7}, + up{5}, + down{oldReady: 7, newReady: 5, to: 4}, + up{8}, + down{oldReady: 4, newReady: 8, to: 1}, + up{10}, + down{oldReady: 1, newReady: 10, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 2 to 10, scaling down foo-v1 from 10 to 0 (keep 9 pods available, don't exceed 12 pods) +Scaling foo-v1 down to 7 +Scaling foo-v2 up to 5 +Scaling foo-v1 down to 4 +Scaling foo-v2 up to 8 +Scaling foo-v1 down to 1 +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 0/100 fast readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("0%"), + maxSurge: util.NewIntOrStringFromString("100%"), + expected: []interface{}{ + up{10}, + down{oldReady: 10, newReady: 10, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 10 pods available, don't exceed 20 pods) +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 0/100 delayed readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("0%"), + maxSurge: util.NewIntOrStringFromString("100%"), + expected: []interface{}{ + up{10}, + down{oldReady: 10, newReady: 0, noop: true}, + down{oldReady: 10, newReady: 2, to: 8}, + down{oldReady: 8, newReady: 7, to: 3}, + down{oldReady: 3, newReady: 10, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 10 pods available, don't exceed 20 pods) +Scaling foo-v2 up to 10 +Scaling foo-v1 down to 8 +Scaling foo-v1 down to 3 +Scaling foo-v1 down to 0 +`, + }, { + name: "10->10 100/0 fast readiness", + oldRc: oldRc(10, 10), + newRc: newRc(0, 10), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("100%"), + maxSurge: util.NewIntOrStringFromString("0%"), + expected: []interface{}{ + down{oldReady: 10, newReady: 0, to: 0}, + up{10}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 10, scaling down foo-v1 from 10 to 0 (keep 0 pods available, don't exceed 10 pods) +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 10 +`, + }, { + name: "1->1 10/0 fast readiness", + oldRc: oldRc(1, 1), + newRc: newRc(0, 1), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("10%"), + maxSurge: util.NewIntOrStringFromString("0%"), + expected: []interface{}{ + down{oldReady: 1, newReady: 0, to: 0}, + up{1}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (keep 0 pods available, don't exceed 1 pods) +Scaling foo-v1 down to 0 +Scaling foo-v2 up to 1 +`, + }, { + name: "1->1 0/10 delayed readiness", + oldRc: oldRc(1, 1), + newRc: newRc(0, 1), + newRcExists: false, + maxUnavail: util.NewIntOrStringFromString("0%"), + maxSurge: util.NewIntOrStringFromString("10%"), + expected: []interface{}{ + up{1}, + down{oldReady: 1, newReady: 0, noop: true}, + down{oldReady: 1, newReady: 1, to: 0}, + }, + output: `Created foo-v2 +Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (keep 1 pods available, don't exceed 2 pods) +Scaling foo-v2 up to 1 
+Scaling foo-v1 down to 0
+`,
+	}, {
+		name: "1->1 10/10 delayed readiness",
+		oldRc: oldRc(1, 1),
+		newRc: newRc(0, 1),
+		newRcExists: false,
+		maxUnavail: util.NewIntOrStringFromString("10%"),
+		maxSurge: util.NewIntOrStringFromString("10%"),
+		expected: []interface{}{
+			up{1},
+			down{oldReady: 1, newReady: 0, noop: true},
+			down{oldReady: 1, newReady: 1, to: 0},
+		},
+		output: `Created foo-v2
+Scaling up foo-v2 from 0 to 1, scaling down foo-v1 from 1 to 0 (keep 0 pods available, don't exceed 2 pods)
+Scaling foo-v2 up to 1
+Scaling foo-v1 down to 0
+`,
+	}, {
+		name: "3->3 1/1 fast readiness (absolute values)",
+		oldRc: oldRc(3, 3),
+		newRc: newRc(0, 3),
+		newRcExists: false,
+		maxUnavail: util.NewIntOrStringFromInt(0),
+		maxSurge: util.NewIntOrStringFromInt(1),
+		expected: []interface{}{
+			up{1},
+			down{oldReady: 3, newReady: 1, to: 2},
+			up{2},
+			down{oldReady: 2, newReady: 2, to: 1},
+			up{3},
+			down{oldReady: 1, newReady: 3, to: 0},
+		},
+		output: `Created foo-v2
+Scaling up foo-v2 from 0 to 3, scaling down foo-v1 from 3 to 0 (keep 3 pods available, don't exceed 4 pods)
+Scaling foo-v2 up to 1
+Scaling foo-v1 down to 2
+Scaling foo-v2 up to 2
+Scaling foo-v1 down to 1
+Scaling foo-v2 up to 3
+Scaling foo-v1 down to 0
+`,
+	}, {
+		name: "10->10 0/20 fast readiness, continued after restart which resulted in partial first scale-up",
+		oldRc: oldRc(6, 10),
+		newRc: newRc(5, 10),
+		newRcExists: false,
+		maxUnavail: util.NewIntOrStringFromString("0%"),
+		maxSurge: util.NewIntOrStringFromString("20%"),
+		expected: []interface{}{
+			up{6},
+			down{oldReady: 6, newReady: 6, to: 4},
+			up{8},
+			down{oldReady: 4, newReady: 8, to: 2},
+			up{10},
+			down{oldReady: 10, newReady: 2, to: 0},
+		},
+		output: `Created foo-v2
+Scaling up foo-v2 from 5 to 10, scaling down foo-v1 from 6 to 0 (keep 10 pods available, don't exceed 12 pods)
 Scaling foo-v2 up to 6
 Scaling foo-v1 down to 4
 Scaling foo-v2 up to 8
 Scaling foo-v1 down to 2
 Scaling foo-v2 up to 10
 Scaling foo-v1 down to 0
-Update succeeded. Deleting foo-v1
 `,
 	}, {
-		oldRc: oldRc(2),
-		newRc: newRc(6, 6),
-		accepted: true,
-		percent: Percent(50),
-		responses: []fakeResponse{
-			// no existing newRc
-			{nil, fmt.Errorf("not found")},
-			// scaling iteration
-			{newRc(3, 6), nil},
-			{oldRc(0), nil},
-			// scaling iteration
-			{newRc(6, 6), nil},
-			// cleanup annotations
-			{newRc(6, 6), nil},
-			{newRc(6, 6), nil},
-			{newRc(6, 6), nil},
+		name: "10->20 0/300 fast readiness",
+		oldRc: oldRc(10, 10),
+		newRc: newRc(0, 20),
+		newRcExists: false,
+		maxUnavail: util.NewIntOrStringFromString("0%"),
+		maxSurge: util.NewIntOrStringFromString("300%"),
+		expected: []interface{}{
+			up{20},
+			down{oldReady: 10, newReady: 20, to: 0},
 		},
-		output: `Creating foo-v2
-Scaling up foo-v2 from 0 to 6, scaling down foo-v1 from 2 to 0 (scale up first by 3 each interval)
-Scaling foo-v2 up to 3
+		output: `Created foo-v2
+Scaling up foo-v2 from 0 to 20, scaling down foo-v1 from 10 to 0 (keep 10 pods available, don't exceed 40 pods)
+Scaling foo-v2 up to 20
 Scaling foo-v1 down to 0
-Scaling foo-v2 up to 6
-Update succeeded. Deleting foo-v1
-`,
 	}, {
-		oldRc: oldRc(10),
-		newRc: newRc(3, 3),
-		accepted: true,
-		percent: Percent(50),
-		responses: []fakeResponse{
-			// no existing newRc
-			{nil, fmt.Errorf("not found")},
-			// scaling iteration
-			{newRc(2, 3), nil},
-			{oldRc(8), nil},
-			// scaling iteration
-			{newRc(3, 3), nil},
-			{oldRc(0), nil},
-			// cleanup annotations
-			{newRc(3, 3), nil},
-			{newRc(3, 3), nil},
-			{newRc(3, 3), nil},
-		},
-		output: `Creating foo-v2
-Scaling up foo-v2 from 0 to 3, scaling down foo-v1 from 10 to 0 (scale up first by 2 each interval)
-Scaling foo-v2 up to 2
-Scaling foo-v1 down to 8
-Scaling foo-v2 up to 3
-Scaling foo-v1 down to 0
-Update succeeded. Deleting foo-v1
-`,
-	}, {
-		oldRc: oldRc(4),
-		newRc: newRc(4, 4),
-		accepted: true,
-		percent: Percent(-50),
-		responses: []fakeResponse{
-			// no existing newRc
-			{nil, fmt.Errorf("not found")},
-			// scaling iteration
-			{oldRc(2), nil},
-			{newRc(2, 4), nil},
-			// scaling iteration
-			{oldRc(0), nil},
-			{newRc(4, 4), nil},
-			// cleanup annotations
-			{newRc(4, 4), nil},
-			{newRc(4, 4), nil},
-			{newRc(4, 4), nil},
-		},
-		output: `Creating foo-v2
-Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 4 to 0 (scale down first by 2 each interval)
-Scaling foo-v1 down to 2
-Scaling foo-v2 up to 2
-Scaling foo-v1 down to 0
-Scaling foo-v2 up to 4
-Update succeeded. Deleting foo-v1
-`,
-	}, {
-		oldRc: oldRc(2),
-		newRc: newRc(4, 4),
-		accepted: true,
-		percent: Percent(-50),
-		responses: []fakeResponse{
-			// no existing newRc
-			{nil, fmt.Errorf("not found")},
-			// scaling iteration
-			{oldRc(0), nil},
-			{newRc(4, 4), nil},
-			// cleanup annotations
-			{newRc(4, 4), nil},
-			{newRc(4, 4), nil},
-			{newRc(4, 4), nil},
-		},
-		output: `Creating foo-v2
-Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 2 to 0 (scale down first by 2 each interval)
-Scaling foo-v1 down to 0
-Scaling foo-v2 up to 4
-Update succeeded. Deleting foo-v1
-`,
-	}, {
-		oldRc: oldRc(4),
-		newRc: newRc(2, 2),
-		accepted: true,
-		percent: Percent(-50),
-		responses: []fakeResponse{
-			// no existing newRc
-			{nil, fmt.Errorf("not found")},
-			// scaling iteration
-			{oldRc(3), nil},
-			{newRc(1, 2), nil},
-			// scaling iteration
-			{oldRc(2), nil},
-			{newRc(2, 2), nil},
-			// scaling iteration
-			{oldRc(0), nil},
-			// cleanup annotations
-			{newRc(2, 2), nil},
-			{newRc(2, 2), nil},
-			{newRc(2, 2), nil},
-		},
-		output: `Creating foo-v2
-Scaling up foo-v2 from 0 to 2, scaling down foo-v1 from 4 to 0 (scale down first by 1 each interval)
-Scaling foo-v1 down to 3
-Scaling foo-v2 up to 1
-Scaling foo-v1 down to 2
-Scaling foo-v2 up to 2
-Scaling foo-v1 down to 0
-Update succeeded. Deleting foo-v1
-`,
-	}, {
-		oldRc: oldRc(4),
-		newRc: newRc(4, 4),
-		accepted: true,
-		percent: Percent(-100),
-		responses: []fakeResponse{
-			// no existing newRc
-			{nil, fmt.Errorf("not found")},
-			// scaling iteration
-			{oldRc(0), nil},
-			{newRc(4, 4), nil},
-			// cleanup annotations
-			{newRc(4, 4), nil},
-			{newRc(4, 4), nil},
-			{newRc(4, 4), nil},
-		},
-		output: `Creating foo-v2
-Scaling up foo-v2 from 0 to 4, scaling down foo-v1 from 4 to 0 (scale down first by 4 each interval)
-Scaling foo-v1 down to 0
-Scaling foo-v2 up to 4
-Update succeeded. Deleting foo-v1
 `,
 	},
 }
-	for _, test := range tests {
-		client := NewRollingUpdaterClient(fakeClientFor("default", test.responses))
-		updater := RollingUpdater{
-			c: client,
+	for i, test := range tests {
+		// Extract expectations into some makeshift FIFOs so they can be returned
+		// in the correct order from the right places. This lets scale downs be
+		// expressed as a single event even though the data is used from multiple
+		// interface calls.
+		oldReady := []int{}
+		newReady := []int{}
+		upTo := []int{}
+		downTo := []int{}
+		for _, event := range test.expected {
+			switch e := event.(type) {
+			case down:
+				oldReady = append(oldReady, e.oldReady)
+				newReady = append(newReady, e.newReady)
+				if !e.noop {
+					downTo = append(downTo, e.to)
+				}
+			case up:
+				upTo = append(upTo, e.to)
+			}
+		}
+
+		// Make a way to get the next item from our FIFOs. Returns -1 if the array
+		// is empty.
+		next := func(s *[]int) int {
+			slice := *s
+			v := -1
+			if len(slice) > 0 {
+				v = slice[0]
+				if len(slice) > 1 {
+					*s = slice[1:]
+				} else {
+					*s = []int{}
+				}
+			}
+			return v
+		}
+		t.Logf("running test %d (%s) (up: %v, down: %v, oldReady: %v, newReady: %v)", i, test.name, upTo, downTo, oldReady, newReady)
+		updater := &RollingUpdater{
 			ns: "default",
 			scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
-				return client.GetReplicationController(rc.Namespace, rc.Name)
+				// Return a scale up or scale down expectation depending on the rc,
+				// and throw errors if there is no expectation expressed for this
+				// call.
+				expected := -1
+				switch {
+				case rc == test.newRc:
+					t.Logf("scaling up %s:%d", rc.Name, rc.Spec.Replicas)
+					expected = next(&upTo)
+				case rc == test.oldRc:
+					t.Logf("scaling down %s:%d", rc.Name, rc.Spec.Replicas)
+					expected = next(&downTo)
+				}
+				if expected == -1 {
+					t.Fatalf("unexpected scale of %s to %d", rc.Name, rc.Spec.Replicas)
+				} else if e, a := expected, rc.Spec.Replicas; e != a {
+					t.Fatalf("expected scale of %s to %d, got %d", rc.Name, e, a)
+				}
+				// Simulate the scale.
+				rc.Status.Replicas = rc.Spec.Replicas
+				return rc, nil
 			},
+			getOrCreateTargetController: func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
+				// Simulate a create vs. update of an existing controller.
+				return test.newRc, test.newRcExists, nil
+			},
+			cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
+				return nil
+			},
+		}
+		// Set up a mock readiness check which handles the test assertions.
+		updater.waitForReadyPods = func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) {
+			// Return simulated readiness, and throw an error if this call has no
+			// expectations defined.
+			oldReady := next(&oldReady)
+			newReady := next(&newReady)
+			if oldReady == -1 || newReady == -1 {
+				t.Fatalf("unexpected waitForReadyPods call for:\noldRc: %+v\nnewRc: %+v", oldRc, newRc)
+			}
+			return oldReady, newReady, nil
 		}
 		var buffer bytes.Buffer
-		acceptor := &testAcceptor{
-			accept: func(rc *api.ReplicationController) error {
-				if test.accepted {
-					return nil
-				}
-				return fmt.Errorf("rejecting controller %s", rc.Name)
-			},
-		}
 		config := &RollingUpdaterConfig{
 			Out: &buffer,
 			OldRc: test.oldRc,
@@ -512,15 +633,12 @@ Update succeeded. Deleting foo-v1
 			Interval: time.Millisecond,
 			Timeout: time.Millisecond,
 			CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
-			UpdateAcceptor: acceptor,
-			UpdatePercent: test.percent,
+			MaxUnavailable: test.maxUnavail,
+			MaxSurge: test.maxSurge,
 		}
 		err := updater.Update(config)
-		if test.accepted && err != nil {
-			t.Errorf("Update failed: %v", err)
-		}
-		if !test.accepted && err == nil {
-			t.Errorf("Expected update to fail")
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
 		}
 		if buffer.String() != test.output {
 			t.Errorf("Bad output. expected:\n%s\ngot:\n%s", test.output, buffer.String())
@@ -528,166 +646,181 @@ Update succeeded. Deleting foo-v1
 	}
 }
-func PTestUpdateRecovery(t *testing.T) {
-	// Test recovery from interruption
-	rc := oldRc(2)
-	rcExisting := newRc(1, 3)
-
-	output := `Continuing update with existing controller foo-v2.
-Scaling up foo-v2 from 1 to 3, scaling down foo-v1 from 2 to 0 (scale up first by 1 each interval)
-Scaling foo-v2 to 2
-Scaling foo-v1 to 1
-Scaling foo-v2 to 3
-Scaling foo-v2 to 0
-Update succeeded. Deleting foo-v1
-`
-	responses := []fakeResponse{
-		// Existing newRc
-		{rcExisting, nil},
-		// scaling iteration
-		{newRc(2, 2), nil},
-		{oldRc(1), nil},
-		// scaling iteration
-		{newRc(3, 3), nil},
-		{oldRc(0), nil},
-		// cleanup annotations
-		{newRc(3, 3), nil},
-		{newRc(3, 3), nil},
-		{newRc(3, 3), nil},
-	}
-
-	client := NewRollingUpdaterClient(fakeClientFor("default", responses))
-	updater := RollingUpdater{
-		c: client,
+// TestUpdate_progressTimeout ensures that an update which isn't making any
+// progress will eventually time out with a specified error.
+func TestUpdate_progressTimeout(t *testing.T) {
+	oldRc := oldRc(2, 2)
+	newRc := newRc(0, 2)
+	updater := &RollingUpdater{
 		ns: "default",
 		scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
-			return client.GetReplicationController(rc.Namespace, rc.Name)
+			// Do nothing.
+			return rc, nil
+		},
+		getOrCreateTargetController: func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
+			return newRc, false, nil
+		},
+		cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
+			return nil
 		},
 	}
-
+	updater.waitForReadyPods = func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) {
+		// Coerce a timeout by pods never becoming ready.
+		return 0, 0, nil
+	}
 	var buffer bytes.Buffer
 	config := &RollingUpdaterConfig{
 		Out: &buffer,
-		OldRc: rc,
-		NewRc: rcExisting,
+		OldRc: oldRc,
+		NewRc: newRc,
 		UpdatePeriod: 0,
 		Interval: time.Millisecond,
 		Timeout: time.Millisecond,
 		CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
-		UpdateAcceptor: DefaultUpdateAcceptor,
+		MaxUnavailable: util.NewIntOrStringFromInt(0),
+		MaxSurge: util.NewIntOrStringFromInt(1),
	}
-	if err := updater.Update(config); err != nil {
-		t.Errorf("Update failed: %v", err)
+	err := updater.Update(config)
+	if err == nil {
+		t.Fatalf("expected an error")
 	}
-	if buffer.String() != output {
-		t.Errorf("Output was not as expected. Expected:\n%s\nGot:\n%s", output, buffer.String())
+	if e, a := "timed out waiting for any update progress to be made", err.Error(); e != a {
+		t.Fatalf("expected error message: %s, got: %s", e, a)
 	}
 }
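The "keep N pods available, don't exceed M pods" figures in the expected outputs above fall out of MaxUnavailable and MaxSurge: ready pods may never drop below the desired count minus MaxUnavailable, and the two controllers together may never run more than the desired count plus MaxSurge. As a rough, self-contained illustration of that arithmetic only (the helper name is invented here and this is not the updater's code; percentages are rounded up, which is what the 1-replica cases above imply):

    package main

    import (
    	"fmt"
    	"strconv"
    	"strings"
    )

    // resolve turns an absolute value or a "30%"-style percentage of the
    // desired replica count into a pod count, rounding percentages up
    // (10% of 1 resolves to 1, matching the 1->1 cases above).
    func resolve(v string, desired int) int {
    	if strings.HasSuffix(v, "%") {
    		p, _ := strconv.Atoi(strings.TrimSuffix(v, "%"))
    		return (desired*p + 99) / 100
    	}
    	n, _ := strconv.Atoi(v)
    	return n
    }

    func main() {
    	desired := 10
    	minAvailable := desired - resolve("0%", desired) // MaxUnavailable=0% -> keep 10 pods available
    	maxTotal := desired + resolve("30%", desired)    // MaxSurge=30%      -> don't exceed 13 pods
    	fmt.Println(minAvailable, maxTotal)              // 10 13
    }

With 10 desired replicas and 0%/30%, this reproduces the "keep 10 pods available, don't exceed 13 pods" line printed by the first test case in the table above.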
-// TestRollingUpdater_preserveCleanup ensures that the old controller isn't
-// deleted following a successful deployment.
-func TestRollingUpdater_preserveCleanup(t *testing.T) {
-	rc := oldRc(2)
-	rcExisting := newRc(1, 3)
-
-	client := &rollingUpdaterClientImpl{
-		GetReplicationControllerFn: func(namespace, name string) (*api.ReplicationController, error) {
-			switch name {
-			case rc.Name:
-				return rc, nil
-			case rcExisting.Name:
-				return rcExisting, nil
-			default:
-				return nil, fmt.Errorf("unexpected get call for %s/%s", namespace, name)
-			}
-		},
-		UpdateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
-			return rc, nil
-		},
-		CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
-			t.Fatalf("unexpected call to create %s/rc:%#v", namespace, rc)
-			return nil, nil
-		},
-		DeleteReplicationControllerFn: func(namespace, name string) error {
-			t.Fatalf("unexpected call to delete %s/%s", namespace, name)
-			return nil
-		},
-		ControllerHasDesiredReplicasFn: func(rc *api.ReplicationController) wait.ConditionFunc {
-			return func() (done bool, err error) {
-				return true, nil
-			}
-		},
+func TestUpdate_assignOriginalAnnotation(t *testing.T) {
+	oldRc := oldRc(1, 1)
+	delete(oldRc.Annotations, originalReplicasAnnotation)
+	newRc := newRc(1, 1)
+	var updatedOldRc *api.ReplicationController
+	fake := &testclient.Fake{}
+	fake.ReactFn = func(action testclient.Action) (runtime.Object, error) {
+		switch a := action.(type) {
+		case testclient.GetAction:
+			return oldRc, nil
+		case testclient.UpdateAction:
+			updatedOldRc = a.GetObject().(*api.ReplicationController)
+			return updatedOldRc, nil
+		}
+		return nil, nil
 	}
 	updater := &RollingUpdater{
-		ns: "default",
-		c: client,
-		scaleAndWait: scalerScaleAndWait(client, "default"),
+		c: fake,
+		ns: "default",
+		scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
+			return rc, nil
+		},
+		getOrCreateTargetController: func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
+			return newRc, false, nil
+		},
+		cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
+			return nil
+		},
+		waitForReadyPods: func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) {
+			return 1, 1, nil
+		},
 	}
-
+	var buffer bytes.Buffer
 	config := &RollingUpdaterConfig{
-		Out: ioutil.Discard,
-		OldRc: rc,
-		NewRc: rcExisting,
+		Out: &buffer,
+		OldRc: oldRc,
+		NewRc: newRc,
 		UpdatePeriod: 0,
 		Interval: time.Millisecond,
 		Timeout: time.Millisecond,
-		CleanupPolicy: PreserveRollingUpdateCleanupPolicy,
-		UpdateAcceptor: DefaultUpdateAcceptor,
+		CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
+		MaxUnavailable: util.NewIntOrStringFromString("100%"),
 	}
 	err := updater.Update(config)
 	if err != nil {
-		t.Errorf("unexpected error: %v", err)
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if updatedOldRc == nil {
+		t.Fatalf("expected rc to be updated")
+	}
+	if e, a := "1", updatedOldRc.Annotations[originalReplicasAnnotation]; e != a {
+		t.Fatalf("expected annotation value %s, got %s", e, a)
 	}
 }
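TestUpdate_assignOriginalAnnotation above deletes the original-replicas annotation and then expects the updated old controller to carry it with value "1", i.e. the updater records the old controller's starting size under originalReplicasAnnotation before scaling begins, so an interrupted update or a rollback can still recover the original replica count. A minimal sketch of that bookkeeping on a plain map, assuming the annotation key shown in the comment (not the patch's implementation):

    package main

    import (
    	"fmt"
    	"strconv"
    )

    // Assumed key for this sketch only.
    const originalReplicasAnnotation = "kubectl.kubernetes.io/original-replicas"

    // recordOriginal stores the starting replica count once; later retries
    // or rollbacks can read the original size back out of the annotation.
    func recordOriginal(annotations map[string]string, replicas int) {
    	if _, ok := annotations[originalReplicasAnnotation]; !ok {
    		annotations[originalReplicasAnnotation] = strconv.Itoa(replicas)
    	}
    }

    func main() {
    	ann := map[string]string{}
    	recordOriginal(ann, 1)
    	fmt.Println(ann[originalReplicasAnnotation]) // "1", the value the test expects
    }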
-func TestRename(t *testing.T) {
+// TestRollingUpdater_cleanupWithClients ensures that the cleanup policy is
+// correctly implemented.
+func TestRollingUpdater_cleanupWithClients(t *testing.T) {
+	rc := oldRc(2, 2)
+	rcExisting := newRc(1, 3)
+
 	tests := []struct {
-		namespace string
-		newName string
-		oldName string
-		err error
-		expectError bool
+		name string
+		policy RollingUpdaterCleanupPolicy
+		responses []runtime.Object
+		expected []string
 	}{
 		{
-			namespace: "default",
-			newName: "bar",
-			oldName: "foo",
+			name: "preserve",
+			policy: PreserveRollingUpdateCleanupPolicy,
+			responses: []runtime.Object{rcExisting},
+			expected: []string{
+				"get",
+				"update",
+				"get",
+				"get",
+			},
 		},
 		{
-			namespace: "default",
-			newName: "bar",
-			oldName: "foo",
-			err: fmt.Errorf("Test Error"),
-			expectError: true,
+			name: "delete",
+			policy: DeleteRollingUpdateCleanupPolicy,
+			responses: []runtime.Object{rcExisting},
+			expected: []string{
+				"get",
+				"update",
+				"get",
+				"get",
+				"delete",
+			},
+		},
+		{
+			name: "rename",
+			policy: RenameRollingUpdateCleanupPolicy,
+			responses: []runtime.Object{rcExisting},
+			expected: []string{
+				"get",
+				"update",
+				"get",
+				"get",
+				"delete",
+				"create",
+				"delete",
+			},
 		},
 	}
+
 	for _, test := range tests {
-		fakeClient := &rollingUpdaterClientImpl{
-			CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
-				if namespace != test.namespace {
-					t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace)
-				}
-				if rc.Name != test.newName {
-					t.Errorf("unexepected name: %s, expected %s", rc.Name, test.newName)
-				}
-				return rc, test.err
-			},
-			DeleteReplicationControllerFn: func(namespace, name string) error {
-				if namespace != test.namespace {
-					t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace)
-				}
-				if name != test.oldName {
-					t.Errorf("unexepected name: %s, expected %s", name, test.oldName)
-				}
-				return nil
-			},
+		fake := testclient.NewSimpleFake(test.responses...)
+		updater := &RollingUpdater{
+			ns: "default",
+			c: fake,
 		}
-		err := Rename(fakeClient, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: test.namespace, Name: test.oldName}}, test.newName)
-		if err != nil && !test.expectError {
+		config := &RollingUpdaterConfig{
+			Out: ioutil.Discard,
+			OldRc: rc,
+			NewRc: rcExisting,
+			UpdatePeriod: 0,
+			Interval: time.Millisecond,
+			Timeout: time.Millisecond,
+			CleanupPolicy: test.policy,
+		}
+		err := updater.cleanupWithClients(rc, rcExisting, config)
+		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
-		if err == nil && test.expectError {
-			t.Errorf("unexpected non-error")
+		if len(fake.Actions()) != len(test.expected) {
+			t.Fatalf("%s: unexpected actions: %v, expected %v", test.name, fake.Actions, test.expected)
+		}
+		for j, action := range fake.Actions() {
+			if e, a := test.expected[j], action.GetVerb(); e != a {
+				t.Errorf("%s: unexpected action: expected %s, got %s", test.name, e, a)
+			}
 		}
 	}
 }
@@ -764,12 +897,8 @@ func TestFindSourceController(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		fakeClient := rollingUpdaterClientImpl{
-			ListReplicationControllersFn: func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
-				return test.list, test.err
-			},
-		}
-		ctrl, err := FindSourceController(&fakeClient, "default", test.name)
+		fakeClient := testclient.NewSimpleFake(test.list)
+		ctrl, err := FindSourceController(fakeClient, "default", test.name)
 		if test.expectError && err == nil {
 			t.Errorf("unexpected non-error")
 		}
@@ -864,7 +993,7 @@ func TestUpdateExistingReplicationController(t *testing.T) {
 	}
 	for _, test := range tests {
 		buffer := &bytes.Buffer{}
-		fakeClient := fakeClientFor("default", []fakeResponse{})
+		fakeClient := testclient.NewSimpleFake(test.expectedRc)
 		rc, err := UpdateExistingReplicationController(fakeClient, test.rc, "default", test.name, test.deploymentKey, test.deploymentValue, buffer)
 		if !reflect.DeepEqual(rc, test.expectedRc) {
 			t.Errorf("expected:\n%#v\ngot:\n%#v\n", test.expectedRc, rc)
@@ -1063,44 +1192,181 @@ func TestAddDeploymentHash(t *testing.T) {
 	}
 }
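The TestRollingUpdater_pollForReadyPods test that follows builds fake pods whose PodReady condition is True or False and expects the updater to report, per controller, only the pods that are actually ready. As a self-contained sketch of that counting rule, reduced to booleans (not the pollForReadyPods implementation, which lists pods by selector through the client):

    package main

    import "fmt"

    // countReady returns how many pods report a true Ready condition; only
    // these count toward availability during the rolling update.
    func countReady(readyConditions []bool) int {
    	ready := 0
    	for _, isReady := range readyConditions {
    		if isReady {
    			ready++
    		}
    	}
    	return ready
    }

    func main() {
    	// Mirrors the first table entry below: 4 ready old pods, 2 of 4 new pods ready.
    	fmt.Println(countReady([]bool{true, true, true, true}))   // 4
    	fmt.Println(countReady([]bool{true, false, true, false})) // 2
    }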
-// rollingUpdaterClientImpl is a dynamic RollingUpdaterClient.
-type rollingUpdaterClientImpl struct {
-	ListReplicationControllersFn func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error)
-	GetReplicationControllerFn func(namespace, name string) (*api.ReplicationController, error)
-	UpdateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
-	CreateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
-	DeleteReplicationControllerFn func(namespace, name string) error
-	ControllerHasDesiredReplicasFn func(rc *api.ReplicationController) wait.ConditionFunc
+func TestRollingUpdater_pollForReadyPods(t *testing.T) {
+	mkpod := func(owner *api.ReplicationController, ready bool) *api.Pod {
+		labels := map[string]string{}
+		for k, v := range owner.Spec.Selector {
+			labels[k] = v
+		}
+		status := api.ConditionTrue
+		if !ready {
+			status = api.ConditionFalse
+		}
+		return &api.Pod{
+			ObjectMeta: api.ObjectMeta{
+				Name: "pod",
+				Labels: labels,
+			},
+			Status: api.PodStatus{
+				Conditions: []api.PodCondition{
+					{
+						Type: api.PodReady,
+						Status: status,
+					},
+				},
+			},
+		}
+	}
+
+	tests := []struct {
+		oldRc *api.ReplicationController
+		newRc *api.ReplicationController
+		// expected old/new ready counts
+		oldReady int
+		newReady int
+		// pods owned by the rcs; indicate whether they're ready
+		oldPods []bool
+		newPods []bool
+	}{
+		{
+			oldRc: oldRc(4, 4),
+			newRc: newRc(4, 4),
+			oldReady: 4,
+			newReady: 2,
+			oldPods: []bool{
+				true,
+				true,
+				true,
+				true,
+			},
+			newPods: []bool{
+				true,
+				false,
+				true,
+				false,
+			},
+		},
+		{
+			oldRc: oldRc(4, 4),
+			newRc: newRc(4, 4),
+			oldReady: 0,
+			newReady: 1,
+			oldPods: []bool{
+				false,
+			},
+			newPods: []bool{
+				true,
+			},
+		},
+		{
+			oldRc: oldRc(4, 4),
+			newRc: newRc(4, 4),
+			oldReady: 1,
+			newReady: 0,
+			oldPods: []bool{
+				true,
+			},
+			newPods: []bool{
+				false,
+			},
+		},
+	}
+
+	for i, test := range tests {
+		t.Logf("evaluating test %d", i)
+		// Populate the fake client with pods associated with their owners.
+		pods := []runtime.Object{}
+		for _, ready := range test.oldPods {
+			pods = append(pods, mkpod(test.oldRc, ready))
+		}
+		for _, ready := range test.newPods {
+			pods = append(pods, mkpod(test.newRc, ready))
+		}
+		client := testclient.NewSimpleFake(pods...)
+
+		updater := &RollingUpdater{
+			ns: "default",
+			c: client,
+		}
+		oldReady, newReady, err := updater.pollForReadyPods(time.Millisecond, time.Second, test.oldRc, test.newRc)
+		if err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if e, a := test.oldReady, oldReady; e != a {
+			t.Errorf("expected old ready %d, got %d", e, a)
+		}
+		if e, a := test.newReady, newReady; e != a {
+			t.Errorf("expected new ready %d, got %d", e, a)
+		}
+	}
 }
-func (c *rollingUpdaterClientImpl) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
-	return c.ListReplicationControllersFn(namespace, selector)
-}
+func TestRollingUpdater_extractMaxValue(t *testing.T) {
+	tests := []struct {
+		field util.IntOrString
+		original int
+		expected int
+		valid bool
+	}{
+		{
+			field: util.NewIntOrStringFromInt(1),
+			original: 100,
+			expected: 1,
+			valid: true,
+		},
+		{
+			field: util.NewIntOrStringFromInt(0),
+			original: 100,
+			expected: 0,
+			valid: true,
+		},
+		{
+			field: util.NewIntOrStringFromInt(-1),
+			original: 100,
+			valid: false,
+		},
+		{
+			field: util.NewIntOrStringFromString("10%"),
+			original: 100,
+			expected: 10,
+			valid: true,
+		},
+		{
+			field: util.NewIntOrStringFromString("100%"),
+			original: 100,
			expected: 100,
+			valid: true,
+		},
+		{
+			field: util.NewIntOrStringFromString("200%"),
+			original: 100,
+			expected: 200,
+			valid: true,
+		},
+		{
+			field: util.NewIntOrStringFromString("0%"),
+			original: 100,
+			expected: 0,
+			valid: true,
+		},
+		{
+			field: util.NewIntOrStringFromString("-1%"),
+			original: 100,
+			valid: false,
+		},
+	}
-
-func (c *rollingUpdaterClientImpl) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
-	return c.GetReplicationControllerFn(namespace, name)
-}
-
-func (c *rollingUpdaterClientImpl) UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
-	return c.UpdateReplicationControllerFn(namespace, rc)
-}
-
-func (c *rollingUpdaterClientImpl) CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
-	return c.CreateReplicationControllerFn(namespace, rc)
-}
-
-func (c *rollingUpdaterClientImpl) DeleteReplicationController(namespace, name string) error {
-	return c.DeleteReplicationControllerFn(namespace, name)
-}
-
-func (c *rollingUpdaterClientImpl) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc {
-	return c.ControllerHasDesiredReplicasFn(rc)
-}
-
-type testAcceptor struct {
-	accept func(*api.ReplicationController) error
-}
-
-func (a *testAcceptor) Accept(rc *api.ReplicationController) error {
-	return a.accept(rc)
+	for i, test := range tests {
+		t.Logf("evaluating test %d", i)
+		max, err := extractMaxValue(test.field, "field", test.original)
+		if test.valid && err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if !test.valid && err == nil {
+			t.Fatalf("expected an error")
+		}
+		if e, a := test.expected, max; e != a {
+			t.Fatalf("expected max %d, got %d", e, a)
+		}
+	}
 }
diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go
index 48d96c8bdd0..82c649e7d3b 100644
--- a/test/e2e/kubectl.go
+++ b/test/e2e/kubectl.go
@@ -90,7 +90,6 @@ var _ = Describe("Kubectl client", func() {
 		nautilusPath = filepath.Join(updateDemoRoot, "nautilus-rc.yaml")
 		kittenPath = filepath.Join(updateDemoRoot, "kitten-rc.yaml")
 	})
-
 	It("should create and stop a replication controller", func() {
 		defer cleanup(nautilusPath, ns, updateDemoSelector)