From 6bc3052551550aa2bd7306daed44c8b293221ff0 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Mon, 25 Apr 2016 12:24:40 -0700 Subject: [PATCH] PetSet alpha controller --- .../app/controllermanager.go | 19 + contrib/completions/bash/kubectl | 2 + pkg/apis/apps/validation/validation.go | 23 +- pkg/apis/apps/validation/validation_test.go | 34 +- pkg/client/cache/listers.go | 65 +++ pkg/client/unversioned/apps.go | 2 +- pkg/client/unversioned/pet_sets.go | 2 +- pkg/client/unversioned/pet_sets_test.go | 12 +- pkg/controller/controller_utils.go | 25 +- .../persistentvolume_recycler_controller.go | 1 + pkg/controller/petset/fakes.go | 324 +++++++++++++++ pkg/controller/petset/identity_mappers.go | 247 ++++++++++++ .../petset/identity_mappers_test.go | 179 +++++++++ pkg/controller/petset/iterator.go | 163 ++++++++ pkg/controller/petset/iterator_test.go | 149 +++++++ pkg/controller/petset/pet.go | 310 ++++++++++++++ pkg/controller/petset/pet_set.go | 356 +++++++++++++++++ pkg/controller/petset/pet_set_test.go | 264 ++++++++++++ pkg/controller/petset/pet_set_utils.go | 168 ++++++++ pkg/kubectl/describe.go | 42 ++ pkg/registry/petset/strategy.go | 10 +- pkg/registry/petset/strategy_test.go | 8 + test/e2e/framework/util.go | 18 + test/e2e/petset.go | 378 ++++++++++++++++++ 24 files changed, 2761 insertions(+), 40 deletions(-) create mode 100644 pkg/controller/petset/fakes.go create mode 100644 pkg/controller/petset/identity_mappers.go create mode 100644 pkg/controller/petset/identity_mappers_test.go create mode 100644 pkg/controller/petset/iterator.go create mode 100644 pkg/controller/petset/iterator_test.go create mode 100644 pkg/controller/petset/pet.go create mode 100644 pkg/controller/petset/pet_set.go create mode 100644 pkg/controller/petset/pet_set_test.go create mode 100644 pkg/controller/petset/pet_set_utils.go create mode 100644 test/e2e/petset.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f974e5c5b1f..e6cfcb0330e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -56,6 +56,7 @@ import ( namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" nodecontroller "k8s.io/kubernetes/pkg/controller/node" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume" + petset "k8s.io/kubernetes/pkg/controller/petset" "k8s.io/kubernetes/pkg/controller/podautoscaler" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" replicaset "k8s.io/kubernetes/pkg/controller/replicaset" @@ -351,6 +352,24 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } } + groupVersion = "apps/v1alpha1" + resources, found = resourceMap[groupVersion] + glog.Infof("Attempting to start petset, full resource map %+v", resourceMap) + if containsVersion(versions, groupVersion) && found { + glog.Infof("Starting %s apis", groupVersion) + if containsResource(resources, "petsets") { + glog.Infof("Starting PetSet controller") + resyncPeriod := ResyncPeriod(s)() + go petset.NewPetSetController( + podInformer, + // TODO: Switch to using clientset + kubeClient, + resyncPeriod, + ).Run(1, wait.NeverStop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + } + volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfiguration) provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration) if err != nil { diff --git a/contrib/completions/bash/kubectl 
b/contrib/completions/bash/kubectl index aca209dfd3b..3cc679556a7 100644 --- a/contrib/completions/bash/kubectl +++ b/contrib/completions/bash/kubectl @@ -541,6 +541,7 @@ _kubectl_describe() must_have_one_noun+=("node") must_have_one_noun+=("persistentvolume") must_have_one_noun+=("persistentvolumeclaim") + must_have_one_noun+=("petset") must_have_one_noun+=("pod") must_have_one_noun+=("replicaset") must_have_one_noun+=("replicationcontroller") @@ -570,6 +571,7 @@ _kubectl_describe() noun_aliases+=("ns") noun_aliases+=("persistentvolumeclaims") noun_aliases+=("persistentvolumes") + noun_aliases+=("petsets") noun_aliases+=("po") noun_aliases+=("pods") noun_aliases+=("pv") diff --git a/pkg/apis/apps/validation/validation.go b/pkg/apis/apps/validation/validation.go index fe66e289d7a..3180f28a4b1 100644 --- a/pkg/apis/apps/validation/validation.go +++ b/pkg/apis/apps/validation/validation.go @@ -17,6 +17,8 @@ limitations under the License. package validation import ( + "reflect" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" unversionedvalidation "k8s.io/kubernetes/pkg/api/unversioned/validation" @@ -31,6 +33,9 @@ import ( // Prefix indicates this name will be used as part of generation, in which case // trailing dashes are allowed. func ValidatePetSetName(name string, prefix bool) (bool, string) { + // TODO: Validate that there's name for the suffix inserted by the pets. + // Currently this is just "-index". In the future we may allow a user + // specified list of suffixes and we need to validate the longest one. return apivalidation.NameIsDNSSubdomain(name, prefix) } @@ -96,8 +101,22 @@ func ValidatePetSet(petSet *apps.PetSet) field.ErrorList { // ValidatePetSetUpdate tests if required fields in the PetSet are set. func ValidatePetSetUpdate(petSet, oldPetSet *apps.PetSet) field.ErrorList { allErrs := field.ErrorList{} - allErrs = append(allErrs, apivalidation.ValidateObjectMetaUpdate(&petSet.ObjectMeta, &oldPetSet.ObjectMeta, field.NewPath("metadata"))...) - allErrs = append(allErrs, ValidatePetSetSpec(&petSet.Spec, field.NewPath("spec"))...) + + // TODO: For now we're taking the safe route and disallowing all updates to spec except for Spec.Replicas. + // Enable on a case by case basis. + restoreReplicas := petSet.Spec.Replicas + petSet.Spec.Replicas = oldPetSet.Spec.Replicas + + // The generation changes for this update + restoreGeneration := petSet.Generation + petSet.Generation = oldPetSet.Generation + + if !reflect.DeepEqual(petSet, oldPetSet) { + allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to petset spec for fields other than 'replicas' are forbidden.")) + } + petSet.Spec.Replicas = restoreReplicas + petSet.Generation = restoreGeneration + allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(petSet.Spec.Replicas), field.NewPath("spec", "replicas"))...) 
return allErrs } diff --git a/pkg/apis/apps/validation/validation_test.go b/pkg/apis/apps/validation/validation_test.go index 1deaab8aaa1..851ea895fc1 100644 --- a/pkg/apis/apps/validation/validation_test.go +++ b/pkg/apis/apps/validation/validation_test.go @@ -276,23 +276,6 @@ func TestValidatePetSetUpdate(t *testing.T) { }, }, }, - { - old: apps.PetSet{ - ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault}, - Spec: apps.PetSetSpec{ - Selector: &unversioned.LabelSelector{MatchLabels: validLabels}, - Template: validPodTemplate.Template, - }, - }, - update: apps.PetSet{ - ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault}, - Spec: apps.PetSetSpec{ - Replicas: 1, - Selector: &unversioned.LabelSelector{MatchLabels: validLabels}, - Template: readWriteVolumePodTemplate.Template, - }, - }, - }, } for _, successCase := range successCases { successCase.old.ObjectMeta.ResourceVersion = "1" @@ -319,6 +302,23 @@ func TestValidatePetSetUpdate(t *testing.T) { }, }, }, + "updates to a field other than spec.Replicas": { + old: apps.PetSet{ + ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault}, + Spec: apps.PetSetSpec{ + Selector: &unversioned.LabelSelector{MatchLabels: validLabels}, + Template: validPodTemplate.Template, + }, + }, + update: apps.PetSet{ + ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault}, + Spec: apps.PetSetSpec{ + Replicas: 1, + Selector: &unversioned.LabelSelector{MatchLabels: validLabels}, + Template: readWriteVolumePodTemplate.Template, + }, + }, + }, "invalid selector": { old: apps.PetSet{ ObjectMeta: api.ObjectMeta{Name: "", Namespace: api.NamespaceDefault}, diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 3e8fa20889e..5a9ec391d62 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/labels" @@ -557,3 +558,67 @@ func (s *StoreToPVCFetcher) GetPersistentVolumeClaimInfo(namespace string, id st return o.(*api.PersistentVolumeClaim), nil } + +// StoreToPetSetLister gives a store List and Exists methods. The store must contain only PetSets. +type StoreToPetSetLister struct { + Store +} + +// Exists checks if the given PetSet exists in the store. +func (s *StoreToPetSetLister) Exists(ps *apps.PetSet) (bool, error) { + _, exists, err := s.Store.Get(ps) + if err != nil { + return false, err + } + return exists, nil +} + +// List lists all PetSets in the store. +func (s *StoreToPetSetLister) List() (psList []apps.PetSet, err error) { + for _, ps := range s.Store.List() { + psList = append(psList, *(ps.(*apps.PetSet))) + } + return psList, nil +} + +type storePetSetsNamespacer struct { + store Store + namespace string +} + +func (s *StoreToPetSetLister) PetSets(namespace string) storePetSetsNamespacer { + return storePetSetsNamespacer{s.Store, namespace} +} + +// GetPodPetSets returns a list of PetSets managing a pod. Returns an error only if no matching PetSets are found. 
+func (s *StoreToPetSetLister) GetPodPetSets(pod *api.Pod) (psList []apps.PetSet, err error) { + var selector labels.Selector + var ps apps.PetSet + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no PetSets found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + ps = *m.(*apps.PetSet) + if ps.Namespace != pod.Namespace { + continue + } + selector, err = unversioned.LabelSelectorAsSelector(ps.Spec.Selector) + if err != nil { + err = fmt.Errorf("invalid selector: %v", err) + return + } + + // If a PetSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + psList = append(psList, ps) + } + if len(psList) == 0 { + err = fmt.Errorf("could not find PetSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/pkg/client/unversioned/apps.go b/pkg/client/unversioned/apps.go index c81c53d84ee..1905c29c246 100644 --- a/pkg/client/unversioned/apps.go +++ b/pkg/client/unversioned/apps.go @@ -32,7 +32,7 @@ type AppsClient struct { *restclient.RESTClient } -func (c *AppsClient) PetSet(namespace string) PetSetInterface { +func (c *AppsClient) PetSets(namespace string) PetSetInterface { return newPetSet(c, namespace) } diff --git a/pkg/client/unversioned/pet_sets.go b/pkg/client/unversioned/pet_sets.go index e1897c28164..71b1ea02176 100644 --- a/pkg/client/unversioned/pet_sets.go +++ b/pkg/client/unversioned/pet_sets.go @@ -24,7 +24,7 @@ import ( // PetSetNamespacer has methods to work with PetSet resources in a namespace type PetSetNamespacer interface { - PetSet(namespace string) PetSetInterface + PetSets(namespace string) PetSetInterface } // PetSetInterface exposes methods to work on PetSet resources. 
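As a usage illustration (not part of this patch), here is a minimal sketch of how a cache consumer could resolve a pod to its owning PetSet with the new StoreToPetSetLister added above; the helper name ownerPetSet and its error handling are hypothetical, only GetPodPetSets and the types it returns come from the patch.

package example // hypothetical consumer of the new lister

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/apps"
	"k8s.io/kubernetes/pkg/client/cache"
)

// ownerPetSet returns the PetSet whose selector matches the pod, or nil if none does.
// Overlapping PetSets are not supported, so only the first match is used.
func ownerPetSet(lister *cache.StoreToPetSetLister, pod *api.Pod) *apps.PetSet {
	sets, err := lister.GetPodPetSets(pod)
	if err != nil || len(sets) == 0 {
		// GetPodPetSets returns an error when no PetSet selects the pod.
		glog.V(4).Infof("No PetSets found for pod %v: %v", pod.Name, err)
		return nil
	}
	return &sets[0]
}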
diff --git a/pkg/client/unversioned/pet_sets_test.go b/pkg/client/unversioned/pet_sets_test.go index 40ff03323e5..879aa5ce7d9 100644 --- a/pkg/client/unversioned/pet_sets_test.go +++ b/pkg/client/unversioned/pet_sets_test.go @@ -56,7 +56,7 @@ func TestListPetSets(t *testing.T) { }, }, } - receivedRSList, err := c.Setup(t).Apps().PetSet(ns).List(api.ListOptions{}) + receivedRSList, err := c.Setup(t).Apps().PetSets(ns).List(api.ListOptions{}) c.Validate(t, receivedRSList, err) } @@ -81,14 +81,14 @@ func TestGetPetSet(t *testing.T) { }, }, } - receivedRS, err := c.Setup(t).Apps().PetSet(ns).Get("foo") + receivedRS, err := c.Setup(t).Apps().PetSets(ns).Get("foo") c.Validate(t, receivedRS, err) } func TestGetPetSetWithNoName(t *testing.T) { ns := api.NamespaceDefault c := &simple.Client{Error: true} - receivedPod, err := c.Setup(t).Apps().PetSet(ns).Get("") + receivedPod, err := c.Setup(t).Apps().PetSets(ns).Get("") if (err != nil) && (err.Error() != simple.NameRequiredError) { t.Errorf("Expected error: %v, but got %v", simple.NameRequiredError, err) } @@ -120,7 +120,7 @@ func TestUpdatePetSet(t *testing.T) { }, }, } - receivedRS, err := c.Setup(t).Apps().PetSet(ns).Update(requestRS) + receivedRS, err := c.Setup(t).Apps().PetSets(ns).Update(requestRS) c.Validate(t, receivedRS, err) } @@ -130,7 +130,7 @@ func TestDeletePetSet(t *testing.T) { Request: simple.Request{Method: "DELETE", Path: testapi.Apps.ResourcePath(getPetSetResourceName(), ns, "foo"), Query: simple.BuildQueryValues(nil)}, Response: simple.Response{StatusCode: 200}, } - err := c.Setup(t).Apps().PetSet(ns).Delete("foo", nil) + err := c.Setup(t).Apps().PetSets(ns).Delete("foo", nil) c.Validate(t, nil, err) } @@ -158,7 +158,7 @@ func TestCreatePetSet(t *testing.T) { }, }, } - receivedRS, err := c.Setup(t).Apps().PetSet(ns).Create(requestRS) + receivedRS, err := c.Setup(t).Apps().PetSets(ns).Create(requestRS) c.Validate(t, receivedRS, err) } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 405b4500dc4..7945741c668 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -413,15 +413,15 @@ func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *a return r.createPods(nodeName, namespace, template, object) } -func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error { +func GetPodFromTemplate(template *api.PodTemplateSpec, parentObject runtime.Object) (*api.Pod, error) { desiredLabels := getPodsLabelSet(template) - desiredAnnotations, err := getPodsAnnotationSet(template, object) + desiredAnnotations, err := getPodsAnnotationSet(template, parentObject) if err != nil { - return err + return nil, err } - accessor, err := meta.Accessor(object) + accessor, err := meta.Accessor(parentObject) if err != nil { - return fmt.Errorf("object does not have ObjectMeta, %v", err) + return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err) } prefix := getPodsPrefix(accessor.GetName()) @@ -433,7 +433,15 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod }, } if err := api.Scheme.Convert(&template.Spec, &pod.Spec); err != nil { - return fmt.Errorf("unable to convert pod template: %v", err) + return nil, fmt.Errorf("unable to convert pod template: %v", err) + } + return pod, nil +} + +func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error { + pod, err := 
GetPodFromTemplate(template, object) + if err != nil { + return err } if len(nodeName) != 0 { pod.Spec.NodeName = nodeName @@ -445,6 +453,11 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod r.Recorder.Eventf(object, api.EventTypeWarning, "FailedCreate", "Error creating: %v", err) return fmt.Errorf("unable to create pods: %v", err) } else { + accessor, err := meta.Accessor(object) + if err != nil { + glog.Errorf("parentObject does not have ObjectMeta, %v", err) + return nil + } glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulCreate", "Created pod: %v", newPod.Name) } diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index 20dd1b83eb8..e73a5b9ebc7 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -117,6 +117,7 @@ func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time glog.Errorf("Error casting object to PersistentVolume: %v", obj) return } + recycler.reclaimVolume(pv) recycler.removeReleasedVolume(pv) }, }, diff --git a/pkg/controller/petset/fakes.go b/pkg/controller/petset/fakes.go new file mode 100644 index 00000000000..ddbaff1b073 --- /dev/null +++ b/pkg/controller/petset/fakes.go @@ -0,0 +1,324 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package petset + +import ( + "fmt" + "time" + + "k8s.io/kubernetes/pkg/api" + api_pod "k8s.io/kubernetes/pkg/api/pod" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/sets" + "speter.net/go/exp/math/dec/inf" +) + +func dec(i int64, exponent int) *inf.Dec { + return inf.NewDec(i, inf.Scale(-exponent)) +} + +func newPVC(name string) api.PersistentVolumeClaim { + return api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: name, + }, + Spec: api.PersistentVolumeClaimSpec{ + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceStorage: resource.Quantity{ + Amount: dec(1, 0), + Format: resource.BinarySI, + }, + }, + }, + }, + } +} + +func newPetSetWithVolumes(replicas int, name string, petMounts []api.VolumeMount, podMounts []api.VolumeMount) *apps.PetSet { + mounts := append(petMounts, podMounts...) 
+ claims := []api.PersistentVolumeClaim{} + for _, m := range petMounts { + claims = append(claims, newPVC(m.Name)) + } + + vols := []api.Volume{} + for _, m := range podMounts { + vols = append(vols, api.Volume{ + Name: m.Name, + VolumeSource: api.VolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: fmt.Sprintf("/tmp/%v", m.Name), + }, + }, + }) + } + + return &apps.PetSet{ + TypeMeta: unversioned.TypeMeta{ + Kind: "PetSet", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: api.NamespaceDefault, + UID: types.UID("test"), + }, + Spec: apps.PetSetSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Replicas: replicas, + Template: api.PodTemplateSpec{ + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "nginx", + Image: "nginx", + VolumeMounts: mounts, + }, + }, + Volumes: vols, + }, + }, + VolumeClaimTemplates: claims, + ServiceName: "governingsvc", + }, + } +} + +func runningPod(ns, name string) *api.Pod { + p := &api.Pod{Status: api.PodStatus{Phase: api.PodRunning}} + p.Namespace = ns + p.Name = name + return p +} + +func newPodList(ps *apps.PetSet, num int) []*api.Pod { + // knownPods are pods in the system + knownPods := []*api.Pod{} + for i := 0; i < num; i++ { + k, _ := newPCB(fmt.Sprintf("%v", i), ps) + knownPods = append(knownPods, k.pod) + } + return knownPods +} + +func newPetSet(replicas int) *apps.PetSet { + petMounts := []api.VolumeMount{ + {Name: "datadir", MountPath: "/tmp/zookeeper"}, + } + podMounts := []api.VolumeMount{ + {Name: "home", MountPath: "/home"}, + } + return newPetSetWithVolumes(replicas, "foo", petMounts, podMounts) +} + +func checkPodForMount(pod *api.Pod, mountName string) error { + for _, c := range pod.Spec.Containers { + for _, v := range c.VolumeMounts { + if v.Name == mountName { + return nil + } + } + } + return fmt.Errorf("Found volume but no associated mount %v in pod %v", mountName, pod.Name) +} + +func newFakePetClient() *fakePetClient { + return &fakePetClient{ + pets: []*pcb{}, + claims: []api.PersistentVolumeClaim{}, + recorder: &record.FakeRecorder{}, + petHealthChecker: &defaultPetHealthChecker{}, + } +} + +type fakePetClient struct { + pets []*pcb + claims []api.PersistentVolumeClaim + petsCreated, petsDeleted int + claimsCreated, claimsDeleted int + recorder record.EventRecorder + petHealthChecker +} + +// Delete fakes pet client deletion. +func (f *fakePetClient) Delete(p *pcb) error { + pets := []*pcb{} + found := false + for i, pet := range f.pets { + if p.pod.Name == pet.pod.Name { + found = true + f.recorder.Eventf(pet.parent, api.EventTypeNormal, "SuccessfulDelete", "pet: %v", pet.pod.Name) + continue + } + pets = append(pets, f.pets[i]) + } + if !found { + // TODO: Return proper not found error + return fmt.Errorf("Delete failed: pet %v doesn't exist", p.pod.Name) + } + f.pets = pets + f.petsDeleted++ + return nil +} + +// Get fakes getting pets. +func (f *fakePetClient) Get(p *pcb) (*pcb, bool, error) { + for i, pet := range f.pets { + if p.pod.Name == pet.pod.Name { + return f.pets[i], true, nil + } + } + return nil, false, nil +} + +// Create fakes pet creation. 
+func (f *fakePetClient) Create(p *pcb) error { + for _, pet := range f.pets { + if p.pod.Name == pet.pod.Name { + return fmt.Errorf("Create failed: pet %v already exists", p.pod.Name) + } + } + f.recorder.Eventf(p.parent, api.EventTypeNormal, "SuccessfulCreate", "pet: %v", p.pod.Name) + f.pets = append(f.pets, p) + f.petsCreated++ + return nil +} + +// Update fakes pet updates. +func (f *fakePetClient) Update(expected, wanted *pcb) error { + found := false + pets := []*pcb{} + for i, pet := range f.pets { + if wanted.pod.Name == pet.pod.Name { + f.pets[i].pod.Annotations[api_pod.PodHostnameAnnotation] = wanted.pod.Annotations[api_pod.PodHostnameAnnotation] + f.pets[i].pod.Annotations[api_pod.PodSubdomainAnnotation] = wanted.pod.Annotations[api_pod.PodSubdomainAnnotation] + f.pets[i].pod.Spec = wanted.pod.Spec + found = true + } + pets = append(pets, f.pets[i]) + } + f.pets = pets + if !found { + return fmt.Errorf("Cannot update pet %v not found", wanted.pod.Name) + } + // TODO: Delete pvcs/volumes that are in wanted but not in expected. + return nil +} + +func (f *fakePetClient) getPodList() []*api.Pod { + p := []*api.Pod{} + for i, pet := range f.pets { + if pet.pod == nil { + continue + } + p = append(p, f.pets[i].pod) + } + return p +} + +func (f *fakePetClient) deletePetAtIndex(index int) { + p := []*pcb{} + for i := range f.pets { + if i != index { + p = append(p, f.pets[i]) + } + } + f.pets = p +} + +func (f *fakePetClient) setHealthy(index int) error { + if len(f.pets) < index { + return fmt.Errorf("Index out of range, len %v index %v", len(f.pets), index) + } + f.pets[index].pod.Status.Phase = api.PodRunning + f.pets[index].pod.Annotations[PetSetInitAnnotation] = "true" + return nil +} + +// isHealthy is a convenience wrapper around the default health checker. +// The first invocation returns not-healthy, but marks the pet healthy so +// subsequent invocations see it as healthy. +func (f *fakePetClient) isHealthy(pod *api.Pod) bool { + if f.petHealthChecker.isHealthy(pod) { + return true + } + return false +} + +func (f *fakePetClient) setDeletionTimestamp(index int) error { + if len(f.pets) < index { + return fmt.Errorf("Index out of range, len %v index %v", len(f.pets), index) + } + f.pets[index].pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()} + return nil +} + +// SyncPVCs fakes pvc syncing. +func (f *fakePetClient) SyncPVCs(pet *pcb) error { + v := pet.pvcs + updateClaims := map[string]api.PersistentVolumeClaim{} + for i, update := range v { + updateClaims[update.Name] = v[i] + } + claimList := []api.PersistentVolumeClaim{} + for i, existing := range f.claims { + if update, ok := updateClaims[existing.Name]; ok { + claimList = append(claimList, update) + delete(updateClaims, existing.Name) + } else { + claimList = append(claimList, f.claims[i]) + } + } + for _, remaining := range updateClaims { + claimList = append(claimList, remaining) + f.claimsCreated++ + f.recorder.Eventf(pet.parent, api.EventTypeNormal, "SuccessfulCreate", "pvc: %v", remaining.Name) + } + f.claims = claimList + return nil +} + +// DeletePVCs fakes pvc deletion. 
+func (f *fakePetClient) DeletePVCs(pet *pcb) error { + claimsToDelete := pet.pvcs + deleteClaimNames := sets.NewString() + for _, c := range claimsToDelete { + deleteClaimNames.Insert(c.Name) + } + pvcs := []api.PersistentVolumeClaim{} + for i, existing := range f.claims { + if deleteClaimNames.Has(existing.Name) { + deleteClaimNames.Delete(existing.Name) + f.claimsDeleted++ + f.recorder.Eventf(pet.parent, api.EventTypeNormal, "SuccessfulDelete", "pvc: %v", existing.Name) + continue + } + pvcs = append(pvcs, f.claims[i]) + } + f.claims = pvcs + if deleteClaimNames.Len() != 0 { + return fmt.Errorf("Claims %+v don't exist. Failed deletion.", deleteClaimNames) + } + return nil +} diff --git a/pkg/controller/petset/identity_mappers.go b/pkg/controller/petset/identity_mappers.go new file mode 100644 index 00000000000..ae72ef2a8f9 --- /dev/null +++ b/pkg/controller/petset/identity_mappers.go @@ -0,0 +1,247 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package petset + +import ( + "crypto/md5" + "fmt" + "sort" + "strings" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + podapi "k8s.io/kubernetes/pkg/api/pod" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/util/sets" +) + +// identityMapper is an interface for assigning identities to a pet. +// All existing identity mappers just append "-(index)" to the petset name to +// generate a unique identity. This is used in claims/DNS/hostname/petname +// etc. There's a more elegant way to achieve this mapping, but we're +// taking the simplest route till we have data on whether users will need +// more customization. +// Note that running a single identity mapper is not guaranteed to give +// your pet a unique identity. You must run them all. Order doesn't matter. +type identityMapper interface { + // SetIdentity takes an id and assigns the given pet an identity based + // on the pet set spec. The is must be unique amongst members of the + // pet set. + SetIdentity(id string, pet *api.Pod) + + // Identity returns the identity of the pet. + Identity(pod *api.Pod) string +} + +func newIdentityMappers(ps *apps.PetSet) []identityMapper { + return []identityMapper{ + &NameIdentityMapper{ps}, + &NetworkIdentityMapper{ps}, + &VolumeIdentityMapper{ps}, + } +} + +// NetworkIdentityMapper assigns network identity to pets. +type NetworkIdentityMapper struct { + ps *apps.PetSet +} + +// SetIdentity sets network identity on the pet. +func (n *NetworkIdentityMapper) SetIdentity(id string, pet *api.Pod) { + pet.Annotations[podapi.PodHostnameAnnotation] = fmt.Sprintf("%v-%v", n.ps.Name, id) + pet.Annotations[podapi.PodSubdomainAnnotation] = n.ps.Spec.ServiceName + return +} + +// Identity returns the network identity of the pet. +func (n *NetworkIdentityMapper) Identity(pet *api.Pod) string { + return n.String(pet) +} + +// String is a string function for the network identity of the pet. 
+func (n *NetworkIdentityMapper) String(pet *api.Pod) string { + hostname := pet.Annotations[podapi.PodHostnameAnnotation] + subdomain := pet.Annotations[podapi.PodSubdomainAnnotation] + return strings.Join([]string{hostname, subdomain, n.ps.Namespace}, ".") +} + +// VolumeIdentityMapper assigns storage identity to pets. +type VolumeIdentityMapper struct { + ps *apps.PetSet +} + +// SetIdentity sets storge identity on the pet. +func (v *VolumeIdentityMapper) SetIdentity(id string, pet *api.Pod) { + petVolumes := []api.Volume{} + petClaims := v.GetClaims(id) + + // These volumes will all go down with the pod. If a name matches one of + // the claims in the pet set, it gets clobbered. + podVolumes := map[string]api.Volume{} + for _, podVol := range pet.Spec.Volumes { + podVolumes[podVol.Name] = podVol + } + + // Insert claims for the idempotent petSet volumes + for name, claim := range petClaims { + // Volumes on a pet for which there are no associated claims on the + // petset are pod local, and die with the pod. + podVol, ok := podVolumes[name] + if ok { + // TODO: Validate and reject this. + glog.V(4).Infof("Overwriting existing volume source %v", podVol.Name) + } + newVol := api.Volume{ + Name: name, + VolumeSource: api.VolumeSource{ + PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{ + ClaimName: claim.Name, + // TODO: Use source definition to set this value when we have one. + ReadOnly: false, + }, + }, + } + petVolumes = append(petVolumes, newVol) + } + + // Transfer any ephemeral pod volumes + for name, vol := range podVolumes { + if _, ok := petClaims[name]; !ok { + petVolumes = append(petVolumes, vol) + } + } + pet.Spec.Volumes = petVolumes + return +} + +// Identity returns the storage identity of the pet. +func (v *VolumeIdentityMapper) Identity(pet *api.Pod) string { + // TODO: Make this a hash? + return v.String(pet) +} + +// String is a string function for the network identity of the pet. +func (v *VolumeIdentityMapper) String(pet *api.Pod) string { + ids := []string{} + petVols := sets.NewString() + for _, petVol := range v.ps.Spec.VolumeClaimTemplates { + petVols.Insert(petVol.Name) + } + for _, podVol := range pet.Spec.Volumes { + // Volumes on a pet for which there are no associated claims on the + // petset are pod local, and die with the pod. + if !petVols.Has(podVol.Name) { + continue + } + if podVol.VolumeSource.PersistentVolumeClaim == nil { + // TODO: Is this a part of the identity? + ids = append(ids, fmt.Sprintf("%v:None", podVol.Name)) + continue + } + ids = append(ids, fmt.Sprintf("%v:%v", podVol.Name, podVol.VolumeSource.PersistentVolumeClaim.ClaimName)) + } + sort.Strings(ids) + return strings.Join(ids, "") +} + +// GetClaims returns the volume claims associated with the given id. +// The claims belong to the petset. The id should be unique within a petset. +func (v *VolumeIdentityMapper) GetClaims(id string) map[string]api.PersistentVolumeClaim { + petClaims := map[string]api.PersistentVolumeClaim{} + for _, pvc := range v.ps.Spec.VolumeClaimTemplates { + claim := pvc + // TODO: Name length checking in validation. + claim.Name = fmt.Sprintf("%v-%v-%v", claim.Name, v.ps.Name, id) + claim.Namespace = v.ps.Namespace + claim.Labels = v.ps.Spec.Selector.MatchLabels + + // TODO: We're assuming that the claim template has a volume QoS key, eg: + // volume.alpha.kubernetes.io/storage-class: anything + petClaims[pvc.Name] = claim + } + return petClaims +} + +// GetClaimsForPet returns the pvcs for the given pet. 
+func (v *VolumeIdentityMapper) GetClaimsForPet(pet *api.Pod) []api.PersistentVolumeClaim { + // Strip out the "-(index)" from the pet name and use it to generate + // claim names. + id := strings.Split(pet.Name, "-") + petID := id[len(id)-1] + pvcs := []api.PersistentVolumeClaim{} + for _, pvc := range v.GetClaims(petID) { + pvcs = append(pvcs, pvc) + } + return pvcs +} + +// NameIdentityMapper assigns names to pets. +// It also puts the pet in the same namespace as the parent. +type NameIdentityMapper struct { + ps *apps.PetSet +} + +// SetIdentity sets the pet namespace and name. +func (n *NameIdentityMapper) SetIdentity(id string, pet *api.Pod) { + pet.Name = fmt.Sprintf("%v-%v", n.ps.Name, id) + pet.Namespace = n.ps.Namespace + return +} + +// Identity returns the name identity of the pet. +func (n *NameIdentityMapper) Identity(pet *api.Pod) string { + return n.String(pet) +} + +// String is a string function for the name identity of the pet. +func (n *NameIdentityMapper) String(pet *api.Pod) string { + return fmt.Sprintf("%v/%v", pet.Namespace, pet.Name) +} + +// identityHash computes a hash of the pet by running all the above identity +// mappers. +func identityHash(ps *apps.PetSet, pet *api.Pod) string { + id := "" + for _, idMapper := range newIdentityMappers(ps) { + id += idMapper.Identity(pet) + } + return fmt.Sprintf("%x", md5.Sum([]byte(id))) +} + +// copyPetID gives the realPet the same identity as the expectedPet. +// Note that this is *not* a literal copy, but a copy of the fields that +// contribute to the pet's identity. The returned boolean 'needsUpdate' will +// be false if the realPet already has the same identity as the expectedPet. +func copyPetID(realPet, expectedPet *pcb) (pod api.Pod, needsUpdate bool, err error) { + if realPet.pod == nil || expectedPet.pod == nil { + return pod, false, fmt.Errorf("Need a valid to and from pet for copy") + } + if realPet.parent.UID != expectedPet.parent.UID { + return pod, false, fmt.Errorf("Cannot copy pets with different parents") + } + ps := realPet.parent + if identityHash(ps, realPet.pod) == identityHash(ps, expectedPet.pod) { + return *realPet.pod, false, nil + } + copyPod := *realPet.pod + // This is the easiest way to give an identity to a pod. It won't work + // when we stop using names for id. + for _, idMapper := range newIdentityMappers(ps) { + idMapper.SetIdentity(expectedPet.id, ©Pod) + } + return copyPod, true, nil +} diff --git a/pkg/controller/petset/identity_mappers_test.go b/pkg/controller/petset/identity_mappers_test.go new file mode 100644 index 00000000000..f9a736fc30d --- /dev/null +++ b/pkg/controller/petset/identity_mappers_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package petset + +import ( + "fmt" + "reflect" + "strings" + + "k8s.io/kubernetes/pkg/api" + api_pod "k8s.io/kubernetes/pkg/api/pod" + "testing" +) + +func TestPetIDName(t *testing.T) { + replicas := 3 + ps := newPetSet(replicas) + for i := 0; i < replicas; i++ { + petName := fmt.Sprintf("%v-%d", ps.Name, i) + pcb, err := newPCB(fmt.Sprintf("%d", i), ps) + if err != nil { + t.Fatalf("Failed to generate pet %v", err) + } + pod := pcb.pod + if pod.Name != petName || pod.Namespace != ps.Namespace { + t.Errorf("Wrong name identity, expected %v", pcb.pod.Name) + } + } +} + +func TestPetIDDNS(t *testing.T) { + replicas := 3 + ps := newPetSet(replicas) + for i := 0; i < replicas; i++ { + petName := fmt.Sprintf("%v-%d", ps.Name, i) + petSubdomain := ps.Spec.ServiceName + pcb, err := newPCB(fmt.Sprintf("%d", i), ps) + pod := pcb.pod + if err != nil { + t.Fatalf("Failed to generate pet %v", err) + } + if hostname, ok := pod.Annotations[api_pod.PodHostnameAnnotation]; !ok || hostname != petName { + t.Errorf("Wrong hostname: %v", petName) + } + // TODO: Check this against the governing service. + if subdomain, ok := pod.Annotations[api_pod.PodSubdomainAnnotation]; !ok || subdomain != petSubdomain { + t.Errorf("Wrong subdomain: %v", petName) + } + } +} +func TestPetIDVolume(t *testing.T) { + replicas := 3 + ps := newPetSet(replicas) + for i := 0; i < replicas; i++ { + pcb, err := newPCB(fmt.Sprintf("%d", i), ps) + if err != nil { + t.Fatalf("Failed to generate pet %v", err) + } + pod := pcb.pod + petName := fmt.Sprintf("%v-%d", ps.Name, i) + claimName := fmt.Sprintf("datadir-%v", petName) + for _, v := range pod.Spec.Volumes { + switch v.Name { + case "datadir": + c := v.VolumeSource.PersistentVolumeClaim + if c == nil || c.ClaimName != claimName { + t.Fatalf("Unexpected claim %v", c) + } + if err := checkPodForMount(pod, "datadir"); err != nil { + t.Errorf("Expected pod mount: %v", err) + } + case "home": + h := v.VolumeSource.HostPath + if h == nil || h.Path != "/tmp/home" { + t.Errorf("Unexpected modification to hostpath, expected /tmp/home got %+v", h) + } + default: + t.Errorf("Unexpected volume %v", v.Name) + } + } + } + // TODO: Check volume mounts. +} + +func TestPetIDVolumeClaims(t *testing.T) { + replicas := 3 + ps := newPetSet(replicas) + for i := 0; i < replicas; i++ { + pcb, err := newPCB(fmt.Sprintf("%v", i), ps) + if err != nil { + t.Fatalf("Failed to generate pet %v", err) + } + pvcs := pcb.pvcs + petName := fmt.Sprintf("%v-%d", ps.Name, i) + claimName := fmt.Sprintf("datadir-%v", petName) + if len(pvcs) != 1 || pvcs[0].Name != claimName { + t.Errorf("Wrong pvc expected %v got %v", claimName, pvcs[0].Name) + } + } +} + +func TestPetIDCrossAssignment(t *testing.T) { + replicas := 3 + ps := newPetSet(replicas) + + nameMapper := &NameIdentityMapper{ps} + volumeMapper := &VolumeIdentityMapper{ps} + networkMapper := &NetworkIdentityMapper{ps} + + // Check that the name is consistent across identity. 
+ for i := 0; i < replicas; i++ { + pet, _ := newPCB(fmt.Sprintf("%v", i), ps) + p := pet.pod + name := strings.Split(nameMapper.Identity(p), "/")[1] + network := networkMapper.Identity(p) + volume := volumeMapper.Identity(p) + + petVolume := strings.Split(volume, ":")[1] + + if petVolume != fmt.Sprintf("datadir-%v", name) { + t.Errorf("Unexpected pet volume name %v, expected %v", petVolume, name) + } + if network != fmt.Sprintf("%v.%v.%v", name, ps.Spec.ServiceName, ps.Namespace) { + t.Errorf("Unexpected pet network ID %v, expected %v", network, name) + } + t.Logf("[%v] volume: %+v, network: %+v, name: %+v", i, volume, network, name) + } +} + +func TestPetIDReset(t *testing.T) { + replicas := 2 + ps := newPetSet(replicas) + firstPCB, err := newPCB("1", ps) + secondPCB, err := newPCB("2", ps) + if identityHash(ps, firstPCB.pod) == identityHash(ps, secondPCB.pod) { + t.Fatalf("Failed to generate uniquey identities:\n%+v\n%+v", firstPCB.pod.Spec, secondPCB.pod.Spec) + } + userAdded := api.Volume{ + Name: "test", + VolumeSource: api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageMediumMemory}, + }, + } + firstPCB.pod.Spec.Volumes = append(firstPCB.pod.Spec.Volumes, userAdded) + pod, needsUpdate, err := copyPetID(firstPCB, secondPCB) + if err != nil { + t.Errorf("%v", err) + } + if !needsUpdate { + t.Errorf("expected update since identity of %v was reset", secondPCB.pod.Name) + } + if identityHash(ps, &pod) != identityHash(ps, secondPCB.pod) { + t.Errorf("Failed to copy identity for pod %v -> %v", firstPCB.pod.Name, secondPCB.pod.Name) + } + foundVol := false + for _, v := range pod.Spec.Volumes { + if reflect.DeepEqual(v, userAdded) { + foundVol = true + break + } + } + if !foundVol { + t.Errorf("User added volume was corrupted by reset action.") + } +} diff --git a/pkg/controller/petset/iterator.go b/pkg/controller/petset/iterator.go new file mode 100644 index 00000000000..81df6814ccd --- /dev/null +++ b/pkg/controller/petset/iterator.go @@ -0,0 +1,163 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package petset + +import ( + "fmt" + "sort" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/controller" +) + +// newPCB generates a new PCB using the id string as a unique qualifier +func newPCB(id string, ps *apps.PetSet) (*pcb, error) { + petPod, err := controller.GetPodFromTemplate(&ps.Spec.Template, ps) + if err != nil { + return nil, err + } + for _, im := range newIdentityMappers(ps) { + im.SetIdentity(id, petPod) + } + petPVCs := []api.PersistentVolumeClaim{} + vMapper := &VolumeIdentityMapper{ps} + for _, c := range vMapper.GetClaims(id) { + petPVCs = append(petPVCs, c) + } + // TODO: Replace id field with IdentityHash, since id is more than just an index. + return &pcb{pod: petPod, pvcs: petPVCs, id: id, parent: ps}, nil +} + +// petQueue is a custom datastructure that's resembles a queue of pets. 
+type petQueue struct { + pets []*pcb + idMapper identityMapper +} + +// enqueue enqueues the given pet, evicting any pets with the same id +func (pt *petQueue) enqueue(p *pcb) { + if p == nil { + pt.pets = append(pt.pets, nil) + return + } + // Pop an existing pet from the know list, append the new pet to the end. + petList := []*pcb{} + petID := pt.idMapper.Identity(p.pod) + for i := range pt.pets { + if petID != pt.idMapper.Identity(pt.pets[i].pod) { + petList = append(petList, pt.pets[i]) + } + } + pt.pets = petList + p.event = syncPet + pt.pets = append(pt.pets, p) +} + +// dequeue returns the last element of the queue +func (pt *petQueue) dequeue() *pcb { + if pt.empty() { + glog.Warningf("Dequeue invoked on an empty queue") + return nil + } + l := len(pt.pets) - 1 + pet := pt.pets[l] + pt.pets = pt.pets[:l] + return pet +} + +// empty returns true if the pet queue is empty. +func (pt *petQueue) empty() bool { + return len(pt.pets) == 0 +} + +// NewPetQueue returns a queue for tracking pets +func NewPetQueue(ps *apps.PetSet, podList []*api.Pod) *petQueue { + pt := petQueue{pets: []*pcb{}, idMapper: &NameIdentityMapper{ps}} + // Seed the queue with existing pets. Assume all pets are scheduled for + // deletion, enqueuing a pet will "undelete" it. We always want to delete + // from the higher ids, so sort by creation timestamp. + + sort.Sort(PodsByCreationTimestamp(podList)) + vMapper := VolumeIdentityMapper{ps} + for i := range podList { + pod := podList[i] + pt.pets = append(pt.pets, &pcb{pod: pod, pvcs: vMapper.GetClaimsForPet(pod), parent: ps, event: deletePet, id: fmt.Sprintf("%v", i)}) + } + return &pt +} + +// petsetIterator implements a simple iterator over pets in the given petset. +type petSetIterator struct { + // ps is the petset for this iterator. + ps *apps.PetSet + // queue contains the elements to iterate over. + queue *petQueue + // errs is a list because we always want the iterator to drain. + errs []error + // petCount is the number of pets iterated over. + petCount int +} + +// Next returns true for as long as there are elements in the underlying queue. +func (pi *petSetIterator) Next() bool { + var pet *pcb + var err error + if pi.petCount < pi.ps.Spec.Replicas { + pet, err = newPCB(fmt.Sprintf("%d", pi.petCount), pi.ps) + if err != nil { + pi.errs = append(pi.errs, err) + // Don't stop iterating over the set on errors. Caller handles nil. + pet = nil + } + pi.queue.enqueue(pet) + pi.petCount++ + } + // Keep the iterator running till we've deleted pets in the queue. + return !pi.queue.empty() +} + +// Value dequeues an element from the queue. +func (pi *petSetIterator) Value() *pcb { + return pi.queue.dequeue() +} + +// NewPetSetIterator returns a new iterator. All pods in the given podList +// are used to seed the queue of the iterator. +func NewPetSetIterator(ps *apps.PetSet, podList []*api.Pod) *petSetIterator { + pi := &petSetIterator{ + ps: ps, + queue: NewPetQueue(ps, podList), + errs: []error{}, + petCount: 0, + } + return pi +} + +// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker. 
+type PodsByCreationTimestamp []*api.Pod + +func (o PodsByCreationTimestamp) Len() int { return len(o) } +func (o PodsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o PodsByCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +} diff --git a/pkg/controller/petset/iterator_test.go b/pkg/controller/petset/iterator_test.go new file mode 100644 index 00000000000..ab07c4223b1 --- /dev/null +++ b/pkg/controller/petset/iterator_test.go @@ -0,0 +1,149 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package petset + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/sets" + "testing" +) + +func TestPetQueueCreates(t *testing.T) { + replicas := 3 + ps := newPetSet(replicas) + q := NewPetQueue(ps, []*api.Pod{}) + for i := 0; i < replicas; i++ { + pet, _ := newPCB(fmt.Sprintf("%v", i), ps) + q.enqueue(pet) + p := q.dequeue() + if p.event != syncPet { + t.Errorf("Failed to retrieve sync event from queue") + } + } + if q.dequeue() != nil { + t.Errorf("Expected no pets") + } +} + +func TestPetQueueScaleDown(t *testing.T) { + replicas := 1 + ps := newPetSet(replicas) + + // knownPods are the pods in the system + knownPods := newPodList(ps, 3) + + q := NewPetQueue(ps, knownPods) + + // The iterator will insert a single replica, the enqueue + // mimics that behavior. 
+ pet, _ := newPCB(fmt.Sprintf("%v", 0), ps) + q.enqueue(pet) + + deletes := sets.NewString(fmt.Sprintf("%v-1", ps.Name), fmt.Sprintf("%v-2", ps.Name)) + syncs := sets.NewString(fmt.Sprintf("%v-0", ps.Name)) + + // Confirm that 2 known pods are deleted + for i := 0; i < 3; i++ { + p := q.dequeue() + switch p.event { + case syncPet: + if !syncs.Has(p.pod.Name) { + t.Errorf("Unexpected sync %v expecting %+v", p.pod.Name, syncs) + } + case deletePet: + if !deletes.Has(p.pod.Name) { + t.Errorf("Unexpected deletes %v expecting %+v", p.pod.Name, deletes) + } + } + } + if q.dequeue() != nil { + t.Errorf("Expected no pets") + } +} + +func TestPetQueueScaleUp(t *testing.T) { + replicas := 5 + ps := newPetSet(replicas) + + // knownPods are pods in the system + knownPods := newPodList(ps, 2) + + q := NewPetQueue(ps, knownPods) + for i := 0; i < 5; i++ { + pet, _ := newPCB(fmt.Sprintf("%v", i), ps) + q.enqueue(pet) + } + for i := 4; i >= 0; i-- { + pet := q.dequeue() + expectedName := fmt.Sprintf("%v-%d", ps.Name, i) + if pet.event != syncPet || pet.pod.Name != expectedName { + t.Errorf("Unexpected pet %+v, expected %v", pet.pod.Name, expectedName) + } + } +} + +func TestPetSetIteratorRelist(t *testing.T) { + replicas := 5 + ps := newPetSet(replicas) + + // knownPods are pods in the system + knownPods := newPodList(ps, 5) + for i := range knownPods { + knownPods[i].Spec.NodeName = fmt.Sprintf("foo-node-%v", i) + knownPods[i].Status.Phase = api.PodRunning + } + pi := NewPetSetIterator(ps, knownPods) + + // A simple resync should not change identity of pods in the system + i := 0 + for pi.Next() { + p := pi.Value() + if identityHash(ps, p.pod) != identityHash(ps, knownPods[i]) { + t.Errorf("Got unexpected identity hash from iterator.") + } + if p.event != syncPet { + t.Errorf("Got unexpected sync event for %v: %v", p.pod.Name, p.event) + } + i++ + } + if i != 5 { + t.Errorf("Unexpected iterations %v, this probably means too many/few pets", i) + } + + // Scale to 0 should delete all pods in system + ps.Spec.Replicas = 0 + pi = NewPetSetIterator(ps, knownPods) + i = 0 + for pi.Next() { + p := pi.Value() + if p.event != deletePet { + t.Errorf("Got unexpected sync event for %v: %v", p.pod.Name, p.event) + } + i++ + } + if i != 5 { + t.Errorf("Unexpected iterations %v, this probably means too many/few pets", i) + } + + // Relist with 0 replicas should no-op + pi = NewPetSetIterator(ps, []*api.Pod{}) + if pi.Next() != false { + t.Errorf("Unexpected iteration without any replicas or pods in system") + } +} diff --git a/pkg/controller/petset/pet.go b/pkg/controller/petset/pet.go new file mode 100644 index 00000000000..685e3d6e031 --- /dev/null +++ b/pkg/controller/petset/pet.go @@ -0,0 +1,310 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package petset + +import ( + "fmt" + "strconv" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/runtime" + + "github.com/golang/glog" +) + +// petLifeCycleEvent is used to communicate high level actions the controller +// needs to take on a given pet. It's recorded in the pcb. The recognized values +// are listed below. +type petLifeCycleEvent string + +const ( + syncPet petLifeCycleEvent = "sync" + deletePet petLifeCycleEvent = "delete" + // updateRetries is the number of Get/Update cycles we perform when an + // update fails. + updateRetries = 3 + // PetSetInitAnnotation is an annotation which when set, indicates that the + // pet has finished initializing itself. + // TODO: Replace this with init container status. + PetSetInitAnnotation = "pod.alpha.kubernetes.io/initialized" +) + +// pcb is the control block used to transmit all updates about a single pet. +// It serves as the manifest for a single pet. Users must populate the pod +// and parent fields to pass it around safely. +type pcb struct { + // pod is the desired pet pod. + pod *api.Pod + // pvcs is a list of desired persistent volume claims for the pet pod. + pvcs []api.PersistentVolumeClaim + // event is the lifecycle event associated with this update. + event petLifeCycleEvent + // id is the identity index of this pet. + id string + // parent is a pointer to the parent petset. + parent *apps.PetSet +} + +// pvcClient is a client for managing persistent volume claims. +type pvcClient interface { + // DeletePVCs deletes the pvcs in the given pcb. + DeletePVCs(*pcb) error + // SyncPVCs creates/updates pvcs in the given pcb. + SyncPVCs(*pcb) error +} + +// petSyncer syncs a single pet. +type petSyncer struct { + petClient + + // blockingPet is an unhealthy pet either from this iteration or a previous + // iteration, either because it is not yet Running, or being Deleted, that + // prevents other creates/deletions. + blockingPet *pcb +} + +// Sync syncs the given pet. +func (p *petSyncer) Sync(pet *pcb) error { + if pet == nil { + return nil + } + realPet, exists, err := p.Get(pet) + if err != nil { + return err + } + // There is not constraint except quota on the number of pvcs created. + // This is done per pet so we get a working cluster ASAP, even if user + // runs out of quota. + if err := p.SyncPVCs(pet); err != nil { + return err + } + if exists { + if !p.isHealthy(realPet.pod) { + glog.Infof("PetSet %v waiting on unhealthy pet %v", pet.parent.Name, realPet.pod.Name) + } + return p.Update(realPet, pet) + } + if p.blockingPet != nil { + glog.Infof("Create of %v in PetSet %v blocked by unhealthy pet %v", pet.pod.Name, pet.parent.Name, p.blockingPet.pod.Name) + return nil + } + // This is counted as a create, even if it fails. We can't skip indices + // because some pets might allocate a special role to earlier indices. + // The returned error will force a requeue. + // TODO: What's the desired behavior if pet-0 is deleted while pet-1 is + // not yet healthy? currently pet-0 will wait till pet-1 is healthy, + // this feels safer, but might lead to deadlock. + p.blockingPet = pet + if err := p.Create(pet); err != nil { + return err + } + return nil +} + +// Delete deletes the given pet, if no other pet in the petset is blocking a +// scale event. 
+func (p *petSyncer) Delete(pet *pcb) error { + if pet == nil { + return nil + } + realPet, exists, err := p.Get(pet) + if err != nil { + return err + } + if !exists { + return nil + } + if p.blockingPet != nil { + glog.Infof("Delete of %v in PetSet %v blocked by unhealthy pet %v", realPet.pod.Name, pet.parent.Name, p.blockingPet.pod.Name) + return nil + } + // This is counted as a delete, even if it fails. + // The returned error will force a requeue. + p.blockingPet = realPet + if !p.isDying(realPet.pod) { + glog.Infof("PetSet %v deleting pet %v", pet.parent.Name, pet.pod.Name) + return p.petClient.Delete(pet) + } + glog.Infof("PetSet %v waiting on pet %v to die in %v", pet.parent.Name, realPet.pod.Name, realPet.pod.DeletionTimestamp) + return nil +} + +// petClient is a client for managing pets. +type petClient interface { + pvcClient + petHealthChecker + Delete(*pcb) error + Get(*pcb) (*pcb, bool, error) + Create(*pcb) error + Update(*pcb, *pcb) error +} + +// apiServerPetClient is a petset aware Kubernetes client. +type apiServerPetClient struct { + c *client.Client + recorder record.EventRecorder + petHealthChecker +} + +// Get gets the pet in the pcb from the apiserver. +func (p *apiServerPetClient) Get(pet *pcb) (*pcb, bool, error) { + found := true + ns := pet.parent.Namespace + pod, err := podClient(p.c, ns).Get(pet.pod.Name) + if errors.IsNotFound(err) { + found = false + err = nil + } + if err != nil || !found { + return nil, found, err + } + realPet := *pet + realPet.pod = pod + return &realPet, true, nil +} + +// Delete deletes the pet in the pcb from the apiserver. +func (p *apiServerPetClient) Delete(pet *pcb) error { + err := podClient(p.c, pet.parent.Namespace).Delete(pet.pod.Name, nil) + if errors.IsNotFound(err) { + err = nil + } + p.event(pet.parent, "Delete", fmt.Sprintf("pet: %v", pet.pod.Name), err) + return err +} + +// Create creates the pet in the pcb. +func (p *apiServerPetClient) Create(pet *pcb) error { + _, err := podClient(p.c, pet.parent.Namespace).Create(pet.pod) + p.event(pet.parent, "Create", fmt.Sprintf("pet: %v", pet.pod.Name), err) + return err +} + +// Update updates the pet in the 'pet' pcb to match the pet in the 'expectedPet' pcb. +func (p *apiServerPetClient) Update(pet *pcb, expectedPet *pcb) (updateErr error) { + var getErr error + pc := podClient(p.c, pet.parent.Namespace) + + pod, needsUpdate, err := copyPetID(pet, expectedPet) + if err != nil || !needsUpdate { + return err + } + glog.Infof("Resetting pet %v to match PetSet %v spec", pod.Name, pet.parent.Name) + for i, p := 0, &pod; ; i++ { + _, updateErr = pc.Update(p) + if updateErr == nil || i >= updateRetries { + return updateErr + } + if p, getErr = pc.Get(pod.Name); getErr != nil { + return getErr + } + } +} + +// DeletePVCs should delete PVCs, when implemented. +func (p *apiServerPetClient) DeletePVCs(pet *pcb) error { + // TODO: Implement this when we delete pvcs. + return nil +} + +func (p *apiServerPetClient) getPVC(pvcName, pvcNamespace string) (*api.PersistentVolumeClaim, bool, error) { + found := true + pvc, err := claimClient(p.c, pvcNamespace).Get(pvcName) + if errors.IsNotFound(err) { + found = false + } + if err != nil || !found { + return nil, found, err + } + return pvc, true, nil +} + +func (p *apiServerPetClient) createPVC(pvc *api.PersistentVolumeClaim) error { + _, err := claimClient(p.c, pvc.Namespace).Create(pvc) + return err +} + +// SyncPVCs syncs pvcs in the given pcb. +func (p *apiServerPetClient) SyncPVCs(pet *pcb) error { + errMsg := "" + // Create new claims. 
+ for i, pvc := range pet.pvcs { + _, exists, err := p.getPVC(pvc.Name, pet.parent.Namespace) + if !exists { + if err := p.createPVC(&pet.pvcs[i]); err != nil { + errMsg += fmt.Sprintf("Failed to create %v: %v", pvc.Name, err) + } + p.event(pet.parent, "Create", fmt.Sprintf("pvc: %v", pvc.Name), err) + } else if err != nil { + errMsg += fmt.Sprintf("Error trying to get pvc %v, %v.", pvc.Name, err) + } + // TODO: Check resource requirements and accessmodes, update if necessary + } + if len(errMsg) != 0 { + return fmt.Errorf("%v", errMsg) + } + return nil +} + +// event formats an event for the given runtime object. +func (p *apiServerPetClient) event(obj runtime.Object, reason, msg string, err error) { + if err != nil { + p.recorder.Eventf(obj, api.EventTypeWarning, fmt.Sprintf("Failed%v", reason), fmt.Sprintf("%v, error: %v", msg, err)) + } else { + p.recorder.Eventf(obj, api.EventTypeNormal, fmt.Sprintf("Successful%v", reason), msg) + } +} + +// petHealthChecker is an interface to check pet health. It makes a boolean +// decision based on the given pod. +type petHealthChecker interface { + isHealthy(*api.Pod) bool + isDying(*api.Pod) bool +} + +// defaultPetHealthChecks does basic health checking. +// It doesn't update, probe or get the pod. +type defaultPetHealthChecker struct{} + +// isHealthy returns true if the pod is running and has the +// "pod.alpha.kubernetes.io/initialized" set to "true". +func (d *defaultPetHealthChecker) isHealthy(pod *api.Pod) bool { + if pod == nil || pod.Status.Phase != api.PodRunning { + return false + } + initialized, ok := pod.Annotations[PetSetInitAnnotation] + if !ok { + glog.Infof("PetSet pod %v in %v, waiting on annotation %v", api.PodRunning, pod.Name, PetSetInitAnnotation) + return false + } + b, err := strconv.ParseBool(initialized) + if err != nil { + return false + } + return b +} + +// isDying returns true if the pod has a non-nil deletion timestamp. Since the +// timestamp can only decrease, once this method returns true for a given pet, it +// will never return false. +func (d *defaultPetHealthChecker) isDying(pod *api.Pod) bool { + return pod != nil && pod.DeletionTimestamp != nil +} diff --git a/pkg/controller/petset/pet_set.go b/pkg/controller/petset/pet_set.go new file mode 100644 index 00000000000..809327397c3 --- /dev/null +++ b/pkg/controller/petset/pet_set.go @@ -0,0 +1,356 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package petset + +import ( + "fmt" + "reflect" + "sort" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const ( + // Time to sleep before polling to see if the pod cache has synced. + PodStoreSyncedPollPeriod = 100 * time.Millisecond + // number of retries for a status update. + statusUpdateRetries = 2 + // period to relist petsets and verify pets + petSetResyncPeriod = 30 * time.Second +) + +// PetSetController controls petsets. +type PetSetController struct { + kubeClient *client.Client + + // newSyncer returns an interface capable of syncing a single pet. + // Abstracted out for testing. + newSyncer func(*pcb) *petSyncer + + // podStore is a cache of watched pods. + podStore cache.StoreToPodLister + + // podStoreSynced returns true if the pod store has synced at least once. + podStoreSynced func() bool + // Watches changes to all pods. + podController framework.ControllerInterface + + // A store of PetSets, populated by the psController. + psStore cache.StoreToPetSetLister + // Watches changes to all PetSets. + psController *framework.Controller + + // A store of the 1 unhealthy pet blocking progress for a given ps + blockingPetStore *unhealthyPetTracker + + // Controllers that need to be synced. + queue *workqueue.Type + + // syncHandler handles sync events for petsets. + // Abstracted as a func to allow injection for testing. + syncHandler func(psKey string) []error +} + +// NewPetSetController creates a new petset controller. 
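+// It wires a shared pod informer and a dedicated PetSet informer into the
+// controller's work queue; per-pet work is delegated to the petSyncer
+// returned by newSyncer.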
+func NewPetSetController(podInformer framework.SharedInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "petset"}) + pc := &apiServerPetClient{kubeClient, recorder, &defaultPetHealthChecker{}} + + psc := &PetSetController{ + kubeClient: kubeClient, + blockingPetStore: newUnHealthyPetTracker(pc), + newSyncer: func(blockingPet *pcb) *petSyncer { + return &petSyncer{pc, blockingPet} + }, + queue: workqueue.New(), + } + + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + // lookup the petset and enqueue + AddFunc: psc.addPod, + // lookup current and old petset if labels changed + UpdateFunc: psc.updatePod, + // lookup petset accounting for deletion tombstones + DeleteFunc: psc.deletePod, + }) + psc.podStore.Store = podInformer.GetStore() + psc.podController = podInformer.GetController() + + psc.psStore.Store, psc.psController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return psc.kubeClient.Apps().PetSets(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return psc.kubeClient.Apps().PetSets(api.NamespaceAll).Watch(options) + }, + }, + &apps.PetSet{}, + petSetResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: psc.enqueuePetSet, + UpdateFunc: func(old, cur interface{}) { + oldPS := old.(*apps.PetSet) + curPS := cur.(*apps.PetSet) + if oldPS.Status.Replicas != curPS.Status.Replicas { + glog.V(4).Infof("Observed updated replica count for PetSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas) + } + psc.enqueuePetSet(cur) + }, + DeleteFunc: psc.enqueuePetSet, + }, + ) + // TODO: Watch volumes + psc.podStoreSynced = psc.podController.HasSynced + psc.syncHandler = psc.Sync + return psc +} + +// Run runs the petset controller. +func (psc *PetSetController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + glog.Infof("Starting petset controller") + go psc.podController.Run(stopCh) + go psc.psController.Run(stopCh) + for i := 0; i < workers; i++ { + go wait.Until(psc.worker, time.Second, stopCh) + } + <-stopCh + glog.Infof("Shutting down petset controller") + psc.queue.ShutDown() +} + +// addPod adds the petset for the pod to the sync queue +func (psc *PetSetController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) + ps := psc.getPetSetForPod(pod) + if ps == nil { + return + } + psc.enqueuePetSet(ps) +} + +// updatePod adds the petset for the current and old pods to the sync queue. +// If the labels of the pod didn't change, this method enqueues a single petset. +func (psc *PetSetController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + return + } + curPod := cur.(*api.Pod) + oldPod := old.(*api.Pod) + ps := psc.getPetSetForPod(curPod) + if ps == nil { + return + } + psc.enqueuePetSet(ps) + if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { + if oldPS := psc.getPetSetForPod(oldPod); oldPS != nil { + psc.enqueuePetSet(oldPS) + } + } +} + +// deletePod enqueues the petset for the pod accounting for deletion tombstones. 
+func (psc *PetSetController) deletePod(obj interface{}) {
+ pod, ok := obj.(*api.Pod)
+
+ // When a delete is dropped, the relist will notice a pod in the store not
+ // in the list, leading to the insertion of a tombstone object which contains
+ // the deleted key/value. Note that this value might be stale. If the pod
+ // changed labels the new PetSet will not be woken up till the periodic resync.
+ if !ok {
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ glog.Errorf("couldn't get object from tombstone %+v", obj)
+ return
+ }
+ pod, ok = tombstone.Obj.(*api.Pod)
+ if !ok {
+ glog.Errorf("tombstone contained object that is not a pod %+v", obj)
+ return
+ }
+ }
+ glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
+ if ps := psc.getPetSetForPod(pod); ps != nil {
+ psc.enqueuePetSet(ps)
+ }
+}
+
+// getPodsForPetSet returns the pods that match the selector of the given petset.
+func (psc *PetSetController) getPodsForPetSet(ps *apps.PetSet) ([]*api.Pod, error) {
+ // TODO: Do we want the petset to fight with RCs? check parent petset annotation, or name prefix?
+ sel, err := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
+ if err != nil {
+ return []*api.Pod{}, err
+ }
+ petList, err := psc.podStore.Pods(ps.Namespace).List(sel)
+ if err != nil {
+ return []*api.Pod{}, err
+ }
+ pods := []*api.Pod{}
+ for i := range petList.Items {
+ pods = append(pods, &petList.Items[i])
+ }
+ return pods, nil
+}
+
+// getPetSetForPod returns the pet set managing the given pod.
+func (psc *PetSetController) getPetSetForPod(pod *api.Pod) *apps.PetSet {
+ ps, err := psc.psStore.GetPodPetSets(pod)
+ if err != nil {
+ glog.V(4).Infof("No PetSets found for pod %v, PetSet controller will avoid syncing", pod.Name)
+ return nil
+ }
+ // Resolve an overlapping petset tie by creation timestamp.
+ // Let's hope users don't create overlapping petsets.
+ if len(ps) > 1 {
+ glog.Errorf("user error! more than one PetSet is selecting pods with labels: %+v", pod.Labels)
+ sort.Sort(overlappingPetSets(ps))
+ }
+ return &ps[0]
+}
+
+// enqueuePetSet enqueues the given petset in the work queue.
+func (psc *PetSetController) enqueuePetSet(obj interface{}) {
+ key, err := controller.KeyFunc(obj)
+ if err != nil {
+ glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+ return
+ }
+ psc.queue.Add(key)
+}
+
+// worker runs a worker thread that just dequeues items, processes them, and marks them done.
+// It enforces that the syncHandler is never invoked concurrently with the same key.
+func (psc *PetSetController) worker() {
+ for {
+ func() {
+ key, quit := psc.queue.Get()
+ if quit {
+ return
+ }
+ defer psc.queue.Done(key)
+ if errs := psc.syncHandler(key.(string)); len(errs) != 0 {
+ glog.Errorf("Error syncing PetSet %v, requeuing: %v", key.(string), errs)
+ psc.queue.Add(key)
+ }
+ }()
+ }
+}
+
+// Sync syncs the given petset.
+func (psc *PetSetController) Sync(key string) []error {
+ startTime := time.Now()
+ defer func() {
+ glog.V(4).Infof("Finished syncing pet set %q (%v)", key, time.Now().Sub(startTime))
+ }()
+
+ if !psc.podStoreSynced() {
+ // Sleep so we give the pod reflector goroutine a chance to run.
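+ // Returning an error here requeues the key via the worker loop above, so
+ // the sync is retried once the cache has had a chance to fill.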
+ time.Sleep(PodStoreSyncedPollPeriod) + return []error{fmt.Errorf("waiting for pods controller to sync")} + } + + obj, exists, err := psc.psStore.Store.GetByKey(key) + if !exists { + if err = psc.blockingPetStore.store.Delete(key); err != nil { + return []error{err} + } + glog.Infof("PetSet has been deleted %v", key) + return []error{} + } + if err != nil { + glog.Errorf("Unable to retrieve PetSet %v from store: %v", key, err) + return []error{err} + } + + ps := *obj.(*apps.PetSet) + petList, err := psc.getPodsForPetSet(&ps) + if err != nil { + return []error{err} + } + + numPets, errs := psc.syncPetSet(&ps, petList) + if err := updatePetCount(psc.kubeClient, ps, numPets); err != nil { + glog.Infof("Failed to update replica count for petset %v/%v; requeuing; error: %v", ps.Namespace, ps.Name, err) + errs = append(errs, err) + } + + return errs +} + +// syncPetSet syncs a tuple of (petset, pets). +func (psc *PetSetController) syncPetSet(ps *apps.PetSet, pets []*api.Pod) (int, []error) { + glog.Infof("Syncing PetSet %v/%v with %d pets", ps.Namespace, ps.Name, len(pets)) + + it := NewPetSetIterator(ps, pets) + blockingPet, err := psc.blockingPetStore.Get(ps, pets) + if err != nil { + return 0, []error{err} + } + if blockingPet != nil { + glog.Infof("PetSet %v blocked from scaling on pet %v", ps.Name, blockingPet.pod.Name) + } + petManager := psc.newSyncer(blockingPet) + numPets := 0 + + for it.Next() { + pet := it.Value() + if pet == nil { + continue + } + switch pet.event { + case syncPet: + err = petManager.Sync(pet) + if err == nil { + numPets++ + } + case deletePet: + err = petManager.Delete(pet) + } + if err != nil { + it.errs = append(it.errs, err) + } + } + + if err := psc.blockingPetStore.Add(petManager.blockingPet); err != nil { + it.errs = append(it.errs, err) + } + // TODO: GC pvcs. We can't delete them per pet because of grace period, and + // in fact we *don't want to* till petset is stable to guarantee that bugs + // in the controller don't corrupt user data. + return numPets, it.errs +} diff --git a/pkg/controller/petset/pet_set_test.go b/pkg/controller/petset/pet_set_test.go new file mode 100644 index 00000000000..e396f264cc4 --- /dev/null +++ b/pkg/controller/petset/pet_set_test.go @@ -0,0 +1,264 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package petset + +import ( + "fmt" + "math/rand" + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" +) + +func newFakePetSetController() (*PetSetController, *fakePetClient) { + fpc := newFakePetClient() + return &PetSetController{ + kubeClient: nil, + blockingPetStore: newUnHealthyPetTracker(fpc), + podStoreSynced: func() bool { return true }, + psStore: cache.StoreToPetSetLister{Store: cache.NewStore(controller.KeyFunc)}, + podStore: cache.StoreToPodLister{Store: cache.NewStore(controller.KeyFunc)}, + newSyncer: func(blockingPet *pcb) *petSyncer { + return &petSyncer{fpc, blockingPet} + }, + }, fpc +} + +func checkPets(ps *apps.PetSet, creates, deletes int, fc *fakePetClient, t *testing.T) { + if fc.petsCreated != creates || fc.petsDeleted != deletes { + t.Errorf("Found (creates: %d, deletes: %d), expected (creates: %d, deletes: %d)", fc.petsCreated, fc.petsDeleted, creates, deletes) + } + gotClaims := map[string]api.PersistentVolumeClaim{} + for _, pvc := range fc.claims { + gotClaims[pvc.Name] = pvc + } + for i := range fc.pets { + expectedPet, _ := newPCB(fmt.Sprintf("%v", i), ps) + if identityHash(ps, fc.pets[i].pod) != identityHash(ps, expectedPet.pod) { + t.Errorf("Unexpected pet at index %d", i) + } + for _, pvc := range expectedPet.pvcs { + gotPVC, ok := gotClaims[pvc.Name] + if !ok { + t.Errorf("PVC %v not created for pet %v", pvc.Name, expectedPet.pod.Name) + } + if !reflect.DeepEqual(gotPVC.Spec, pvc.Spec) { + t.Errorf("got PVC %v differs from created pvc", pvc.Name) + } + } + } +} + +func scalePetSet(t *testing.T, ps *apps.PetSet, psc *PetSetController, fc *fakePetClient, scale int) []error { + errs := []error{} + for i := 0; i < scale; i++ { + pl := fc.getPodList() + if len(pl) != i { + t.Errorf("Unexpected number of pets, expected %d found %d", i, len(fc.pets)) + } + _, syncErrs := psc.syncPetSet(ps, pl) + errs = append(errs, syncErrs...) + fc.setHealthy(i) + checkPets(ps, i+1, 0, fc, t) + } + return errs +} + +func saturatePetSet(t *testing.T, ps *apps.PetSet, psc *PetSetController, fc *fakePetClient) { + errs := scalePetSet(t, ps, psc, fc, ps.Spec.Replicas) + if len(errs) != 0 { + t.Errorf("%v", errs) + } +} + +func TestPetSetControllerCreates(t *testing.T) { + psc, fc := newFakePetSetController() + replicas := 3 + ps := newPetSet(replicas) + + saturatePetSet(t, ps, psc, fc) + + podList := fc.getPodList() + // Deleted pet gets recreated + fc.pets = fc.pets[:replicas-1] + if _, errs := psc.syncPetSet(ps, podList); len(errs) != 0 { + t.Errorf("%v", errs) + } + checkPets(ps, replicas+1, 0, fc, t) +} + +func TestPetSetControllerDeletes(t *testing.T) { + psc, fc := newFakePetSetController() + replicas := 4 + ps := newPetSet(replicas) + + saturatePetSet(t, ps, psc, fc) + + // Drain + errs := []error{} + ps.Spec.Replicas = 0 + knownPods := fc.getPodList() + for i := replicas - 1; i >= 0; i-- { + if len(fc.pets) != i+1 { + t.Errorf("Unexpected number of pets, expected %d found %d", i, len(fc.pets)) + } + _, syncErrs := psc.syncPetSet(ps, knownPods) + errs = append(errs, syncErrs...) 
+ }
+ if len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+ checkPets(ps, replicas, replicas, fc, t)
+}
+
+func TestPetSetControllerRespectsTermination(t *testing.T) {
+ psc, fc := newFakePetSetController()
+ replicas := 4
+ ps := newPetSet(replicas)
+
+ saturatePetSet(t, ps, psc, fc)
+
+ fc.setDeletionTimestamp(replicas - 1)
+ ps.Spec.Replicas = 2
+ _, errs := psc.syncPetSet(ps, fc.getPodList())
+ if len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+ // Finding a pod with the deletion timestamp will pause all deletions.
+ knownPods := fc.getPodList()
+ if len(knownPods) != 4 {
+ t.Errorf("Pods deleted prematurely before deletion timestamp expired, len %d", len(knownPods))
+ }
+ fc.pets = fc.pets[:replicas-1]
+ _, errs = psc.syncPetSet(ps, fc.getPodList())
+ if len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+ checkPets(ps, replicas, 1, fc, t)
+}
+
+func TestPetSetControllerRespectsOrder(t *testing.T) {
+ psc, fc := newFakePetSetController()
+ replicas := 4
+ ps := newPetSet(replicas)
+
+ saturatePetSet(t, ps, psc, fc)
+
+ errs := []error{}
+ ps.Spec.Replicas = 0
+ // Shuffle known list and check that pets are deleted in reverse
+ knownPods := fc.getPodList()
+ for i := range knownPods {
+ j := rand.Intn(i + 1)
+ knownPods[i], knownPods[j] = knownPods[j], knownPods[i]
+ }
+
+ for i := 0; i < replicas; i++ {
+ if len(fc.pets) != replicas-i {
+ t.Errorf("Unexpected number of pets, expected %d found %d", replicas-i, len(fc.pets))
+ }
+ _, syncErrs := psc.syncPetSet(ps, knownPods)
+ errs = append(errs, syncErrs...)
+ checkPets(ps, replicas, i+1, fc, t)
+ }
+ if len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+}
+
+func TestPetSetControllerBlocksScaling(t *testing.T) {
+ psc, fc := newFakePetSetController()
+ replicas := 5
+ ps := newPetSet(replicas)
+ scalePetSet(t, ps, psc, fc, 3)
+
+ // Create 4th pet, then before flipping it to healthy, kill the first pet.
+ // There should only be 1 not-healthy pet at a time.
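+ // The sync below creates the 4th pet; because it is never marked healthy it
+ // becomes the blocking pet for the set.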
+ pl := fc.getPodList()
+ if _, errs := psc.syncPetSet(ps, pl); len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+
+ deletedPod := pl[0]
+ fc.deletePetAtIndex(0)
+ pl = fc.getPodList()
+ if _, errs := psc.syncPetSet(ps, pl); len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+ newPodList := fc.getPodList()
+ for _, p := range newPodList {
+ if p.Name == deletedPod.Name {
+ t.Errorf("Deleted pod was created while existing pod was unhealthy")
+ }
+ }
+
+ fc.setHealthy(len(newPodList) - 1)
+ if _, errs := psc.syncPetSet(ps, pl); len(errs) != 0 {
+ t.Errorf("%v", errs)
+ }
+
+ found := false
+ for _, p := range fc.getPodList() {
+ if p.Name == deletedPod.Name {
+ found = true
+ }
+ }
+ if !found {
+ t.Errorf("Deleted pod was not created after existing pods became healthy")
+ }
+}
+
+func TestPetSetBlockingPetIsCleared(t *testing.T) {
+ psc, fc := newFakePetSetController()
+ ps := newPetSet(3)
+ scalePetSet(t, ps, psc, fc, 1)
+
+ if blocking, err := psc.blockingPetStore.Get(ps, fc.getPodList()); err != nil || blocking != nil {
+ t.Errorf("Unexpected blocking pet %v, err %v", blocking, err)
+ }
+
+ // 1 not yet healthy pet
+ psc.syncPetSet(ps, fc.getPodList())
+
+ if blocking, err := psc.blockingPetStore.Get(ps, fc.getPodList()); err != nil || blocking == nil {
+ t.Errorf("Expected blocking pet %v, err %v", blocking, err)
+ }
+
+ // Deleting the petset should clear the blocking pet
+ if err := psc.psStore.Store.Delete(ps); err != nil {
+ t.Fatalf("Unable to delete pet %v from petset controller store.", ps.Name)
+ }
+ if errs := psc.Sync(fmt.Sprintf("%v/%v", ps.Namespace, ps.Name)); len(errs) != 0 {
+ t.Errorf("Error during sync of deleted petset %v", errs)
+ }
+ fc.pets = []*pcb{}
+ fc.petsCreated = 0
+ if blocking, err := psc.blockingPetStore.Get(ps, fc.getPodList()); err != nil || blocking != nil {
+ t.Errorf("Unexpected blocking pet %v, err %v", blocking, err)
+ }
+ saturatePetSet(t, ps, psc, fc)
+
+ // Make sure we don't leak the final blocking pet in the store
+ psc.syncPetSet(ps, fc.getPodList())
+ if p, exists, err := psc.blockingPetStore.store.GetByKey(fmt.Sprintf("%v/%v", ps.Namespace, ps.Name)); err != nil || exists {
+ t.Errorf("Unexpected blocking pet, err %v: %+v", err, p)
+ }
+}
diff --git a/pkg/controller/petset/pet_set_utils.go b/pkg/controller/petset/pet_set_utils.go
new file mode 100644
index 00000000000..d6d373050dd
--- /dev/null
+++ b/pkg/controller/petset/pet_set_utils.go
@@ -0,0 +1,168 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package petset
+
+import (
+ "fmt"
+ "sync"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/apis/apps"
+ "k8s.io/kubernetes/pkg/client/cache"
+ client "k8s.io/kubernetes/pkg/client/unversioned"
+ "k8s.io/kubernetes/pkg/controller"
+
+ "github.com/golang/glog"
+)
+
+// overlappingPetSets sorts a list of PetSets by creation timestamp, using their names as a tie breaker.
+// Generally used to tie break between PetSets that have overlapping selectors.
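+// getPetSetForPod relies on this ordering and syncs a pod only against the
+// oldest matching PetSet.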
+type overlappingPetSets []apps.PetSet
+
+func (o overlappingPetSets) Len() int { return len(o) }
+func (o overlappingPetSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+
+func (o overlappingPetSets) Less(i, j int) bool {
+ if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
+ return o[i].Name < o[j].Name
+ }
+ return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
+}
+
+// updatePetCount attempts to update the Status.Replicas of the given PetSet, with a single GET/PUT retry.
+func updatePetCount(kubeClient *client.Client, ps apps.PetSet, numPets int) (updateErr error) {
+ if ps.Status.Replicas == numPets || kubeClient == nil {
+ return nil
+ }
+ psClient := kubeClient.Apps().PetSets(ps.Namespace)
+ var getErr error
+ for i, ps := 0, &ps; ; i++ {
+ glog.V(4).Infof(fmt.Sprintf("Updating replica count for PetSet: %s/%s, ", ps.Namespace, ps.Name) +
+ fmt.Sprintf("replicas %d->%d (need %d), ", ps.Status.Replicas, numPets, ps.Spec.Replicas))
+
+ ps.Status = apps.PetSetStatus{Replicas: numPets}
+ _, updateErr = psClient.UpdateStatus(ps)
+ if updateErr == nil || i >= statusUpdateRetries {
+ return updateErr
+ }
+ if ps, getErr = psClient.Get(ps.Name); getErr != nil {
+ return getErr
+ }
+ }
+}
+
+// claimClient returns the pvcClient for the given kubeClient/ns.
+func claimClient(kubeClient *client.Client, ns string) client.PersistentVolumeClaimInterface {
+ return kubeClient.PersistentVolumeClaims(ns)
+}
+
+// podClient returns the pod client for the given kubeClient/ns.
+func podClient(kubeClient *client.Client, ns string) client.PodInterface {
+ return kubeClient.Pods(ns)
+}
+
+// unhealthyPetTracker tracks unhealthy pets for petsets.
+type unhealthyPetTracker struct {
+ pc petClient
+ store cache.Store
+ storeLock sync.Mutex
+}
+
+// Get returns a previously recorded blocking pet for the given petset.
+func (u *unhealthyPetTracker) Get(ps *apps.PetSet, knownPets []*api.Pod) (*pcb, error) {
+ u.storeLock.Lock()
+ defer u.storeLock.Unlock()
+
+ // We "Get" by key but "Add" by object because the store interface doesn't
+ // allow us to Get/Add a related obj (eg petset: blocking pet).
+ key, err := controller.KeyFunc(ps)
+ if err != nil {
+ return nil, err
+ }
+ obj, exists, err := u.store.GetByKey(key)
+ if err != nil {
+ return nil, err
+ }
+
+ hc := defaultPetHealthChecker{}
+ // There's no unhealthy pet blocking a scale event, but this might be
+ // a controller manager restart. If it is, knownPets can be trusted.
+ if !exists {
+ for _, p := range knownPets {
+ if hc.isHealthy(p) && !hc.isDying(p) {
+ glog.V(4).Infof("Ignoring healthy pet %v for PetSet %v", p.Name, ps.Name)
+ continue
+ }
+ glog.Infof("No recorded blocking pet, but found unhealthy pet %v for PetSet %v", p.Name, ps.Name)
+ return &pcb{pod: p, parent: ps}, nil
+ }
+ return nil, nil
+ }
+
+ // This is a pet that's blocking further creates/deletes of a petset. If it
+ // disappears, it's no longer blocking. If it exists, it continues to block
+ // till it turns healthy or disappears.
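+ // Both conditions are re-checked below against the pod returned by the pet
+ // client rather than the recorded copy, which may be stale.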
+ bp := obj.(*pcb) + blockingPet, exists, err := u.pc.Get(bp) + if err != nil { + return nil, err + } + if !exists { + glog.V(4).Infof("Clearing blocking pet %v for PetSet %v because it's been deleted", bp.pod.Name, ps.Name) + return nil, nil + } + blockingPetPod := blockingPet.pod + if hc.isHealthy(blockingPetPod) && !hc.isDying(blockingPetPod) { + glog.V(4).Infof("Clearing blocking pet %v for PetSet %v because it's healthy", bp.pod.Name, ps.Name) + u.store.Delete(blockingPet) + blockingPet = nil + } + return blockingPet, nil +} + +// Add records the given pet as a blocking pet. +func (u *unhealthyPetTracker) Add(blockingPet *pcb) error { + u.storeLock.Lock() + defer u.storeLock.Unlock() + + if blockingPet == nil { + return nil + } + glog.V(4).Infof("Adding blocking pet %v for PetSet %v", blockingPet.pod.Name, blockingPet.parent.Name) + return u.store.Add(blockingPet) +} + +// newUnHealthyPetTracker tracks unhealthy pets that block progress of petsets. +func newUnHealthyPetTracker(pc petClient) *unhealthyPetTracker { + return &unhealthyPetTracker{pc: pc, store: cache.NewStore(pcbKeyFunc)} +} + +// pcbKeyFunc computes the key for a given pcb. +// If it's given a key, it simply returns it. +func pcbKeyFunc(obj interface{}) (string, error) { + if key, ok := obj.(string); ok { + return key, nil + } + p, ok := obj.(*pcb) + if !ok { + return "", fmt.Errorf("not a valid pet control block %+v", p) + } + if p.parent == nil { + return "", fmt.Errorf("cannot compute pet control block key without parent pointer %+v", p) + } + return controller.KeyFunc(p.parent) +} diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index be91bf85ad3..c95fa0c1053 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/autoscaling" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" @@ -99,6 +100,7 @@ func describerMap(c *client.Client) map[unversioned.GroupKind]Describer { extensions.Kind("Deployment"): &DeploymentDescriber{adapter.FromUnversionedClient(c)}, extensions.Kind("Job"): &JobDescriber{c}, batch.Kind("Job"): &JobDescriber{c}, + apps.Kind("PetSet"): &PetSetDescriber{c}, extensions.Kind("Ingress"): &IngressDescriber{c}, } @@ -1630,6 +1632,46 @@ func describeNode(node *api.Node, nodeNonTerminatedPodsList *api.PodList, events }) } +type PetSetDescriber struct { + client *client.Client +} + +func (p *PetSetDescriber) Describe(namespace, name string) (string, error) { + ps, err := p.client.Apps().PetSets(namespace).Get(name) + if err != nil { + return "", err + } + pc := p.client.Pods(namespace) + + selector, err := unversioned.LabelSelectorAsSelector(ps.Spec.Selector) + if err != nil { + return "", err + } + + running, waiting, succeeded, failed, err := getPodStatusForController(pc, selector) + if err != nil { + return "", err + } + + return tabbedString(func(out io.Writer) error { + fmt.Fprintf(out, "Name:\t%s\n", ps.Name) + fmt.Fprintf(out, "Namespace:\t%s\n", ps.Namespace) + fmt.Fprintf(out, "Image(s):\t%s\n", makeImageList(&ps.Spec.Template.Spec)) + fmt.Fprintf(out, "Selector:\t%s\n", unversioned.FormatLabelSelector(ps.Spec.Selector)) + fmt.Fprintf(out, "Labels:\t%s\n", labels.FormatLabels(ps.Labels)) + fmt.Fprintf(out, "Replicas:\t%d current / %d desired\n", ps.Status.Replicas, ps.Spec.Replicas) + fmt.Fprintf(out, "Annotations:\t%s\n", 
labels.FormatLabels(ps.Annotations)) + fmt.Fprintf(out, "CreationTimestamp:\t%s\n", ps.CreationTimestamp.Time.Format(time.RFC1123Z)) + fmt.Fprintf(out, "Pods Status:\t%d Running / %d Waiting / %d Succeeded / %d Failed\n", running, waiting, succeeded, failed) + describeVolumes(ps.Spec.Template.Spec.Volumes, out, "") + events, _ := p.client.Events(namespace).Search(ps) + if events != nil { + DescribeEvents(events, out) + } + return nil + }) +} + // HorizontalPodAutoscalerDescriber generates information about a horizontal pod autoscaler. type HorizontalPodAutoscalerDescriber struct { client *client.Client diff --git a/pkg/registry/petset/strategy.go b/pkg/registry/petset/strategy.go index 20a74c0174c..80bff2a5dde 100644 --- a/pkg/registry/petset/strategy.go +++ b/pkg/registry/petset/strategy.go @@ -86,13 +86,9 @@ func (petSetStrategy) AllowCreateOnUpdate() bool { // ValidateUpdate is the default update validation for an end user. func (petSetStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field.ErrorList { - return field.ErrorList{field.Forbidden(field.NewPath("spec"), "updates to petset spec are forbidden.")} - - // TODO: For now we're taking the safe route and disallowing all updates to spec. - // Enable on a case by case basis. - //validationErrorList := validation.ValidatePetSet(obj.(*apps.PetSet)) - //updateErrorList := validation.ValidatePetSetUpdate(obj.(*apps.PetSet), old.(*apps.PetSet)) - //return append(validationErrorList, updateErrorList...) + validationErrorList := validation.ValidatePetSet(obj.(*apps.PetSet)) + updateErrorList := validation.ValidatePetSetUpdate(obj.(*apps.PetSet), old.(*apps.PetSet)) + return append(validationErrorList, updateErrorList...) } // AllowUnconditionalUpdate is the default update policy for PetSet objects. 
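A minimal sketch of what the relaxed strategy means for callers (the client variable, namespace and PetSet name below are illustrative, not part of this patch): a replica-count change submitted through the apps client is accepted, while any other change to the spec is rejected by update validation.

// Assumes an existing unversioned *client.Client named c and a PetSet "web" in namespace "default".
ps, err := c.Apps().PetSets("default").Get("web")
if err != nil {
	return err
}
ps.Spec.Replicas = 5 // spec.replicas is the only spec field the update strategy lets through
if _, err := c.Apps().PetSets("default").Update(ps); err != nil {
	return err // any other spec mutation would come back as a validation error
}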
diff --git a/pkg/registry/petset/strategy_test.go b/pkg/registry/petset/strategy_test.go index 5869f4bdefb..1e28fa6199c 100644 --- a/pkg/registry/petset/strategy_test.go +++ b/pkg/registry/petset/strategy_test.go @@ -64,6 +64,7 @@ func TestPetSetStrategy(t *testing.T) { t.Errorf("Unexpected error validating %v", errs) } + // Just Spec.Replicas is allowed to change validPs := &apps.PetSet{ ObjectMeta: api.ObjectMeta{Name: ps.Name, Namespace: ps.Namespace}, Spec: apps.PetSetSpec{ @@ -74,6 +75,13 @@ func TestPetSetStrategy(t *testing.T) { } Strategy.PrepareForUpdate(validPs, ps) errs = Strategy.ValidateUpdate(ctx, validPs, ps) + if len(errs) != 0 { + t.Errorf("Updating spec.Replicas is allowed on a petset.") + } + + validPs.Spec.Selector = &unversioned.LabelSelector{MatchLabels: map[string]string{"a": "bar"}} + Strategy.PrepareForUpdate(validPs, ps) + errs = Strategy.ValidateUpdate(ctx, validPs, ps) if len(errs) == 0 { t.Errorf("Expected a validation error since updates are disallowed on petsets.") } diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index cfb0c201e35..71e8af7a077 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -3743,3 +3743,21 @@ func CoreDump(dir string) { Logf("Error running cluster/log-dump.sh: %v", err) } } + +func UpdatePodWithRetries(client *client.Client, ns, name string, update func(*api.Pod)) (*api.Pod, error) { + for i := 0; i < 3; i++ { + pod, err := client.Pods(ns).Get(name) + if err != nil { + return nil, fmt.Errorf("Failed to get pod %q: %v", name, err) + } + update(pod) + pod, err = client.Pods(ns).Update(pod) + if err == nil { + return pod, nil + } + if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) { + return nil, fmt.Errorf("Failed to update pod %q: %v", name, err) + } + } + return nil, fmt.Errorf("Too many retries updating Pod %q", name) +} diff --git a/test/e2e/petset.go b/test/e2e/petset.go new file mode 100644 index 00000000000..f78024aa60a --- /dev/null +++ b/test/e2e/petset.go @@ -0,0 +1,378 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "strconv" + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/api" + apierrs "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/petset" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" + "speter.net/go/exp/math/dec/inf" +) + +const ( + petsetPoll = 10 * time.Second + petsetTimeout = 5 * time.Minute +) + +var _ = framework.KubeDescribe("PetSet", func() { + f := framework.NewDefaultFramework("petset") + psName := "pet" + labels := map[string]string{ + "foo": "bar", + "baz": "blah", + } + headlessSvcName := "test" + var ns string + + var c *client.Client + BeforeEach(func() { + var err error + c, err = framework.LoadClient() + Expect(err).NotTo(HaveOccurred()) + ns = f.Namespace.Name + + By("creating service " + headlessSvcName + " in namespace " + ns) + headlessService := createServiceSpec(headlessSvcName, true, labels) + _, err = c.Services(ns).Create(headlessService) + Expect(err).NotTo(HaveOccurred()) + }) + + It("provide basic identity [Feature:PetSet]", func() { + By("creating petset " + psName + " in namespace " + ns) + defer func() { + err := c.Apps().PetSets(ns).Delete(psName, nil) + Expect(err).NotTo(HaveOccurred()) + }() + + petMounts := []api.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + podMounts := []api.VolumeMount{{Name: "home", MountPath: "/home"}} + ps := newPetSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels) + _, err := c.Apps().PetSets(ns).Create(ps) + Expect(err).NotTo(HaveOccurred()) + + pt := petTester{c: c} + + By("Saturating pet set " + ps.Name) + pt.saturate(ps) + + cmd := "echo $(hostname) > /data/hostname" + By("Running " + cmd + " in all pets") + pt.execInPets(ps, cmd) + + By("Restarting pet set " + ps.Name) + pt.restart(ps) + pt.saturate(ps) + + cmd = "if [ \"$(cat /data/hostname)\" = \"$(hostname)\" ]; then exit 0; else exit 1; fi" + By("Running " + cmd + " in all pets") + pt.execInPets(ps, cmd) + }) + + It("should handle healthy pet restarts during scale [Feature:PetSet]", func() { + By("creating petset " + psName + " in namespace " + ns) + defer func() { + err := c.Apps().PetSets(ns).Delete(psName, nil) + Expect(err).NotTo(HaveOccurred()) + }() + + petMounts := []api.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + podMounts := []api.VolumeMount{{Name: "home", MountPath: "/home"}} + ps := newPetSet(psName, ns, headlessSvcName, 2, petMounts, podMounts, labels) + _, err := c.Apps().PetSets(ns).Create(ps) + Expect(err).NotTo(HaveOccurred()) + + pt := petTester{c: c} + pt.waitForRunning(1, ps) + + By("Marking pet at index 0 as healthy.") + pt.setHealthy(ps) + + By("Waiting for pet at index 1 to enter running.") + pt.waitForRunning(2, ps) + + // Now we have 1 healthy and 1 unhealthy pet. Deleting the healthy pet should *not* + // create a new pet till the remaining pet becomes healthy, which won't happen till + // we set the healthy bit. 
+
+ By("Deleting healthy pet at index 0.")
+ pt.deletePetAtIndex(0, ps)
+
+ By("Confirming pet at index 0 is not recreated.")
+ pt.confirmPetCount(1, ps, 10*time.Second)
+
+ By("Deleting unhealthy pet at index 1.")
+ pt.deletePetAtIndex(1, ps)
+
+ By("Confirming all pets in petset are created.")
+ pt.saturate(ps)
+ })
+})
+
+type petTester struct {
+ c *client.Client
+}
+
+func (p *petTester) execInPets(ps *apps.PetSet, cmd string) {
+ podList := p.getPodList(ps)
+ for _, pet := range podList.Items {
+ stdout, err := framework.RunHostCmd(pet.Namespace, pet.Name, cmd)
+ ExpectNoError(err)
+ framework.Logf("stdout %v on %v: %v", cmd, pet.Name, stdout)
+ }
+}
+
+func (p *petTester) saturate(ps *apps.PetSet) {
+ // TODO: Watch events and check that creation timestamps don't overlap
+ for i := 0; i < ps.Spec.Replicas; i++ {
+ framework.Logf("Waiting for pet at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
+ p.waitForRunning(i+1, ps)
+ framework.Logf("Marking pet at index " + fmt.Sprintf("%v", i) + " healthy")
+ p.setHealthy(ps)
+ }
+}
+
+func (p *petTester) deletePetAtIndex(index int, ps *apps.PetSet) {
+ // TODO: we won't use "-index" as the name strategy forever,
+ // pull the name out from an identity mapper.
+ name := fmt.Sprintf("%v-%v", ps.Name, index)
+ noGrace := int64(0)
+ if err := p.c.Pods(ps.Namespace).Delete(name, &api.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
+ framework.Failf("Failed to delete pet %v for PetSet %v in namespace %v: %v", name, ps.Name, ps.Namespace, err)
+ }
+}
+
+func (p *petTester) restart(ps *apps.PetSet) {
+ name := ps.Name
+ ns := ps.Namespace
+ oldReplicas := ps.Spec.Replicas
+ p.update(ns, name, func(ps *apps.PetSet) { ps.Spec.Replicas = 0 })
+
+ var petList *api.PodList
+ pollErr := wait.PollImmediate(petsetPoll, petsetTimeout, func() (bool, error) {
+ petList = p.getPodList(ps)
+ if len(petList.Items) == 0 {
+ return true, nil
+ }
+ return false, nil
+ })
+ if pollErr != nil {
+ ts := []string{}
+ for _, pet := range petList.Items {
+ if pet.DeletionTimestamp != nil {
+ ts = append(ts, fmt.Sprintf("%v", pet.DeletionTimestamp.Time))
+ }
+ }
+ framework.Failf("Failed to scale petset down to 0, %d remaining pods with deletion timestamps: %v", len(petList.Items), ts)
+ }
+ p.update(ns, name, func(ps *apps.PetSet) { ps.Spec.Replicas = oldReplicas })
+}
+
+func (p *petTester) update(ns, name string, update func(ps *apps.PetSet)) {
+ for i := 0; i < 3; i++ {
+ ps, err := p.c.Apps().PetSets(ns).Get(name)
+ if err != nil {
+ framework.Failf("failed to get petset %q: %v", name, err)
+ }
+ update(ps)
+ ps, err = p.c.Apps().PetSets(ns).Update(ps)
+ if err == nil {
+ return
+ }
+ if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
+ framework.Failf("failed to update petset %q: %v", name, err)
+ }
+ }
+ framework.Failf("too many retries updating petset %q", name)
+}
+
+func (p *petTester) getPodList(ps *apps.PetSet) *api.PodList {
+ selector, err := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
+ ExpectNoError(err)
+ podList, err := p.c.Pods(ps.Namespace).List(api.ListOptions{LabelSelector: selector})
+ ExpectNoError(err)
+ return podList
+}
+
+func ExpectNoError(err error) {
+ Expect(err).NotTo(HaveOccurred())
+}
+
+func (p *petTester) confirmPetCount(count int, ps *apps.PetSet, timeout time.Duration) {
+ start := time.Now()
+ deadline := start.Add(timeout)
+ for t := time.Now(); t.Before(deadline); t = time.Now() {
+ podList := p.getPodList(ps)
+ petCount := len(podList.Items)
+ if petCount != count {
+ framework.Failf("PetSet %v scaled unexpectedly from %d to %d replicas: %+v", ps.Name, count, len(podList.Items), podList)
+ }
+ framework.Logf("Verifying petset %v doesn't scale past %d for another %+v", ps.Name, count, deadline.Sub(t))
+ time.Sleep(1 * time.Second)
+ }
+}
+
+func (p *petTester) waitForRunning(numPets int, ps *apps.PetSet) {
+ pollErr := wait.PollImmediate(petsetPoll, petsetTimeout,
+ func() (bool, error) {
+ podList := p.getPodList(ps)
+ if len(podList.Items) < numPets {
+ framework.Logf("Found %d pets, waiting for %d", len(podList.Items), numPets)
+ return false, nil
+ }
+ if len(podList.Items) > numPets {
+ return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numPets, len(podList.Items))
+ }
+ for _, p := range podList.Items {
+ if p.Status.Phase != api.PodRunning {
+ framework.Logf("Waiting for pod %v to enter %v, currently %v", p.Name, api.PodRunning, p.Status.Phase)
+ return false, nil
+ }
+ }
+ return true, nil
+ })
+ if pollErr != nil {
+ framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
+ }
+}
+
+func (p *petTester) setHealthy(ps *apps.PetSet) {
+ podList := p.getPodList(ps)
+ markedHealthyPod := ""
+ for _, pod := range podList.Items {
+ if pod.Status.Phase != api.PodRunning {
+ framework.Failf("Found pod in phase %v, cannot set health", pod.Status.Phase)
+ }
+ if isInitialized(pod) {
+ continue
+ }
+ if markedHealthyPod != "" {
+ framework.Failf("Found multiple non-healthy pets: %v and %v", pod.Name, markedHealthyPod)
+ }
+ p, err := framework.UpdatePodWithRetries(p.c, pod.Namespace, pod.Name, func(up *api.Pod) {
+ up.Annotations[petset.PetSetInitAnnotation] = "true"
+ })
+ ExpectNoError(err)
+ framework.Logf("Set annotation %v to %v on pod %v", petset.PetSetInitAnnotation, p.Annotations[petset.PetSetInitAnnotation], pod.Name)
+ markedHealthyPod = pod.Name
+ }
+}
+
+func isInitialized(pod api.Pod) bool {
+ initialized, ok := pod.Annotations[petset.PetSetInitAnnotation]
+ if !ok {
+ return false
+ }
+ inited, err := strconv.ParseBool(initialized)
+ if err != nil {
+ framework.Failf("Couldn't parse petset init annotation %v", initialized)
+ }
+ return inited
+}
+
+func dec(i int64, exponent int) *inf.Dec {
+ return inf.NewDec(i, inf.Scale(-exponent))
+}
+
+func newPVC(name string) api.PersistentVolumeClaim {
+ return api.PersistentVolumeClaim{
+ ObjectMeta: api.ObjectMeta{
+ Name: name,
+ Annotations: map[string]string{
+ "volume.alpha.kubernetes.io/storage-class": "anything",
+ },
+ },
+ Spec: api.PersistentVolumeClaimSpec{
+ AccessModes: []api.PersistentVolumeAccessMode{
+ api.ReadWriteOnce,
+ },
+ Resources: api.ResourceRequirements{
+ Requests: api.ResourceList{
+ api.ResourceStorage: resource.Quantity{
+ Amount: dec(1, 0),
+ Format: resource.BinarySI,
+ },
+ },
+ },
+ },
+ }
+}
+
+func newPetSet(name, ns, governingSvcName string, replicas int, petMounts []api.VolumeMount, podMounts []api.VolumeMount, labels map[string]string) *apps.PetSet {
+ mounts := append(petMounts, podMounts...)
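+ // petMounts are backed by the volumeClaimTemplates built below (one PVC per
+ // pet), while podMounts are plain hostPath volumes.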
+ claims := []api.PersistentVolumeClaim{} + for _, m := range petMounts { + claims = append(claims, newPVC(m.Name)) + } + + vols := []api.Volume{} + for _, m := range podMounts { + vols = append(vols, api.Volume{ + Name: m.Name, + VolumeSource: api.VolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: fmt.Sprintf("/tmp/%v", m.Name), + }, + }, + }) + } + + return &apps.PetSet{ + TypeMeta: unversioned.TypeMeta{ + Kind: "PetSet", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: apps.PetSetSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: labels, + }, + Replicas: replicas, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: labels, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "nginx", + Image: "gcr.io/google_containers/nginx-slim:0.5", + VolumeMounts: mounts, + }, + }, + Volumes: vols, + }, + }, + VolumeClaimTemplates: claims, + ServiceName: governingSvcName, + }, + } +}
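A minimal usage sketch of the helper above (the names, the namespace and the *client.Client variable c are illustrative, mirroring the BeforeEach/It flow earlier in this file): build a three-replica PetSet whose "datadir" mount is backed by a volume claim template and submit it through the apps client.

labels := map[string]string{"foo": "bar", "baz": "blah"}
petMounts := []api.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts := []api.VolumeMount{{Name: "home", MountPath: "/home"}}
ps := newPetSet("web", "e2e-tests-petset", "test", 3, petMounts, podMounts, labels)
if _, err := c.Apps().PetSets(ps.Namespace).Create(ps); err != nil {
	framework.Failf("Failed to create PetSet %v: %v", ps.Name, err)
}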