mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #11942 from ironcladlou/rolling-update-availability
Auto commit by PR queue bot
This commit is contained in:
commit
0a062c5b24
@ -680,12 +680,6 @@ __EOF__
|
|||||||
kubectl delete pod valid-pod "${kube_flags[@]}"
|
kubectl delete pod valid-pod "${kube_flags[@]}"
|
||||||
kubectl delete service frontend{,-2,-3,-4,-5} "${kube_flags[@]}"
|
kubectl delete service frontend{,-2,-3,-4,-5} "${kube_flags[@]}"
|
||||||
|
|
||||||
### Perform a rolling update with --image
|
|
||||||
# Command
|
|
||||||
kubectl rolling-update frontend --image=kubernetes/pause --update-period=10ns --poll-interval=10ms "${kube_flags[@]}"
|
|
||||||
# Post-condition: current image IS kubernetes/pause
|
|
||||||
kube::test::get_object_assert 'rc frontend' '{{range \$c:=$rc_container_image_field}} {{\$c.image}} {{end}}' ' +kubernetes/pause +'
|
|
||||||
|
|
||||||
### Delete replication controller with id
|
### Delete replication controller with id
|
||||||
# Pre-condition: frontend replication controller is running
|
# Pre-condition: frontend replication controller is running
|
||||||
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend:'
|
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend:'
|
||||||
|
@ -44,8 +44,13 @@ func (c *FakePods) List(label labels.Selector, field fields.Selector) (*api.PodL
|
|||||||
if obj == nil {
|
if obj == nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
list := &api.PodList{}
|
||||||
return obj.(*api.PodList), err
|
for _, pod := range obj.(*api.PodList).Items {
|
||||||
|
if label.Matches(labels.Set(pod.Labels)) {
|
||||||
|
list.Items = append(list.Items, pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FakePods) Create(pod *api.Pod) (*api.Pod, error) {
|
func (c *FakePods) Create(pod *api.Pod) (*api.Pod, error) {
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
|
"k8s.io/kubernetes/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RollingUpdateOptions is the start of the data required to perform the operation. As new fields are added, add them here instead of
|
// RollingUpdateOptions is the start of the data required to perform the operation. As new fields are added, add them here instead of
|
||||||
@ -148,8 +149,6 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
updaterClient := kubectl.NewRollingUpdaterClient(client)
|
|
||||||
|
|
||||||
var newRc *api.ReplicationController
|
var newRc *api.ReplicationController
|
||||||
// fetch rc
|
// fetch rc
|
||||||
oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName)
|
oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName)
|
||||||
@ -158,11 +157,11 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// We're in the middle of a rename, look for an RC with a source annotation of oldName
|
// We're in the middle of a rename, look for an RC with a source annotation of oldName
|
||||||
newRc, err := kubectl.FindSourceController(updaterClient, cmdNamespace, oldName)
|
newRc, err := kubectl.FindSourceController(client, cmdNamespace, oldName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return kubectl.Rename(kubectl.NewRollingUpdaterClient(client), newRc, oldName)
|
return kubectl.Rename(client, newRc, oldName)
|
||||||
}
|
}
|
||||||
|
|
||||||
var keepOldName bool
|
var keepOldName bool
|
||||||
@ -242,7 +241,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||||||
filename, oldName)
|
filename, oldName)
|
||||||
}
|
}
|
||||||
|
|
||||||
updater := kubectl.NewRollingUpdater(newRc.Namespace, updaterClient)
|
updater := kubectl.NewRollingUpdater(newRc.Namespace, client)
|
||||||
|
|
||||||
// To successfully pull off a rolling update the new and old rc have to differ
|
// To successfully pull off a rolling update the new and old rc have to differ
|
||||||
// by at least one selector. Every new pod should have the selector and every
|
// by at least one selector. Every new pod should have the selector and every
|
||||||
@ -286,7 +285,8 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
|||||||
Interval: interval,
|
Interval: interval,
|
||||||
Timeout: timeout,
|
Timeout: timeout,
|
||||||
CleanupPolicy: updateCleanupPolicy,
|
CleanupPolicy: updateCleanupPolicy,
|
||||||
UpdateAcceptor: kubectl.DefaultUpdateAcceptor,
|
MaxUnavailable: util.NewIntOrStringFromInt(0),
|
||||||
|
MaxSurge: util.NewIntOrStringFromInt(1),
|
||||||
}
|
}
|
||||||
if cmdutil.GetFlagBool(cmd, "rollback") {
|
if cmdutil.GetFlagBool(cmd, "rollback") {
|
||||||
kubectl.AbortRollingUpdate(config)
|
kubectl.AbortRollingUpdate(config)
|
||||||
|
@ -30,23 +30,16 @@ import (
|
|||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
|
"k8s.io/kubernetes/pkg/util"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RollingUpdater provides methods for updating replicated pods in a predictable,
|
const (
|
||||||
// fault-tolerant way.
|
sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id"
|
||||||
type RollingUpdater struct {
|
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
|
||||||
// Client interface for creating and updating controllers
|
originalReplicasAnnotation = kubectlAnnotationPrefix + "original-replicas"
|
||||||
c RollingUpdaterClient
|
nextControllerAnnotation = kubectlAnnotationPrefix + "next-controller-id"
|
||||||
// Namespace for resources
|
)
|
||||||
ns string
|
|
||||||
// scaleAndWait scales a controller and returns its updated state.
|
|
||||||
scaleAndWait scaleAndWait
|
|
||||||
}
|
|
||||||
|
|
||||||
// scaleAndWait scales rc and returns its updated state. This typedef is to
|
|
||||||
// abstract away the use of a Scaler to ease testing.
|
|
||||||
type scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error)
|
|
||||||
|
|
||||||
// RollingUpdaterConfig is the configuration for a rolling deployment process.
|
// RollingUpdaterConfig is the configuration for a rolling deployment process.
|
||||||
type RollingUpdaterConfig struct {
|
type RollingUpdaterConfig struct {
|
||||||
@ -67,16 +60,26 @@ type RollingUpdaterConfig struct {
|
|||||||
// CleanupPolicy defines the cleanup action to take after the deployment is
|
// CleanupPolicy defines the cleanup action to take after the deployment is
|
||||||
// complete.
|
// complete.
|
||||||
CleanupPolicy RollingUpdaterCleanupPolicy
|
CleanupPolicy RollingUpdaterCleanupPolicy
|
||||||
// UpdateAcceptor is given a chance to accept the new controller after each
|
// The maximum number of pods that can be unavailable during the update.
|
||||||
// scale-up operation. If the controller is accepted, updates continue; if
|
// Value can be an absolute number (ex: 5) or a percentage of total pods at
|
||||||
// the controller is rejected, the update will fail immediately.
|
// the start of update (ex: 10%). Absolute number is calculated from
|
||||||
UpdateAcceptor UpdateAcceptor
|
// percentage by rounding up. This can not be 0 if MaxSurge is 0. By
|
||||||
// UpdatePercent is optional; if specified, the amount of replicas scaled up
|
// default, a fixed value of 1 is used. Example: when this is set to 30%,
|
||||||
// and down each interval will be computed as a percentage of the desired
|
// the old RC can be scaled down by 30% immediately when the rolling update
|
||||||
// replicas for the new RC. If UpdatePercent is nil, one replica will be
|
// starts. Once new pods are ready, old RC can be scaled down further,
|
||||||
// scaled up and down each interval. If UpdatePercent is negative, the order
|
// followed by scaling up the new RC, ensuring that at least 70% of original
|
||||||
// of scaling will be down/up instead of up/down.
|
// number of pods are available at all times during the update.
|
||||||
UpdatePercent *int
|
MaxUnavailable util.IntOrString
|
||||||
|
// The maximum number of pods that can be scheduled above the original
|
||||||
|
// number of pods. Value can be an absolute number (ex: 5) or a percentage of total
|
||||||
|
// pods at the start of the update (ex: 10%). This can not be 0 if
|
||||||
|
// MaxUnavailable is 0. Absolute number is calculated from percentage by
|
||||||
|
// rounding up. By default, a value of 1 is used. Example: when this is set
|
||||||
|
// to 30%, the new RC can be scaled up by 30% immediately when the rolling
|
||||||
|
// update starts. Once old pods have been killed, new RC can be scaled up
|
||||||
|
// further, ensuring that total number of pods running at any time during
|
||||||
|
// the update is atmost 130% of original pods.
|
||||||
|
MaxSurge util.IntOrString
|
||||||
}
|
}
|
||||||
|
|
||||||
// RollingUpdaterCleanupPolicy is a cleanup action to take after the
|
// RollingUpdaterCleanupPolicy is a cleanup action to take after the
|
||||||
@ -93,27 +96,421 @@ const (
|
|||||||
RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
|
RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
|
||||||
)
|
)
|
||||||
|
|
||||||
// UpdateAcceptor is given a chance to accept or reject the new controller
|
// RollingUpdater provides methods for updating replicated pods in a predictable,
|
||||||
// during a deployment each time the controller is scaled up.
|
// fault-tolerant way.
|
||||||
//
|
type RollingUpdater struct {
|
||||||
// After the successful scale-up of the controller, the controller is given to
|
// Client interface for creating and updating controllers
|
||||||
// the UpdateAcceptor. If the UpdateAcceptor rejects the controller, the
|
c client.Interface
|
||||||
// deployment is stopped with an error.
|
// Namespace for resources
|
||||||
type UpdateAcceptor interface {
|
ns string
|
||||||
// Accept returns nil if the controller is okay, otherwise returns an error.
|
// scaleAndWait scales a controller and returns its updated state.
|
||||||
Accept(*api.ReplicationController) error
|
scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error)
|
||||||
|
//getOrCreateTargetController gets and validates an existing controller or
|
||||||
|
//makes a new one.
|
||||||
|
getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error)
|
||||||
|
// cleanup performs post deployment cleanup tasks for newRc and oldRc.
|
||||||
|
cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error
|
||||||
|
// waitForReadyPods should block until there are >0 total pods ready amongst
|
||||||
|
// the old and new controllers, and should return the amount of old and new
|
||||||
|
// ready.
|
||||||
|
waitForReadyPods func(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AlwaysAccept is an UpdateAcceptor which always accepts the controller.
|
// NewRollingUpdater creates a RollingUpdater from a client.
|
||||||
type AlwaysAccept struct{}
|
func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdater {
|
||||||
|
updater := &RollingUpdater{
|
||||||
|
c: client,
|
||||||
|
ns: namespace,
|
||||||
|
}
|
||||||
|
// Inject real implementations.
|
||||||
|
updater.scaleAndWait = updater.scaleAndWaitWithScaler
|
||||||
|
updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
|
||||||
|
updater.waitForReadyPods = updater.pollForReadyPods
|
||||||
|
updater.cleanup = updater.cleanupWithClients
|
||||||
|
return updater
|
||||||
|
}
|
||||||
|
|
||||||
// Accept implements UpdateAcceptor.
|
// Update all pods for a ReplicationController (oldRc) by creating a new
|
||||||
func (a *AlwaysAccept) Accept(*api.ReplicationController) error { return nil }
|
// controller (newRc) with 0 replicas, and synchronously scaling oldRc and
|
||||||
|
// newRc until oldRc has 0 replicas and newRc has the original # of desired
|
||||||
|
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
|
||||||
|
//
|
||||||
|
// Each interval, the updater will attempt to make progress however it can
|
||||||
|
// without violating any availability constraints defined by the config. This
|
||||||
|
// means the amount scaled up or down each interval will vary based on the
|
||||||
|
// timeliness of readiness and the updater will always try to make progress,
|
||||||
|
// even slowly.
|
||||||
|
//
|
||||||
|
// If an update from newRc to oldRc is already in progress, we attempt to
|
||||||
|
// drive it to completion. If an error occurs at any step of the update, the
|
||||||
|
// error will be returned.
|
||||||
|
//
|
||||||
|
// A scaling event (either up or down) is considered progress; if no progress
|
||||||
|
// is made within the config.Timeout, an error is returned.
|
||||||
|
//
|
||||||
|
// TODO: make this handle performing a rollback of a partially completed
|
||||||
|
// rollout.
|
||||||
|
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
|
||||||
|
out := config.Out
|
||||||
|
oldRc := config.OldRc
|
||||||
|
scaleRetryParams := NewRetryParams(config.Interval, config.Timeout)
|
||||||
|
|
||||||
// DefaultUpdaterAcceptor always accepts controllers.
|
// Find an existing controller (for continuing an interrupted update) or
|
||||||
var DefaultUpdateAcceptor UpdateAcceptor = &AlwaysAccept{}
|
// create a new one if necessary.
|
||||||
|
sourceId := fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID)
|
||||||
|
newRc, existed, err := r.getOrCreateTargetController(config.NewRc, sourceId)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if existed {
|
||||||
|
fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newRc.Name)
|
||||||
|
} else {
|
||||||
|
fmt.Fprintf(out, "Created %s\n", newRc.Name)
|
||||||
|
}
|
||||||
|
// Extract the desired replica count from the controller.
|
||||||
|
desired, err := strconv.Atoi(newRc.Annotations[desiredReplicasAnnotation])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
|
||||||
|
newRc.Name, desiredReplicasAnnotation, newRc.Annotations[desiredReplicasAnnotation])
|
||||||
|
}
|
||||||
|
// Extract the original replica count from the old controller, adding the
|
||||||
|
// annotation if it doesn't yet exist.
|
||||||
|
_, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation]
|
||||||
|
if !hasOriginalAnnotation {
|
||||||
|
existing, err := r.c.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if existing.Annotations == nil {
|
||||||
|
existing.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(existing.Spec.Replicas)
|
||||||
|
updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
oldRc = updated
|
||||||
|
}
|
||||||
|
original, err := strconv.Atoi(oldRc.Annotations[originalReplicasAnnotation])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to parse annotation for %s: %s=%s\n",
|
||||||
|
oldRc.Name, originalReplicasAnnotation, oldRc.Annotations[originalReplicasAnnotation])
|
||||||
|
}
|
||||||
|
// The maximum pods which can go unavailable during the update.
|
||||||
|
maxUnavailable, err := extractMaxValue(config.MaxUnavailable, "maxUnavailable", original)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// The maximum scaling increment.
|
||||||
|
maxSurge, err := extractMaxValue(config.MaxSurge, "maxSurge", original)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Further validation.
|
||||||
|
if maxUnavailable == 0 && maxSurge == 0 {
|
||||||
|
return fmt.Errorf("one of maxSurge or maxUnavailable must be specified")
|
||||||
|
}
|
||||||
|
// The minumum pods which must remain available througout the update
|
||||||
|
// calculated for internal convenience.
|
||||||
|
minAvailable := original - maxUnavailable
|
||||||
|
|
||||||
func LoadExistingNextReplicationController(c *client.Client, namespace, newName string) (*api.ReplicationController, error) {
|
fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n",
|
||||||
|
newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, minAvailable, original+maxSurge)
|
||||||
|
|
||||||
|
// Scale newRc and oldRc until newRc has the desired number of replicas and
|
||||||
|
// oldRc has 0 replicas.
|
||||||
|
progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds()
|
||||||
|
for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 {
|
||||||
|
// Store the existing replica counts for progress timeout tracking.
|
||||||
|
newReplicas := newRc.Spec.Replicas
|
||||||
|
oldReplicas := oldRc.Spec.Replicas
|
||||||
|
|
||||||
|
// Scale up as much as possible.
|
||||||
|
scaledRc, err := r.scaleUp(newRc, oldRc, original, desired, maxSurge, maxUnavailable, scaleRetryParams, config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
newRc = scaledRc
|
||||||
|
|
||||||
|
// Wait between scaling operations for things to settle.
|
||||||
|
time.Sleep(config.UpdatePeriod)
|
||||||
|
|
||||||
|
// Scale down as much as possible.
|
||||||
|
scaledRc, err = r.scaleDown(newRc, oldRc, desired, minAvailable, maxUnavailable, maxSurge, config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
oldRc = scaledRc
|
||||||
|
|
||||||
|
// If we are making progress, continue to advance the progress deadline.
|
||||||
|
// Otherwise, time out with an error.
|
||||||
|
progressMade := (newRc.Spec.Replicas != newReplicas) || (oldRc.Spec.Replicas != oldReplicas)
|
||||||
|
if progressMade {
|
||||||
|
progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds()
|
||||||
|
} else if time.Now().UnixNano() > progressDeadline {
|
||||||
|
return fmt.Errorf("timed out waiting for any update progress to be made")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Housekeeping and cleanup policy execution.
|
||||||
|
return r.cleanup(oldRc, newRc, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// scaleUp scales up newRc to desired by whatever increment is possible given
|
||||||
|
// the configured surge threshold. scaleUp will safely no-op as necessary when
|
||||||
|
// it detects redundancy or other relevant conditions.
|
||||||
|
func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, original, desired, maxSurge, maxUnavailable int, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
|
||||||
|
// If we're already at the desired, do nothing.
|
||||||
|
if newRc.Spec.Replicas == desired {
|
||||||
|
return newRc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scale up as far as we can based on the surge limit.
|
||||||
|
increment := (original + maxSurge) - (oldRc.Spec.Replicas + newRc.Spec.Replicas)
|
||||||
|
// If the old is already scaled down, go ahead and scale all the way up.
|
||||||
|
if oldRc.Spec.Replicas == 0 {
|
||||||
|
increment = desired - newRc.Spec.Replicas
|
||||||
|
}
|
||||||
|
// We can't scale up without violating the surge limit, so do nothing.
|
||||||
|
if increment <= 0 {
|
||||||
|
return newRc, nil
|
||||||
|
}
|
||||||
|
// Increase the replica count, and deal with fenceposts.
|
||||||
|
newRc.Spec.Replicas += increment
|
||||||
|
if newRc.Spec.Replicas > desired {
|
||||||
|
newRc.Spec.Replicas = desired
|
||||||
|
}
|
||||||
|
// Perform the scale-up.
|
||||||
|
fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas)
|
||||||
|
scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return scaledRc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// scaleDown scales down oldRc to 0 at whatever increment possible given the
|
||||||
|
// thresholds defined on the config. scaleDown will safely no-op as necessary
|
||||||
|
// when it detects redundancy or other relevant conditions.
|
||||||
|
func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
|
||||||
|
// Already scaled down; do nothing.
|
||||||
|
if oldRc.Spec.Replicas == 0 {
|
||||||
|
return oldRc, nil
|
||||||
|
}
|
||||||
|
// Block until there are any pods ready.
|
||||||
|
oldAvailable, newAvailable, err := r.waitForReadyPods(config.Interval, config.Timeout, oldRc, newRc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// The old controller is considered as part of the total because we want to
|
||||||
|
// maintain minimum availability even with a volatile old controller.
|
||||||
|
// Scale down as much as possible while maintaining minimum availability.
|
||||||
|
decrement := (oldAvailable + newAvailable) - minAvailable
|
||||||
|
// The decrement normally shouldn't drop below 0 because the available count
|
||||||
|
// always starts below the old replica count, but the old replica count can
|
||||||
|
// decrement due to externalities like pods death in the replica set. This
|
||||||
|
// will be considered a transient condition; do nothing and try again later
|
||||||
|
// with new readiness values.
|
||||||
|
//
|
||||||
|
// If the most we can scale is 0, it means we can't scale down without
|
||||||
|
// violating the minimum. Do nothing and try again later when conditions may
|
||||||
|
// have changed.
|
||||||
|
if decrement <= 0 {
|
||||||
|
return oldRc, nil
|
||||||
|
}
|
||||||
|
// Reduce the replica count, and deal with fenceposts.
|
||||||
|
oldRc.Spec.Replicas -= decrement
|
||||||
|
if oldRc.Spec.Replicas < 0 {
|
||||||
|
oldRc.Spec.Replicas = 0
|
||||||
|
}
|
||||||
|
// If the new is already fully scaled and available up to the desired size, go
|
||||||
|
// ahead and scale old all the way down.
|
||||||
|
if newRc.Spec.Replicas == desired && newAvailable == desired {
|
||||||
|
oldRc.Spec.Replicas = 0
|
||||||
|
}
|
||||||
|
// Perform the scale-down.
|
||||||
|
fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas)
|
||||||
|
retryWait := &RetryParams{config.Interval, config.Timeout}
|
||||||
|
scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return scaledRc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// scalerScaleAndWait scales a controller using a Scaler and a real client.
|
||||||
|
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
|
||||||
|
scalerClient := NewScalerClient(r.c)
|
||||||
|
scaler, err := ScalerFor("ReplicationController", scalerClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Couldn't make scaler: %s", err)
|
||||||
|
}
|
||||||
|
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pollForReadyPods polls oldRc and newRc each interval and returns the old
|
||||||
|
// and new ready counts for their pods. If a pod is observed as being ready,
|
||||||
|
// it's considered ready even if it later becomes unready.
|
||||||
|
func (r *RollingUpdater) pollForReadyPods(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) {
|
||||||
|
controllers := []*api.ReplicationController{oldRc, newRc}
|
||||||
|
oldReady := 0
|
||||||
|
newReady := 0
|
||||||
|
err := wait.Poll(interval, timeout, func() (done bool, err error) {
|
||||||
|
anyReady := false
|
||||||
|
for _, controller := range controllers {
|
||||||
|
selector := labels.Set(controller.Spec.Selector).AsSelector()
|
||||||
|
pods, err := r.c.Pods(controller.Namespace).List(selector, fields.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if api.IsPodReady(&pod) {
|
||||||
|
switch controller.Name {
|
||||||
|
case oldRc.Name:
|
||||||
|
oldReady++
|
||||||
|
case newRc.Name:
|
||||||
|
newReady++
|
||||||
|
}
|
||||||
|
anyReady = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if anyReady {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
return oldReady, newReady, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOrCreateTargetControllerWithClient looks for an existing controller with
|
||||||
|
// sourceId. If found, the existing controller is returned with true
|
||||||
|
// indicating that the controller already exists. If the controller isn't
|
||||||
|
// found, a new one is created and returned along with false indicating the
|
||||||
|
// controller was created.
|
||||||
|
//
|
||||||
|
// Existing controllers are validated to ensure their sourceIdAnnotation
|
||||||
|
// matches sourceId; if there's a mismatch, an error is returned.
|
||||||
|
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
|
||||||
|
existing, err := r.c.ReplicationControllers(controller.Namespace).Get(controller.Name)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.IsNotFound(err) {
|
||||||
|
// There was an error trying to find the controller; don't assume we
|
||||||
|
// should create it.
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
if controller.Spec.Replicas <= 0 {
|
||||||
|
return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", controller.Name, controller.Spec)
|
||||||
|
}
|
||||||
|
// The controller wasn't found, so create it.
|
||||||
|
if controller.Annotations == nil {
|
||||||
|
controller.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", controller.Spec.Replicas)
|
||||||
|
controller.Annotations[sourceIdAnnotation] = sourceId
|
||||||
|
controller.Spec.Replicas = 0
|
||||||
|
newRc, err := r.c.ReplicationControllers(r.ns).Create(controller)
|
||||||
|
return newRc, false, err
|
||||||
|
}
|
||||||
|
// Validate and use the existing controller.
|
||||||
|
annotations := existing.Annotations
|
||||||
|
source := annotations[sourceIdAnnotation]
|
||||||
|
_, ok := annotations[desiredReplicasAnnotation]
|
||||||
|
if source != sourceId || !ok {
|
||||||
|
return nil, false, fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", controller.Name, sourceId, annotations)
|
||||||
|
}
|
||||||
|
return existing, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupWithClients performs cleanup tasks after the rolling update. Update
|
||||||
|
// process related annotations are removed from oldRc and newRc. The
|
||||||
|
// CleanupPolicy on config is executed.
|
||||||
|
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
|
||||||
|
// Clean up annotations
|
||||||
|
var err error
|
||||||
|
newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
delete(newRc.Annotations, sourceIdAnnotation)
|
||||||
|
delete(newRc.Annotations, desiredReplicasAnnotation)
|
||||||
|
|
||||||
|
newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
scalerClient := NewScalerClient(r.c)
|
||||||
|
if err = wait.Poll(config.Interval, config.Timeout, scalerClient.ControllerHasDesiredReplicas(newRc)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch config.CleanupPolicy {
|
||||||
|
case DeleteRollingUpdateCleanupPolicy:
|
||||||
|
// delete old rc
|
||||||
|
fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name)
|
||||||
|
return r.c.ReplicationControllers(r.ns).Delete(oldRc.Name)
|
||||||
|
case RenameRollingUpdateCleanupPolicy:
|
||||||
|
// delete old rc
|
||||||
|
fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name)
|
||||||
|
if err := r.c.ReplicationControllers(r.ns).Delete(oldRc.Name); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Fprintf(config.Out, "Renaming %s to %s\n", newRc.Name, oldRc.Name)
|
||||||
|
return Rename(r.c, newRc, oldRc.Name)
|
||||||
|
case PreserveRollingUpdateCleanupPolicy:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// func extractMaxValue is a helper to extract config max values as either
|
||||||
|
// absolute numbers or based on percentages of the original rc.
|
||||||
|
func extractMaxValue(field util.IntOrString, name string, original int) (int, error) {
|
||||||
|
switch field.Kind {
|
||||||
|
case util.IntstrInt:
|
||||||
|
if field.IntVal < 0 {
|
||||||
|
return 0, fmt.Errorf("%s must be >= 0", name)
|
||||||
|
}
|
||||||
|
return field.IntVal, nil
|
||||||
|
case util.IntstrString:
|
||||||
|
s := strings.Replace(field.StrVal, "%", "", -1)
|
||||||
|
v, err := strconv.Atoi(s)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("invalid %s value %q: %v", name, field.StrVal, err)
|
||||||
|
}
|
||||||
|
if v < 0 {
|
||||||
|
return 0, fmt.Errorf("%s must be >= 0", name)
|
||||||
|
}
|
||||||
|
return int(math.Ceil(float64(original) * (float64(v)) / 100)), nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("invalid kind %q for %s", field.Kind, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Rename(c client.ReplicationControllersNamespacer, rc *api.ReplicationController, newName string) error {
|
||||||
|
oldName := rc.Name
|
||||||
|
rc.Name = newName
|
||||||
|
rc.ResourceVersion = ""
|
||||||
|
|
||||||
|
_, err := c.ReplicationControllers(rc.Namespace).Create(rc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = c.ReplicationControllers(rc.Namespace).Delete(oldName)
|
||||||
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadExistingNextReplicationController(c client.ReplicationControllersNamespacer, namespace, newName string) (*api.ReplicationController, error) {
|
||||||
if len(newName) == 0 {
|
if len(newName) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -157,22 +554,6 @@ func CreateNewControllerFromCurrentController(c *client.Client, namespace, oldNa
|
|||||||
return newRc, nil
|
return newRc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRollingUpdater creates a RollingUpdater from a client
|
|
||||||
func NewRollingUpdater(namespace string, client RollingUpdaterClient) *RollingUpdater {
|
|
||||||
return &RollingUpdater{
|
|
||||||
c: client,
|
|
||||||
ns: namespace,
|
|
||||||
// Use a real scaleAndWait implementation.
|
|
||||||
scaleAndWait: scalerScaleAndWait(client, namespace),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id"
|
|
||||||
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
|
|
||||||
nextControllerAnnotation = kubectlAnnotationPrefix + "next-controller-id"
|
|
||||||
)
|
|
||||||
|
|
||||||
func AbortRollingUpdate(c *RollingUpdaterConfig) {
|
func AbortRollingUpdate(c *RollingUpdaterConfig) {
|
||||||
// Swap the controllers
|
// Swap the controllers
|
||||||
tmp := c.OldRc
|
tmp := c.OldRc
|
||||||
@ -214,21 +595,6 @@ func UpdateExistingReplicationController(c client.Interface, oldRc *api.Replicat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// scalerScaleAndWait returns a scaleAndWait function which scales a
|
|
||||||
// controller using a Scaler and a real client.
|
|
||||||
func scalerScaleAndWait(client RollingUpdaterClient, namespace string) scaleAndWait {
|
|
||||||
scaler, err := ScalerFor("ReplicationController", client)
|
|
||||||
return func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Couldn't make scaler: %s", err)
|
|
||||||
}
|
|
||||||
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return client.GetReplicationController(namespace, rc.ObjectMeta.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const MaxRetries = 3
|
const MaxRetries = 3
|
||||||
|
|
||||||
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
|
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
|
||||||
@ -335,8 +701,8 @@ func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.R
|
|||||||
return rc, err
|
return rc, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.ReplicationController, error) {
|
func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
|
||||||
list, err := r.ListReplicationControllers(namespace, labels.Everything())
|
list, err := r.ReplicationControllers(namespace).List(labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -348,272 +714,3 @@ func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.
|
|||||||
}
|
}
|
||||||
return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
|
return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update all pods for a ReplicationController (oldRc) by creating a new
|
|
||||||
// controller (newRc) with 0 replicas, and synchronously scaling oldRc and
|
|
||||||
// newRc until oldRc has 0 replicas and newRc has the original # of desired
|
|
||||||
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
|
|
||||||
//
|
|
||||||
// The scaling amount each interval is either 1 or based on a percent of the
|
|
||||||
// desired replicas. If a percentage is used and the percentage is negative,
|
|
||||||
// the scaling order is inverted to down/up instead of the default up/down.
|
|
||||||
//
|
|
||||||
// If an update from newRc to oldRc is already in progress, we attempt to
|
|
||||||
// drive it to completion. If an error occurs at any step of the update, the
|
|
||||||
// error will be returned.
|
|
||||||
//
|
|
||||||
// TODO: make this handle performing a rollback of a partially completed
|
|
||||||
// rollout.
|
|
||||||
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
|
|
||||||
out := config.Out
|
|
||||||
oldRc := config.OldRc
|
|
||||||
newRc := config.NewRc
|
|
||||||
updatePeriod := config.UpdatePeriod
|
|
||||||
interval := config.Interval
|
|
||||||
timeout := config.Timeout
|
|
||||||
|
|
||||||
oldName := oldRc.ObjectMeta.Name
|
|
||||||
newName := newRc.ObjectMeta.Name
|
|
||||||
retry := &RetryParams{interval, timeout}
|
|
||||||
waitForReplicas := &RetryParams{interval, timeout}
|
|
||||||
if newRc.Spec.Replicas <= 0 {
|
|
||||||
return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", newName, newRc.Spec.Replicas)
|
|
||||||
}
|
|
||||||
desired := newRc.Spec.Replicas
|
|
||||||
sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)
|
|
||||||
|
|
||||||
// look for existing newRc, incase this update was previously started but interrupted
|
|
||||||
rc, existing, err := r.getExistingNewRc(sourceId, newName)
|
|
||||||
if existing {
|
|
||||||
fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]
|
|
||||||
desired, err = strconv.Atoi(replicas)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
|
|
||||||
newName, desiredReplicasAnnotation, replicas)
|
|
||||||
}
|
|
||||||
newRc = rc
|
|
||||||
} else {
|
|
||||||
fmt.Fprintf(out, "Creating %s\n", newName)
|
|
||||||
if newRc.ObjectMeta.Annotations == nil {
|
|
||||||
newRc.ObjectMeta.Annotations = map[string]string{}
|
|
||||||
}
|
|
||||||
newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired)
|
|
||||||
newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId
|
|
||||||
newRc.Spec.Replicas = 0
|
|
||||||
newRc, err = r.c.CreateReplicationController(r.ns, newRc)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compute the scale amount based on a percentage of the new desired count.
|
|
||||||
// A negative percentage indicates our scale direction should be down-first.
|
|
||||||
scaleAmount := 1
|
|
||||||
skipFirstUp := false
|
|
||||||
if config.UpdatePercent != nil {
|
|
||||||
scaleAmount = int(math.Ceil(float64(desired) * (math.Abs(float64(*config.UpdatePercent)) / 100)))
|
|
||||||
if *config.UpdatePercent < 0 {
|
|
||||||
skipFirstUp = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Helpful output about what we're about to do.
|
|
||||||
direction := "up"
|
|
||||||
if skipFirstUp {
|
|
||||||
direction = "down"
|
|
||||||
}
|
|
||||||
fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (scale %s first by %d each interval)\n",
|
|
||||||
newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, direction, scaleAmount)
|
|
||||||
|
|
||||||
// Scale newRc and oldRc until newRc has the desired number of replicas and
|
|
||||||
// oldRc has 0 replicas.
|
|
||||||
for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 {
|
|
||||||
// Choose up/down vs. down/up scaling direction.
|
|
||||||
if !skipFirstUp {
|
|
||||||
scaledRc, err := r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
newRc = scaledRc
|
|
||||||
time.Sleep(updatePeriod)
|
|
||||||
skipFirstUp = true
|
|
||||||
}
|
|
||||||
scaledRc, err := r.scaleDown(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rc = scaledRc
|
|
||||||
time.Sleep(updatePeriod)
|
|
||||||
scaledRc, err = r.scaleUp(newRc, oldRc, desired, scaleAmount, retry, waitForReplicas, out, config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
newRc = scaledRc
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clean up annotations
|
|
||||||
if newRc, err = r.c.GetReplicationController(r.ns, newName); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
|
|
||||||
delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
|
|
||||||
newRc, err = r.updateAndWait(newRc, interval, timeout)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch config.CleanupPolicy {
|
|
||||||
case DeleteRollingUpdateCleanupPolicy:
|
|
||||||
// delete old rc
|
|
||||||
fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
|
|
||||||
return r.c.DeleteReplicationController(r.ns, oldName)
|
|
||||||
case RenameRollingUpdateCleanupPolicy:
|
|
||||||
// delete old rc
|
|
||||||
fmt.Fprintf(out, "Update succeeded. Deleting old controller: %s\n", oldName)
|
|
||||||
if err := r.c.DeleteReplicationController(r.ns, oldName); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fmt.Fprintf(out, "Renaming %s to %s\n", newRc.Name, oldName)
|
|
||||||
return r.rename(newRc, oldName)
|
|
||||||
case PreserveRollingUpdateCleanupPolicy:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// scaleUp scales up newRc to desired by scaleAmount. It accounts for
|
|
||||||
// fencepost conditions. If newRc is already scaled to desired, scaleUp does
|
|
||||||
// nothing. If the oldRc is already scaled to 0, newRc is scaled to desired
|
|
||||||
// immediately regardless of scale count.
|
|
||||||
func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
|
|
||||||
if newRc.Spec.Replicas == desired {
|
|
||||||
return newRc, nil
|
|
||||||
}
|
|
||||||
newRc.Spec.Replicas += scaleAmount
|
|
||||||
if newRc.Spec.Replicas > desired || oldRc.Spec.Replicas == 0 {
|
|
||||||
newRc.Spec.Replicas = desired
|
|
||||||
}
|
|
||||||
fmt.Fprintf(out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas)
|
|
||||||
scaledRc, err := r.scaleAndWait(newRc, retry, wait)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = config.UpdateAcceptor.Accept(scaledRc)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("update rejected for %s: %v", scaledRc.Name, err)
|
|
||||||
}
|
|
||||||
return scaledRc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// scaleDown scales down oldRc to 0 by scaleAmount. It accounts for fencepost
|
|
||||||
// conditions. If oldRc is already scaled to 0, scaleDown does nothing. If
|
|
||||||
// newRc is already scaled to desired, oldRc is scaled to 0 immediately
|
|
||||||
// regardless of scaleAmount.
|
|
||||||
func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, scaleAmount int, retry, wait *RetryParams, out io.Writer, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
|
|
||||||
if oldRc.Spec.Replicas == 0 {
|
|
||||||
return oldRc, nil
|
|
||||||
}
|
|
||||||
oldRc.Spec.Replicas -= scaleAmount
|
|
||||||
if oldRc.Spec.Replicas < 0 || newRc.Spec.Replicas == desired {
|
|
||||||
oldRc.Spec.Replicas = 0
|
|
||||||
}
|
|
||||||
fmt.Fprintf(out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas)
|
|
||||||
scaledRc, err := r.scaleAndWait(oldRc, retry, wait)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return scaledRc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) {
|
|
||||||
if rc, err = r.c.GetReplicationController(r.ns, name); err == nil {
|
|
||||||
existing = true
|
|
||||||
annotations := rc.ObjectMeta.Annotations
|
|
||||||
source := annotations[sourceIdAnnotation]
|
|
||||||
_, ok := annotations[desiredReplicasAnnotation]
|
|
||||||
if source != sourceId || !ok {
|
|
||||||
err = fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", name, sourceId, annotations)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = nil
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) {
|
|
||||||
rc, err := r.c.UpdateReplicationController(r.ns, rc)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err = wait.Poll(interval, timeout, r.c.ControllerHasDesiredReplicas(rc)); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return r.c.GetReplicationController(r.ns, rc.ObjectMeta.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RollingUpdater) rename(rc *api.ReplicationController, newName string) error {
|
|
||||||
return Rename(r.c, rc, newName)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Rename(c RollingUpdaterClient, rc *api.ReplicationController, newName string) error {
|
|
||||||
oldName := rc.Name
|
|
||||||
rc.Name = newName
|
|
||||||
rc.ResourceVersion = ""
|
|
||||||
|
|
||||||
_, err := c.CreateReplicationController(rc.Namespace, rc)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = c.DeleteReplicationController(rc.Namespace, oldName)
|
|
||||||
if err != nil && !errors.IsNotFound(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RollingUpdaterClient abstracts access to ReplicationControllers.
|
|
||||||
type RollingUpdaterClient interface {
|
|
||||||
ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error)
|
|
||||||
GetReplicationController(namespace, name string) (*api.ReplicationController, error)
|
|
||||||
UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
|
|
||||||
CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
|
|
||||||
DeleteReplicationController(namespace, name string) error
|
|
||||||
ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRollingUpdaterClient(c client.Interface) RollingUpdaterClient {
|
|
||||||
return &realRollingUpdaterClient{c}
|
|
||||||
}
|
|
||||||
|
|
||||||
// realRollingUpdaterClient is a RollingUpdaterClient which uses a Kube client.
|
|
||||||
type realRollingUpdaterClient struct {
|
|
||||||
client client.Interface
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *realRollingUpdaterClient) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
|
|
||||||
return c.client.ReplicationControllers(namespace).List(selector)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *realRollingUpdaterClient) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
|
|
||||||
return c.client.ReplicationControllers(namespace).Get(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *realRollingUpdaterClient) UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
|
|
||||||
return c.client.ReplicationControllers(namespace).Update(rc)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *realRollingUpdaterClient) CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
|
|
||||||
return c.client.ReplicationControllers(namespace).Create(rc)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *realRollingUpdaterClient) DeleteReplicationController(namespace, name string) error {
|
|
||||||
return c.client.ReplicationControllers(namespace).Delete(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *realRollingUpdaterClient) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc {
|
|
||||||
return client.ControllerHasDesiredReplicas(c.client, rc)
|
|
||||||
}
|
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -90,7 +90,6 @@ var _ = Describe("Kubectl client", func() {
|
|||||||
nautilusPath = filepath.Join(updateDemoRoot, "nautilus-rc.yaml")
|
nautilusPath = filepath.Join(updateDemoRoot, "nautilus-rc.yaml")
|
||||||
kittenPath = filepath.Join(updateDemoRoot, "kitten-rc.yaml")
|
kittenPath = filepath.Join(updateDemoRoot, "kitten-rc.yaml")
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should create and stop a replication controller", func() {
|
It("should create and stop a replication controller", func() {
|
||||||
defer cleanup(nautilusPath, ns, updateDemoSelector)
|
defer cleanup(nautilusPath, ns, updateDemoSelector)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user