diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b16ec67fac7..5120863e6ee 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -19,9 +19,9 @@ package deployment import ( "fmt" "hash/adler32" + "math" "time" - "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -45,7 +45,6 @@ func New(client client.Interface) *DeploymentController { func (d *DeploymentController) Run(syncPeriod time.Duration) { go util.Until(func() { if err := d.reconcileDeployments(); err != nil { - glog.Errorf("Couldnt reconcile deployments: %v", err) } }, syncPeriod, util.NeverStop) } @@ -64,24 +63,59 @@ func (d *DeploymentController) reconcileDeployments() error { } func (d *DeploymentController) reconcileDeployment(deployment *experimental.Deployment) error { - targetedRCs, err := d.getTargetedRCs(deployment) - if err != nil { - return err + switch deployment.Spec.Strategy.Type { + case experimental.RecreateDeploymentStrategyType: + return d.reconcileRecreateDeployment(*deployment) + case experimental.RollingUpdateDeploymentStrategyType: + return d.reconcileRollingUpdateDeployment(*deployment) } - desiredRC, err := d.getDesiredRC(deployment) - if err != nil { - return err - } - // TODO: Scale up and down the targeted and desired RCs. - // For now, just print their names, until we start doing something useful. - for _, targetedRC := range targetedRCs { - glog.Infof("TargetedRC: %s", targetedRC.ObjectMeta.Name) - } - glog.Infof("DesiredRC: %s", desiredRC.ObjectMeta.Name) + return fmt.Errorf("Unexpected deployment strategy type: %s", deployment.Spec.Strategy.Type) +} + +func (d *DeploymentController) reconcileRecreateDeployment(deployment experimental.Deployment) error { + // TODO: implement me. return nil } -func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deployment) ([]api.ReplicationController, error) { +func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment experimental.Deployment) error { + newRC, err := d.getNewRC(deployment) + if err != nil { + return err + } + + oldRCs, err := d.getOldRCs(deployment) + if err != nil { + return err + } + + allRCs := []*api.ReplicationController{} + allRCs = append(allRCs, oldRCs...) + allRCs = append(allRCs, newRC) + + // Scale up, if we can. + scaledUp, err := d.scaleUp(allRCs, newRC, deployment) + if err != nil { + return err + } + if scaledUp { + // Update DeploymentStatus + return d.updateDeploymentStatus(allRCs, newRC, deployment) + } + + // Scale down, if we can. + scaledDown, err := d.scaleDown(allRCs, oldRCs, newRC, deployment) + if err != nil { + return err + } + if scaledDown { + // Update DeploymentStatus + return d.updateDeploymentStatus(allRCs, newRC, deployment) + } + // TODO: raise an event, neither scaled up nor down. + return nil +} + +func (d *DeploymentController) getOldRCs(deployment experimental.Deployment) ([]*api.ReplicationController, error) { namespace := deployment.ObjectMeta.Namespace // 1. Find all pods whose labels match deployment.Spec.Selector podList, err := d.client.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector), fields.Everything()) @@ -90,7 +124,7 @@ func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deploymen } // 2. Find the corresponding RCs for pods in podList. // TODO: Right now we list all RCs and then filter. We should add an API for this. - targetedRCs := map[string]api.ReplicationController{} + oldRCs := map[string]api.ReplicationController{} rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything()) if err != nil { return nil, fmt.Errorf("error listing replication controllers: %v", err) @@ -100,51 +134,204 @@ func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deploymen for _, rc := range rcList.Items { rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector) if rcLabelsSelector.Matches(podLabelsSelector) { - targetedRCs[rc.ObjectMeta.Name] = rc - continue + // Filter out RC that has the same pod template spec as the deployment - that is the new RC. + if api.Semantic.DeepEqual(rc.Spec.Template, getNewRCTemplate(deployment)) { + continue + } + oldRCs[rc.ObjectMeta.Name] = rc } } } - requiredRCs := []api.ReplicationController{} - for _, value := range targetedRCs { - requiredRCs = append(requiredRCs, value) + requiredRCs := []*api.ReplicationController{} + for _, value := range oldRCs { + requiredRCs = append(requiredRCs, &value) } return requiredRCs, nil } // Returns an RC that matches the intent of the given deployment. // It creates a new RC if required. -func (d *DeploymentController) getDesiredRC(deployment *experimental.Deployment) (*api.ReplicationController, error) { +func (d *DeploymentController) getNewRC(deployment experimental.Deployment) (*api.ReplicationController, error) { namespace := deployment.ObjectMeta.Namespace // Find if the required RC exists already. rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything()) if err != nil { return nil, fmt.Errorf("error listing replication controllers: %v", err) } + newRCTemplate := getNewRCTemplate(deployment) + for _, rc := range rcList.Items { - if api.Semantic.DeepEqual(rc.Spec.Template, deployment.Spec.Template) { - // This is the desired RC. + if api.Semantic.DeepEqual(rc.Spec.Template, newRCTemplate) { + // This is the new RC. return &rc, nil } } - // desired RC does not exist, create a new one. - podTemplateSpecHasher := adler32.New() - util.DeepHashObject(podTemplateSpecHasher, deployment.Spec.Template) - podTemplateSpecHash := podTemplateSpecHasher.Sum32() + // new RC does not exist, create one. + podTemplateSpecHash := getPodTemplateSpecHash(deployment.Spec.Template) rcName := fmt.Sprintf("deploymentrc-%d", podTemplateSpecHash) - desiredRC := api.ReplicationController{ + newRC := api.ReplicationController{ ObjectMeta: api.ObjectMeta{ Name: rcName, Namespace: namespace, }, Spec: api.ReplicationControllerSpec{ Replicas: 0, - Template: deployment.Spec.Template, + Selector: newRCTemplate.ObjectMeta.Labels, + Template: newRCTemplate, }, } - createdRC, err := d.client.ReplicationControllers(namespace).Create(&desiredRC) + createdRC, err := d.client.ReplicationControllers(namespace).Create(&newRC) if err != nil { return nil, fmt.Errorf("error creating replication controller: %v", err) } return createdRC, nil } + +func getNewRCTemplate(deployment experimental.Deployment) *api.PodTemplateSpec { + // newRC will have the same template as in deployment spec, plus a unique label in some cases. + newRCTemplate := &api.PodTemplateSpec{ + ObjectMeta: deployment.Spec.Template.ObjectMeta, + Spec: deployment.Spec.Template.Spec, + } + podTemplateSpecHash := getPodTemplateSpecHash(newRCTemplate) + if deployment.Spec.UniqueLabelKey != "" { + newLabels := map[string]string{} + for key, value := range deployment.Spec.Template.ObjectMeta.Labels { + newLabels[key] = value + } + newLabels[deployment.Spec.UniqueLabelKey] = fmt.Sprintf("%d", podTemplateSpecHash) + newRCTemplate.ObjectMeta.Labels = newLabels + } + return newRCTemplate +} + +func getPodTemplateSpecHash(template *api.PodTemplateSpec) uint32 { + podTemplateSpecHasher := adler32.New() + util.DeepHashObject(podTemplateSpecHasher, template) + return podTemplateSpecHasher.Sum32() +} + +func (d *DeploymentController) getPodsForRCs(replicationControllers []*api.ReplicationController) ([]api.Pod, error) { + allPods := []api.Pod{} + for _, rc := range replicationControllers { + podList, err := d.client.Pods(rc.ObjectMeta.Namespace).List(labels.SelectorFromSet(rc.Spec.Selector), fields.Everything()) + if err != nil { + return allPods, fmt.Errorf("error listing pods: %v", err) + } + allPods = append(allPods, podList.Items...) + } + return allPods, nil +} + +func (d *DeploymentController) getReplicaCountForRCs(replicationControllers []*api.ReplicationController) int { + totalReplicaCount := 0 + for _, rc := range replicationControllers { + totalReplicaCount += rc.Spec.Replicas + } + return totalReplicaCount +} + +func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) { + if newRC.Spec.Replicas == deployment.Spec.Replicas { + // Scaling up not required. + return false, nil + } + maxSurge, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxSurge) + if err != nil { + return false, fmt.Errorf("Invalid value for MaxSurge: %v", err) + } + if isPercent { + maxSurge = util.GetValueFromPercent(maxSurge, deployment.Spec.Replicas) + } + // Find the total number of pods + allPods, err := d.getPodsForRCs(allRCs) + if err != nil { + return false, err + } + currentPodCount := len(allPods) + // Check if we can scale up. + maxTotalPods := deployment.Spec.Replicas + maxSurge + if currentPodCount >= maxTotalPods { + // Cannot scale up. + return false, nil + } + // Scale up. + scaleUpCount := maxTotalPods - currentPodCount + scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas))) + _, err = d.scaleRC(newRC, newRC.Spec.Replicas+scaleUpCount) + return true, err +} + +func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) { + oldPodsCount := d.getReplicaCountForRCs(oldRCs) + if oldPodsCount == 0 { + // Cant scale down further + return false, nil + } + maxUnavailable, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxUnavailable) + if err != nil { + return false, fmt.Errorf("Invalid value for MaxUnavailable: %v", err) + } + if isPercent { + maxUnavailable = util.GetValueFromPercent(maxUnavailable, deployment.Spec.Replicas) + } + // Check if we can scale down. + minAvailable := deployment.Spec.Replicas - maxUnavailable + // Find the number of ready pods. + // TODO: Use MinReadySeconds once https://github.com/kubernetes/kubernetes/pull/12894 is merged. + readyPodCount := 0 + allPods, err := d.getPodsForRCs(allRCs) + for _, pod := range allPods { + if api.IsPodReady(&pod) { + readyPodCount++ + } + } + + if readyPodCount <= minAvailable { + // Cannot scale down. + return false, nil + } + totalScaleDownCount := readyPodCount - minAvailable + for _, targetRC := range oldRCs { + if totalScaleDownCount == 0 { + // No further scaling required. + break + } + if targetRC.Spec.Replicas == 0 { + // cannot scale down this RC. + continue + } + // Scale down. + scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount))) + _, err = d.scaleRC(targetRC, targetRC.Spec.Replicas-scaleDownCount) + if err != nil { + return false, err + } + totalScaleDownCount -= scaleDownCount + } + return true, err +} + +func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) error { + totalReplicas := d.getReplicaCountForRCs(allRCs) + updatedReplicas := d.getReplicaCountForRCs([]*api.ReplicationController{newRC}) + newDeployment := deployment + // TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods. + newDeployment.Status = experimental.DeploymentStatus{ + Replicas: totalReplicas, + UpdatedReplicas: updatedReplicas, + } + _, err := d.updateDeployment(&newDeployment) + return err +} + +func (d *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) { + // TODO: Using client for now, update to use store when it is ready. + rc.Spec.Replicas = newScale + return d.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc) +} + +func (d *DeploymentController) updateDeployment(deployment *experimental.Deployment) (*experimental.Deployment, error) { + // TODO: Using client for now, update to use store when it is ready. + return d.client.Experimental().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 2f84a9b2d68..6cefef26f34 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net" "net/http" "os" @@ -191,6 +192,25 @@ func (intstr *IntOrString) Fuzz(c fuzz.Continue) { } } +func GetIntOrPercentValue(intStr *IntOrString) (int, bool, error) { + switch intStr.Kind { + case IntstrInt: + return intStr.IntVal, false, nil + case IntstrString: + s := strings.Replace(intStr.StrVal, "%", "", -1) + v, err := strconv.Atoi(s) + if err != nil { + return 0, false, fmt.Errorf("invalid value %q: %v", intStr.StrVal, err) + } + return v, true, nil + } + return 0, false, fmt.Errorf("invalid value: neither int nor percentage") +} + +func GetValueFromPercent(percent int, value int) int { + return int(math.Ceil(float64(percent) * (float64(value)) / 100)) +} + // Takes a list of strings and compiles them into a list of regular expressions func CompileRegexps(regexpStrings []string) ([]*regexp.Regexp, error) { regexps := []*regexp.Regexp{}