diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 26f6fe22ce4..f619df5bab7 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -115,7 +115,7 @@ func NewCMServer() *CMServer { NamespaceSyncPeriod: 5 * time.Minute, PVClaimBinderSyncPeriod: 10 * time.Second, HorizontalPodAutoscalerSyncPeriod: 30 * time.Second, - DeploymentControllerSyncPeriod: 1 * time.Minute, + DeploymentControllerSyncPeriod: 30 * time.Second, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, ClusterName: "kubernetes", diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b50d2db4736..a13965f4a6c 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" @@ -32,14 +33,20 @@ import ( ) type DeploymentController struct { - client client.Interface - expClient client.ExperimentalInterface + client client.Interface + expClient client.ExperimentalInterface + eventRecorder record.EventRecorder } func New(client client.Interface) *DeploymentController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(client.Events("")) + return &DeploymentController{ - client: client, - expClient: client.Experimental(), + client: client, + expClient: client.Experimental(), + eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}), } } @@ -92,9 +99,7 @@ func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment exper return err } - allRCs := []*api.ReplicationController{} - allRCs = append(allRCs, oldRCs...) - allRCs = append(allRCs, newRC) + allRCs := append(oldRCs, newRC) // Scale up, if we can. scaledUp, err := d.scaleUp(allRCs, newRC, deployment) @@ -153,26 +158,6 @@ func (d *DeploymentController) getNewRC(deployment experimental.Deployment) (*ap return createdRC, nil } -func (d *DeploymentController) getPodsForRCs(replicationControllers []*api.ReplicationController) ([]api.Pod, error) { - allPods := []api.Pod{} - for _, rc := range replicationControllers { - podList, err := d.client.Pods(rc.ObjectMeta.Namespace).List(labels.SelectorFromSet(rc.Spec.Selector), fields.Everything()) - if err != nil { - return allPods, fmt.Errorf("error listing pods: %v", err) - } - allPods = append(allPods, podList.Items...) - } - return allPods, nil -} - -func (d *DeploymentController) getReplicaCountForRCs(replicationControllers []*api.ReplicationController) int { - totalReplicaCount := 0 - for _, rc := range replicationControllers { - totalReplicaCount += rc.Spec.Replicas - } - return totalReplicaCount -} - func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) { if newRC.Spec.Replicas == deployment.Spec.Replicas { // Scaling up not required. @@ -186,11 +171,7 @@ func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newR maxSurge = util.GetValueFromPercent(maxSurge, deployment.Spec.Replicas) } // Find the total number of pods - allPods, err := d.getPodsForRCs(allRCs) - if err != nil { - return false, err - } - currentPodCount := len(allPods) + currentPodCount := deploymentUtil.GetReplicaCountForRCs(allRCs) // Check if we can scale up. maxTotalPods := deployment.Spec.Replicas + maxSurge if currentPodCount >= maxTotalPods { @@ -200,12 +181,16 @@ func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newR // Scale up. scaleUpCount := maxTotalPods - currentPodCount scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas))) - _, err = d.scaleRC(newRC, newRC.Spec.Replicas+scaleUpCount) + newReplicasCount := newRC.Spec.Replicas + scaleUpCount + _, err = d.scaleRC(newRC, newReplicasCount) + if err == nil { + d.eventRecorder.Eventf(&deployment, "ScalingRC", "Scaled up rc %s to %d", newRC.Name, newReplicasCount) + } return true, err } func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) { - oldPodsCount := d.getReplicaCountForRCs(oldRCs) + oldPodsCount := deploymentUtil.GetReplicaCountForRCs(oldRCs) if oldPodsCount == 0 { // Cant scale down further return false, nil @@ -220,13 +205,9 @@ func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, ol // Check if we can scale down. minAvailable := deployment.Spec.Replicas - maxUnavailable // Find the number of ready pods. - // TODO: Use MinReadySeconds once https://github.com/kubernetes/kubernetes/pull/12894 is merged. - readyPodCount := 0 - allPods, err := d.getPodsForRCs(allRCs) - for _, pod := range allPods { - if api.IsPodReady(&pod) { - readyPodCount++ - } + readyPodCount, err := deploymentUtil.GetAvailablePodsForRCs(d.client, allRCs) + if err != nil { + return false, fmt.Errorf("could not find available pods: %v", err) } if readyPodCount <= minAvailable { @@ -245,18 +226,20 @@ func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, ol } // Scale down. scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount))) - _, err = d.scaleRC(targetRC, targetRC.Spec.Replicas-scaleDownCount) + newReplicasCount := targetRC.Spec.Replicas - scaleDownCount + _, err = d.scaleRC(targetRC, newReplicasCount) if err != nil { return false, err } + d.eventRecorder.Eventf(&deployment, "ScalingRC", "Scaled down rc %s to %d", targetRC.Name, newReplicasCount) totalScaleDownCount -= scaleDownCount } return true, err } func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) error { - totalReplicas := d.getReplicaCountForRCs(allRCs) - updatedReplicas := d.getReplicaCountForRCs([]*api.ReplicationController{newRC}) + totalReplicas := deploymentUtil.GetReplicaCountForRCs(allRCs) + updatedReplicas := deploymentUtil.GetReplicaCountForRCs([]*api.ReplicationController{newRC}) newDeployment := deployment // TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods. newDeployment.Status = experimental.DeploymentStatus{ diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index 3bfa9ab5ccb..35456a402c6 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -1435,6 +1435,10 @@ func (dd *DeploymentDescriber) Describe(namespace, name string) (string, error) } fmt.Fprintf(out, "NewReplicationController:\t%s\n", printReplicationControllersByLabels(newRCs)) } + events, err := dd.Events(namespace).Search(d) + if err == nil && events != nil { + DescribeEvents(events, out) + } return nil }) } diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 99c87390289..84e0dc8cf1c 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -107,3 +107,40 @@ func GetPodTemplateSpecHash(template *api.PodTemplateSpec) uint32 { util.DeepHashObject(podTemplateSpecHasher, template) return podTemplateSpecHasher.Sum32() } + +// Returns the sum of Replicas of the given replication controllers. +func GetReplicaCountForRCs(replicationControllers []*api.ReplicationController) int { + totalReplicaCount := 0 + for _, rc := range replicationControllers { + totalReplicaCount += rc.Spec.Replicas + } + return totalReplicaCount +} + +// Returns the number of available pods corresponding to the given RCs. +func GetAvailablePodsForRCs(c client.Interface, rcs []*api.ReplicationController) (int, error) { + // TODO: Use MinReadySeconds once https://github.com/kubernetes/kubernetes/pull/12894 is merged. + allPods, err := getPodsForRCs(c, rcs) + if err != nil { + return 0, err + } + readyPodCount := 0 + for _, pod := range allPods { + if api.IsPodReady(&pod) { + readyPodCount++ + } + } + return readyPodCount, nil +} + +func getPodsForRCs(c client.Interface, replicationControllers []*api.ReplicationController) ([]api.Pod, error) { + allPods := []api.Pod{} + for _, rc := range replicationControllers { + podList, err := c.Pods(rc.ObjectMeta.Namespace).List(labels.SelectorFromSet(rc.Spec.Selector), fields.Everything()) + if err != nil { + return allPods, fmt.Errorf("error listing pods: %v", err) + } + allPods = append(allPods, podList.Items...) + } + return allPods, nil +} diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index cc4aef5b9a0..96efce89715 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -17,8 +17,10 @@ limitations under the License. package e2e import ( + "fmt" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/experimental" + deploymentUtil "k8s.io/kubernetes/pkg/util/deployment" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -31,7 +33,10 @@ var _ = Describe("Deployment", func() { testNewDeployment(f) }) It("deployment should delete old pods and create new ones", func() { - testDeploymentDeletesOldPods(f) + testRollingUpdateDeployment(f) + }) + It("deployment should scale up and down in the right order", func() { + testRollingUpdateDeploymentEvents(f) }) }) @@ -72,13 +77,12 @@ func testNewDeployment(f *Framework) { // Check that deployment is created fine. deployment, err := c.Deployments(ns).Get(deploymentName) Expect(err).NotTo(HaveOccurred()) - Logf("Deployment: %s", deployment) // Verify that the required pods have come up. err = verifyPods(c, ns, "nginx", false, 1) if err != nil { Logf("error in waiting for pods to come up: %s", err) - return + Expect(err).NotTo(HaveOccurred()) } // DeploymentStatus should be appropriately updated. deployment, err = c.Deployments(ns).Get(deploymentName) @@ -87,57 +91,19 @@ func testNewDeployment(f *Framework) { Expect(deployment.Status.UpdatedReplicas).Should(Equal(1)) } -func testDeploymentDeletesOldPods(f *Framework) { +func testRollingUpdateDeployment(f *Framework) { ns := f.Namespace.Name c := f.Client - // Create redis pods. + // Create nginx pods. podLabels := map[string]string{"name": "sample-pod"} - rcName := "redis-controller" + rcName := "nginx-controller" _, err := c.ReplicationControllers(ns).Create(&api.ReplicationController{ ObjectMeta: api.ObjectMeta{ Name: rcName, }, Spec: api.ReplicationControllerSpec{ - Replicas: 1, + Replicas: 3, Selector: podLabels, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: podLabels, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "redis", - Image: "redis", - }, - }, - }, - }, - }, - }) - Expect(err).NotTo(HaveOccurred()) - defer func() { - Logf("deleting replication controller %s", rcName) - Expect(c.ReplicationControllers(ns).Delete(rcName)).NotTo(HaveOccurred()) - }() - // Verify that the required pods have come up. - err = verifyPods(c, ns, "sample-pod", false, 1) - if err != nil { - Logf("error in waiting for pods to come up: %s", err) - return - } - - // Create a deployment to delete redis pods and instead bring up nginx pods. - deploymentName := "nginx-deployment" - Logf("Creating deployment %s", deploymentName) - _, err = c.Deployments(ns).Create(&experimental.Deployment{ - ObjectMeta: api.ObjectMeta{ - Name: deploymentName, - }, - Spec: experimental.DeploymentSpec{ - Replicas: 1, - Selector: podLabels, - UniqueLabelKey: "deployment.kubernetes.io/podTemplateHash", Template: &api.PodTemplateSpec{ ObjectMeta: api.ObjectMeta{ Labels: podLabels, @@ -154,16 +120,140 @@ func testDeploymentDeletesOldPods(f *Framework) { }, }) Expect(err).NotTo(HaveOccurred()) + defer func() { + Logf("deleting replication controller %s", rcName) + Expect(c.ReplicationControllers(ns).Delete(rcName)).NotTo(HaveOccurred()) + }() + // Verify that the required pods have come up. + err = verifyPods(c, ns, "sample-pod", false, 3) + if err != nil { + Logf("error in waiting for pods to come up: %s", err) + Expect(err).NotTo(HaveOccurred()) + } + + // Create a deployment to delete nginx pods and instead bring up redis pods. + deploymentName := "redis-deployment" + Logf("Creating deployment %s", deploymentName) + newDeployment := experimental.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: deploymentName, + }, + Spec: experimental.DeploymentSpec{ + Replicas: 3, + Selector: podLabels, + UniqueLabelKey: "deployment.kubernetes.io/podTemplateHash", + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: podLabels, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "redis", + Image: "redis", + }, + }, + }, + }, + }, + } + _, err = c.Deployments(ns).Create(&newDeployment) + Expect(err).NotTo(HaveOccurred()) defer func() { Logf("deleting deployment %s", deploymentName) Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred()) }() + waitForDeploymentStatus(c, ns, deploymentName, 3, 2, 4) +} + +func testRollingUpdateDeploymentEvents(f *Framework) { + ns := f.Namespace.Name + c := f.Client + // Create nginx pods. + podLabels := map[string]string{"name": "sample-pod"} + rcName := "nginx-controller" + _, err := c.ReplicationControllers(ns).Create(&api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: rcName, + }, + Spec: api.ReplicationControllerSpec{ + Replicas: 1, + Selector: podLabels, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: podLabels, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + defer func() { + Logf("deleting replication controller %s", rcName) + Expect(c.ReplicationControllers(ns).Delete(rcName)).NotTo(HaveOccurred()) + }() // Verify that the required pods have come up. - verifyPods(c, ns, "nginx", false, 1) - // DeploymentStatus should be appropriately updated. + err = verifyPods(c, ns, "sample-pod", false, 1) + if err != nil { + Logf("error in waiting for pods to come up: %s", err) + Expect(err).NotTo(HaveOccurred()) + } + + // Create a deployment to delete nginx pods and instead bring up redis pods. + deploymentName := "redis-deployment" + Logf("Creating deployment %s", deploymentName) + newDeployment := experimental.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: deploymentName, + }, + Spec: experimental.DeploymentSpec{ + Replicas: 1, + Selector: podLabels, + UniqueLabelKey: "deployment.kubernetes.io/podTemplateHash", + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: podLabels, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "redis", + Image: "redis", + }, + }, + }, + }, + }, + } + _, err = c.Deployments(ns).Create(&newDeployment) + Expect(err).NotTo(HaveOccurred()) + defer func() { + Logf("deleting deployment %s", deploymentName) + Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred()) + }() + + waitForDeploymentStatus(c, ns, deploymentName, 1, 0, 2) + // Verify that the pods were scaled up and down as expected. We use events to verify that. deployment, err := c.Deployments(ns).Get(deploymentName) Expect(err).NotTo(HaveOccurred()) - Expect(deployment.Status.Replicas).Should(Equal(1)) - Expect(deployment.Status.UpdatedReplicas).Should(Equal(1)) + events, err := c.Events(ns).Search(deployment) + if err != nil { + Logf("error in listing events: %s", err) + Expect(err).NotTo(HaveOccurred()) + } + // There should be 2 events, one to scale up the new RC and then to scale down the old RC. + Expect(len(events.Items)).Should(Equal(2)) + newRC, err := deploymentUtil.GetNewRC(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(newRC).NotTo(Equal(nil)) + Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up rc %s to 1", newRC.Name))) + Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down rc %s to 0", rcName))) } diff --git a/test/e2e/util.go b/test/e2e/util.go index d24a6973ff7..82e196b9769 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + deploymentUtil "k8s.io/kubernetes/pkg/util/deployment" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" @@ -1568,6 +1569,55 @@ func waitForRCPodsGone(c *client.Client, rc *api.ReplicationController) error { }) } +// Waits for the deployment to reach desired state. +// Returns an error if minAvailable or maxCreated is broken at any times. +func waitForDeploymentStatus(c *client.Client, ns, deploymentName string, desiredUpdatedReplicas, minAvailable, maxCreated int) error { + return wait.Poll(poll, 2*time.Minute, func() (bool, error) { + + deployment, err := c.Deployments(ns).Get(deploymentName) + if err != nil { + return false, err + } + oldRCs, err := deploymentUtil.GetOldRCs(*deployment, c) + if err != nil { + return false, err + } + newRC, err := deploymentUtil.GetNewRC(*deployment, c) + if err != nil { + return false, err + } + if newRC == nil { + // New RC hasnt been created yet. + return false, nil + } + allRCs := append(oldRCs, newRC) + totalCreated := deploymentUtil.GetReplicaCountForRCs(allRCs) + totalAvailable, err := deploymentUtil.GetAvailablePodsForRCs(c, allRCs) + if err != nil { + return false, err + } + if totalCreated > maxCreated { + return false, fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated) + } + if totalAvailable < minAvailable { + return false, fmt.Errorf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable) + } + + if deployment.Status.Replicas == desiredUpdatedReplicas && + deployment.Status.UpdatedReplicas == desiredUpdatedReplicas { + // Verify RCs. + if deploymentUtil.GetReplicaCountForRCs(oldRCs) != 0 { + return false, fmt.Errorf("old RCs are not fully scaled down") + } + if deploymentUtil.GetReplicaCountForRCs([]*api.ReplicationController{newRC}) != desiredUpdatedReplicas { + return false, fmt.Errorf("new RCs is not fully scaled up") + } + return true, nil + } + return false, nil + }) +} + // Convenient wrapper around listing nodes supporting retries. func listNodes(c *client.Client, label labels.Selector, field fields.Selector) (*api.NodeList, error) { var nodes *api.NodeList