diff --git a/hack/make-rules/test-cmd.sh b/hack/make-rules/test-cmd.sh index 54eb0854d58..ac8c203823b 100755 --- a/hack/make-rules/test-cmd.sh +++ b/hack/make-rules/test-cmd.sh @@ -125,6 +125,25 @@ function kubectl-with-retry() done } +# Waits for the pods with the given label to match the list of names. Don't call +# this function unless you know the exact pod names, or expect no pods. +# $1: label to match +# $2: list of pod names sorted by name +# Example invocation: +# wait-for-pods-with-label "app=foo" "nginx-0nginx-1" +function wait-for-pods-with-label() +{ + for i in $(seq 1 10); do + kubeout=`kubectl get po -l $1 --template '{{range.items}}{{.metadata.name}}{{end}}' --sort-by metadata.name "${kube_flags[@]}"` + if [[ $kubeout = $2 ]]; then + return + fi + echo Waiting for pods: $2, found $kubeout + sleep $i + done + kube::log::error_exit "Timeout waiting for pods with label $1" +} + kube::util::trap_add cleanup EXIT SIGINT kube::util::ensure-temp-dir @@ -311,6 +330,7 @@ runTests() { hpa_min_field=".spec.minReplicas" hpa_max_field=".spec.maxReplicas" hpa_cpu_field=".spec.targetCPUUtilizationPercentage" + petset_replicas_field=".spec.replicas" job_parallelism_field=".spec.parallelism" deployment_replicas=".spec.replicas" secret_data=".data" @@ -2100,6 +2120,37 @@ __EOF__ kubectl delete rs frontend "${kube_flags[@]}" + + ############ + # Pet Sets # + ############ + + kube::log::status "Testing kubectl(${version}:petsets)" + + ### Create and stop petset, make sure it doesn't leak pods + # Pre-condition: no petset exists + kube::test::get_object_assert petset "{{range.items}}{{$id_field}}:{{end}}" '' + # Command: create petset + kubectl create -f hack/testdata/nginx-petset.yaml "${kube_flags[@]}" + + ### Scale petset test with current-replicas and replicas + # Pre-condition: 0 replicas + kube::test::get_object_assert 'petset nginx' "{{$petset_replicas_field}}" '0' + # Command: Scale up + kubectl scale --current-replicas=0 --replicas=1 petset nginx "${kube_flags[@]}" + # Post-condition: 1 replica, named nginx-0 + kube::test::get_object_assert 'petset nginx' "{{$petset_replicas_field}}" '1' + # Typically we'd wait and confirm that N>1 replicas are up, but this framework + # doesn't start the scheduler, so pet-0 will block all others. + # TODO: test robust scaling in an e2e. + wait-for-pods-with-label "app=nginx-petset" "nginx-0" + + ### Clean up + kubectl delete -f hack/testdata/nginx-petset.yaml "${kube_flags[@]}" + # Post-condition: no pods from petset controller + wait-for-pods-with-label "app=nginx-petset" "" + + ###################### # Lists # ###################### @@ -2410,7 +2461,7 @@ __EOF__ exit 1 fi rm "${SAR_RESULT_FILE}" - + ##################### # Retrieve multiple # diff --git a/hack/testdata/nginx-petset.yaml b/hack/testdata/nginx-petset.yaml new file mode 100644 index 00000000000..30b4b6dca07 --- /dev/null +++ b/hack/testdata/nginx-petset.yaml @@ -0,0 +1,44 @@ +# A headless service to create DNS records +apiVersion: v1 +kind: Service +metadata: + annotations: + service.alpha.kubernetes.io/tolerate-unready-endpoints: "true" + name: nginx + labels: + app: nginx-petset +spec: + ports: + - port: 80 + name: web + # *.nginx.default.svc.cluster.local + clusterIP: None + selector: + app: nginx-petset +--- +apiVersion: apps/v1alpha1 +kind: PetSet +metadata: + name: nginx +spec: + serviceName: "nginx" + replicas: 0 + template: + metadata: + labels: + app: nginx-petset + annotations: + pod.alpha.kubernetes.io/initialized: "true" + spec: + terminationGracePeriodSeconds: 0 + containers: + - name: nginx + image: gcr.io/google_containers/nginx-slim:0.7 + ports: + - containerPort: 80 + name: web + command: + - sh + - -c + - 'while true; do sleep 1; done' + diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 9abbb92dc42..f44b67dd229 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -2059,7 +2059,7 @@ func ValidatePodSecurityContext(securityContext *api.PodSecurityContext, spec *a return allErrs } -func validateContainerUpdates(newContainers, oldContainers []api.Container, fldPath *field.Path) (allErrs field.ErrorList, stop bool) { +func ValidateContainerUpdates(newContainers, oldContainers []api.Container, fldPath *field.Path) (allErrs field.ErrorList, stop bool) { allErrs = field.ErrorList{} if len(newContainers) != len(oldContainers) { //TODO: Pinpoint the specific container that causes the invalid error after we have strategic merge diff @@ -2089,12 +2089,12 @@ func ValidatePodUpdate(newPod, oldPod *api.Pod) field.ErrorList { // 2. initContainers[*].image // 3. spec.activeDeadlineSeconds - containerErrs, stop := validateContainerUpdates(newPod.Spec.Containers, oldPod.Spec.Containers, specPath.Child("containers")) + containerErrs, stop := ValidateContainerUpdates(newPod.Spec.Containers, oldPod.Spec.Containers, specPath.Child("containers")) allErrs = append(allErrs, containerErrs...) if stop { return allErrs } - containerErrs, stop = validateContainerUpdates(newPod.Spec.InitContainers, oldPod.Spec.InitContainers, specPath.Child("initContainers")) + containerErrs, stop = ValidateContainerUpdates(newPod.Spec.InitContainers, oldPod.Spec.InitContainers, specPath.Child("initContainers")) allErrs = append(allErrs, containerErrs...) if stop { return allErrs diff --git a/pkg/apis/apps/validation/validation.go b/pkg/apis/apps/validation/validation.go index 5700cf8911a..ab4859f11a6 100644 --- a/pkg/apis/apps/validation/validation.go +++ b/pkg/apis/apps/validation/validation.go @@ -99,23 +99,26 @@ func ValidatePetSet(petSet *apps.PetSet) field.ErrorList { // ValidatePetSetUpdate tests if required fields in the PetSet are set. func ValidatePetSetUpdate(petSet, oldPetSet *apps.PetSet) field.ErrorList { - allErrs := field.ErrorList{} + allErrs := apivalidation.ValidateObjectMetaUpdate(&petSet.ObjectMeta, &oldPetSet.ObjectMeta, field.NewPath("metadata")) - // TODO: For now we're taking the safe route and disallowing all updates to spec except for Spec.Replicas. - // Enable on a case by case basis. + // TODO: For now we're taking the safe route and disallowing all updates to + // spec except for Replicas, for scaling, and Template.Spec.containers.image + // for rolling-update. Enable others on a case by case basis. restoreReplicas := petSet.Spec.Replicas petSet.Spec.Replicas = oldPetSet.Spec.Replicas - // The generation changes for this update - restoreGeneration := petSet.Generation - petSet.Generation = oldPetSet.Generation + restoreContainers := petSet.Spec.Template.Spec.Containers + petSet.Spec.Template.Spec.Containers = oldPetSet.Spec.Template.Spec.Containers - if !reflect.DeepEqual(petSet, oldPetSet) { + if !reflect.DeepEqual(petSet.Spec, oldPetSet.Spec) { allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to petset spec for fields other than 'replicas' are forbidden.")) } petSet.Spec.Replicas = restoreReplicas - petSet.Generation = restoreGeneration + petSet.Spec.Template.Spec.Containers = restoreContainers + allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(petSet.Spec.Replicas), field.NewPath("spec", "replicas"))...) + containerErrs, _ := apivalidation.ValidateContainerUpdates(petSet.Spec.Template.Spec.Containers, oldPetSet.Spec.Template.Spec.Containers, field.NewPath("spec").Child("template").Child("containers")) + allErrs = append(allErrs, containerErrs...) return allErrs } diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index 62dcdf6428e..cd913de61ac 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/util/wait" @@ -72,6 +73,17 @@ func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions. } } +func PetSetHasDesiredPets(c AppsInterface, petset *apps.PetSet) wait.ConditionFunc { + // TODO: Differentiate between 0 pets and a really quick scale down using generation. + return func() (bool, error) { + ps, err := c.PetSets(petset.Namespace).Get(petset.Name) + if err != nil { + return false, err + } + return ps.Status.Replicas == ps.Spec.Replicas, nil + } +} + // JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count // for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. func JobHasDesiredParallelism(c BatchInterface, job *batch.Job) wait.ConditionFunc { diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 4226ef5d4fc..a5aca04f000 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -51,6 +52,8 @@ func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) { return &ReplicaSetScaler{c.Extensions()}, nil case extensions.Kind("Job"), batch.Kind("Job"): return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface. + case apps.Kind("PetSet"): + return &PetSetScaler{c.Apps()}, nil case extensions.Kind("Deployment"): return &DeploymentScaler{c.Extensions()}, nil } @@ -126,6 +129,17 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s } } +// ValidatePetSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise. +func (precondition *ScalePrecondition) ValidatePetSet(ps *apps.PetSet) error { + if precondition.Size != -1 && int(ps.Spec.Replicas) != precondition.Size { + return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(ps.Spec.Replicas))} + } + if len(precondition.ResourceVersion) != 0 && ps.ResourceVersion != precondition.ResourceVersion { + return PreconditionError{"resource version", precondition.ResourceVersion, ps.ResourceVersion} + } + return nil +} + // ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error { if precondition.Size != -1 && int(controller.Spec.Replicas) != precondition.Size { @@ -276,6 +290,56 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error { return nil } +type PetSetScaler struct { + c client.AppsInterface +} + +func (scaler *PetSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { + ps, err := scaler.c.PetSets(namespace).Get(name) + if err != nil { + return ScaleError{ScaleGetFailure, "Unknown", err} + } + if preconditions != nil { + if err := preconditions.ValidatePetSet(ps); err != nil { + return err + } + } + ps.Spec.Replicas = int(newSize) + if _, err := scaler.c.PetSets(namespace).Update(ps); err != nil { + if errors.IsConflict(err) { + return ScaleError{ScaleUpdateConflictFailure, ps.ResourceVersion, err} + } + return ScaleError{ScaleUpdateFailure, ps.ResourceVersion, err} + } + return nil +} + +func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { + if preconditions == nil { + preconditions = &ScalePrecondition{-1, ""} + } + if retry == nil { + // Make it try only once, immediately + retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} + } + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + return err + } + if waitForReplicas != nil { + job, err := scaler.c.PetSets(namespace).Get(name) + if err != nil { + return err + } + err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.PetSetHasDesiredPets(scaler.c, job)) + if err == wait.ErrWaitTimeout { + return fmt.Errorf("timed out waiting for %q to be synced", name) + } + return err + } + return nil +} + type JobScaler struct { c client.BatchInterface } diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index ae728c3fbde..010edbf1e5b 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -82,6 +83,9 @@ func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) { case extensions.Kind("Job"), batch.Kind("Job"): return &JobReaper{c, Interval, Timeout}, nil + case apps.Kind("PetSet"): + return &PetSetReaper{c, Interval, Timeout}, nil + case extensions.Kind("Deployment"): return &DeploymentReaper{c, Interval, Timeout}, nil @@ -119,6 +123,10 @@ type PodReaper struct { type ServiceReaper struct { client.Interface } +type PetSetReaper struct { + client.Interface + pollInterval, timeout time.Duration +} type objInterface interface { Delete(name string) error @@ -307,6 +315,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio if err != nil { return false, nil } + return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil }); err != nil { return err @@ -315,6 +324,53 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio return reaper.Extensions().DaemonSets(namespace).Delete(name) } +func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { + petsets := reaper.Apps().PetSets(namespace) + scaler, err := ScalerFor(apps.Kind("PetSet"), *reaper) + if err != nil { + return err + } + ps, err := petsets.Get(name) + if err != nil { + return err + } + if timeout == 0 { + numPets := ps.Spec.Replicas + timeout = Timeout + time.Duration(10*numPets)*time.Second + } + retry := NewRetryParams(reaper.pollInterval, reaper.timeout) + waitForPetSet := NewRetryParams(reaper.pollInterval, reaper.timeout) + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForPetSet); err != nil { + return err + } + + // TODO: This shouldn't be needed, see corresponding TODO in PetSetHasDesiredPets. + // PetSet should track generation number. + pods := reaper.Pods(namespace) + selector, _ := unversioned.LabelSelectorAsSelector(ps.Spec.Selector) + options := api.ListOptions{LabelSelector: selector} + podList, err := pods.List(options) + if err != nil { + return err + } + + errList := []error{} + for _, pod := range podList.Items { + if err := pods.Delete(pod.Name, gracePeriod); err != nil { + if !errors.IsNotFound(err) { + errList = append(errList, err) + } + } + } + if len(errList) > 0 { + return utilerrors.NewAggregate(errList) + } + + // TODO: Cleanup volumes? We don't want to accidentally delete volumes from + // stop, so just leave this up to the the petset. + return petsets.Delete(name, nil) +} + func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { jobs := reaper.Batch().Jobs(namespace) pods := reaper.Pods(namespace) diff --git a/pkg/registry/petset/strategy_test.go b/pkg/registry/petset/strategy_test.go index bc8a88e8069..d4fd81805ff 100644 --- a/pkg/registry/petset/strategy_test.go +++ b/pkg/registry/petset/strategy_test.go @@ -66,7 +66,7 @@ func TestPetSetStrategy(t *testing.T) { // Just Spec.Replicas is allowed to change validPs := &apps.PetSet{ - ObjectMeta: api.ObjectMeta{Name: ps.Name, Namespace: ps.Namespace}, + ObjectMeta: api.ObjectMeta{Name: ps.Name, Namespace: ps.Namespace, ResourceVersion: "1", Generation: 1}, Spec: apps.PetSetSpec{ Selector: ps.Spec.Selector, Template: validPodTemplate.Template, @@ -76,7 +76,7 @@ func TestPetSetStrategy(t *testing.T) { Strategy.PrepareForUpdate(ctx, validPs, ps) errs = Strategy.ValidateUpdate(ctx, validPs, ps) if len(errs) != 0 { - t.Errorf("Updating spec.Replicas is allowed on a petset.") + t.Errorf("Updating spec.Replicas is allowed on a petset: %v", errs) } validPs.Spec.Selector = &unversioned.LabelSelector{MatchLabels: map[string]string{"a": "bar"}}