diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index 62dcdf6428e..cd913de61ac 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/util/wait" @@ -72,6 +73,17 @@ func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions. } } +func PetSetHasDesiredPets(c AppsInterface, petset *apps.PetSet) wait.ConditionFunc { + // TODO: Differentiate between 0 pets and a really quick scale down using generation. + return func() (bool, error) { + ps, err := c.PetSets(petset.Namespace).Get(petset.Name) + if err != nil { + return false, err + } + return ps.Status.Replicas == ps.Spec.Replicas, nil + } +} + // JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count // for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. func JobHasDesiredParallelism(c BatchInterface, job *batch.Job) wait.ConditionFunc { diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 4226ef5d4fc..a5aca04f000 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -51,6 +52,8 @@ func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) { return &ReplicaSetScaler{c.Extensions()}, nil case extensions.Kind("Job"), batch.Kind("Job"): return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface. + case apps.Kind("PetSet"): + return &PetSetScaler{c.Apps()}, nil case extensions.Kind("Deployment"): return &DeploymentScaler{c.Extensions()}, nil } @@ -126,6 +129,17 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s } } +// ValidatePetSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise. +func (precondition *ScalePrecondition) ValidatePetSet(ps *apps.PetSet) error { + if precondition.Size != -1 && int(ps.Spec.Replicas) != precondition.Size { + return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(ps.Spec.Replicas))} + } + if len(precondition.ResourceVersion) != 0 && ps.ResourceVersion != precondition.ResourceVersion { + return PreconditionError{"resource version", precondition.ResourceVersion, ps.ResourceVersion} + } + return nil +} + // ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error { if precondition.Size != -1 && int(controller.Spec.Replicas) != precondition.Size { @@ -276,6 +290,56 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error { return nil } +type PetSetScaler struct { + c client.AppsInterface +} + +func (scaler *PetSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { + ps, err := scaler.c.PetSets(namespace).Get(name) + if err != nil { + return ScaleError{ScaleGetFailure, "Unknown", err} + } + if preconditions != nil { + if err := preconditions.ValidatePetSet(ps); err != nil { + return err + } + } + ps.Spec.Replicas = int(newSize) + if _, err := scaler.c.PetSets(namespace).Update(ps); err != nil { + if errors.IsConflict(err) { + return ScaleError{ScaleUpdateConflictFailure, ps.ResourceVersion, err} + } + return ScaleError{ScaleUpdateFailure, ps.ResourceVersion, err} + } + return nil +} + +func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { + if preconditions == nil { + preconditions = &ScalePrecondition{-1, ""} + } + if retry == nil { + // Make it try only once, immediately + retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} + } + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + return err + } + if waitForReplicas != nil { + job, err := scaler.c.PetSets(namespace).Get(name) + if err != nil { + return err + } + err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.PetSetHasDesiredPets(scaler.c, job)) + if err == wait.ErrWaitTimeout { + return fmt.Errorf("timed out waiting for %q to be synced", name) + } + return err + } + return nil +} + type JobScaler struct { c client.BatchInterface } diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index ae728c3fbde..ba13edc6c91 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -82,6 +83,9 @@ func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) { case extensions.Kind("Job"), batch.Kind("Job"): return &JobReaper{c, Interval, Timeout}, nil + case apps.Kind("PetSet"): + return &PetSetReaper{c, Interval, Timeout}, nil + case extensions.Kind("Deployment"): return &DeploymentReaper{c, Interval, Timeout}, nil @@ -119,6 +123,10 @@ type PodReaper struct { type ServiceReaper struct { client.Interface } +type PetSetReaper struct { + client.Interface + pollInterval, timeout time.Duration +} type objInterface interface { Delete(name string) error @@ -307,6 +315,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio if err != nil { return false, nil } + return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil }); err != nil { return err @@ -315,6 +324,53 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio return reaper.Extensions().DaemonSets(namespace).Delete(name) } +func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { + petsets := reaper.Apps().PetSets(namespace) + scaler, err := ScalerFor(apps.Kind("PetSet"), *reaper) + if err != nil { + return err + } + ps, err := petsets.Get(name) + if err != nil { + return err + } + if timeout == 0 { + numPets := ps.Spec.Replicas + timeout = Timeout + time.Duration(10*numPets)*time.Second + } + retry := NewRetryParams(reaper.pollInterval, reaper.timeout) + waitForPetSet := NewRetryParams(reaper.pollInterval, reaper.timeout) + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForPetSet); err != nil { + return err + } + + // TODO: This shouldn't be needed, see corresponding TODO in PetSetHasDesiredPets. + // PetSet should track generation number. + pods := reaper.Pods(namespace) + selector, _ := unversioned.LabelSelectorAsSelector(ps.Spec.Selector) + options := api.ListOptions{LabelSelector: selector} + podList, err := pods.List(options) + if err != nil { + return err + } + + errList := []error{} + for _, pod := range podList.Items { + if err := pods.Delete(pod.Name, gracePeriod); err != nil { + if !errors.IsNotFound(err) { + errList = append(errList, err) + } + } + } + if len(errList) > 0 { + return utilerrors.NewAggregate(errList) + } + + // TODO: Cleanup volumes? We don't want to accidentaly delete volumes from + // stop, so just leave this up to the the petset. + return petsets.Delete(name, nil) +} + func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { jobs := reaper.Batch().Jobs(namespace) pods := reaper.Pods(namespace)