From 32d153093e7a33aefd1bbf1450e3ef505a4ec3bf Mon Sep 17 00:00:00 2001
From: Janet Kuo
Date: Wed, 18 Nov 2015 15:12:11 -0800
Subject: [PATCH] Fix deployment test failures; change ResyncPeriod

---
 .../app/controllermanager.go                  |   2 +-
 docs/admin/kube-controller-manager.md         |   1 +
 hack/local-up-cluster.sh                      |   2 -
 hack/test-cmd.sh                              |  14 +-
 pkg/client/cache/listers.go                   |  22 +-
 pkg/controller/controller_utils.go            |  30 +--
 .../deployment/deployment_controller.go       | 214 ++++++++++--------
 .../deployment/deployment_controller_test.go  |   2 +-
 pkg/util/deployment/deployment.go             |  41 +++-
 test/e2e/deployment.go                        |  18 ++
 10 files changed, 193 insertions(+), 153 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index cf61249f225..ab7a6ac4ad5 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -386,7 +386,7 @@ func (s *CMServer) Run(_ []string) error {
 
 		if containsResource(resources, "deployments") {
 			glog.Infof("Starting deployment controller")
-			deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller")).
+			go deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller"), s.ResyncPeriod).
 				Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
 		}
 	}
diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md
index 27446f7e657..f1b177b5283 100644
--- a/docs/admin/kube-controller-manager.md
+++ b/docs/admin/kube-controller-manager.md
@@ -60,6 +60,7 @@ kube-controller-manager
       --cloud-provider="": The provider for cloud services. Empty string for no provider.
       --cluster-cidr=: CIDR Range for Pods in cluster.
       --cluster-name="kubernetes": The instance prefix for the cluster
+      --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load
       --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
       --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
       --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh
index b93f098d5f0..234fb04450a 100755
--- a/hack/local-up-cluster.sh
+++ b/hack/local-up-cluster.sh
@@ -235,7 +235,6 @@ function start_apiserver {
     APISERVER_LOG=/tmp/kube-apiserver.log
     sudo -E "${GO_OUT}/kube-apiserver" ${priv_arg} ${runtime_config}\
       --v=${LOG_LEVEL} \
-      --runtime-config=experimental/v1=true \
       --cert-dir="${CERT_DIR}" \
       --service-account-key-file="${SERVICE_ACCOUNT_KEY}" \
       --service-account-lookup="${SERVICE_ACCOUNT_LOOKUP}" \
@@ -256,7 +255,6 @@ function start_controller_manager {
     CTLRMGR_LOG=/tmp/kube-controller-manager.log
     sudo -E "${GO_OUT}/kube-controller-manager" \
       --v=${LOG_LEVEL} \
-      --enable-deployment-controller \
       --service-account-private-key-file="${SERVICE_ACCOUNT_KEY}" \
       --root-ca-file="${ROOT_CA_FILE}" \
       --enable-hostpath-provisioner="${ENABLE_HOSTPATH_PROVISIONER}" \
diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh
index 1a6ba00d75b..5a01d3dfc5f 100755
--- a/hack/test-cmd.sh
+++ b/hack/test-cmd.sh
@@ -640,7 +640,7 @@ runTests() {
   # Post-Condition: hpa "frontend" has configuration annotation
   [[ "$(kubectl get hpa frontend -o yaml "${kube_flags[@]}" | grep kubectl.kubernetes.io/last-applied-configuration)" ]]
   # Clean up
-  kubectl delete rc,hpa frontend
+  kubectl delete rc,hpa frontend "${kube_flags[@]}"

   ## kubectl apply should create the resource that doesn't exist yet
   # Pre-Condition: no POD is running
@@ -658,19 +658,20 @@
   # Pre-Condition: no Job is running
   kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" ''
   # Command
-  kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)'
+  kubectl run pi "${kube_flags[@]}" --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)'
   # Post-Condition: Job "pi" is created
   kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" 'pi:'
   # Clean up
-  kubectl delete jobs pi
+  kubectl delete jobs pi "${kube_flags[@]}"
   # Pre-Condition: no Deployment is running
   kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" ''
   # Command
-  kubectl run nginx --image=nginx --generator=deployment/v1beta1
+  kubectl run nginx --image=nginx --generator=deployment/v1beta1 "${kube_flags[@]}"
   # Post-Condition: Deployment "nginx" is created
   kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx:'
   # Clean up
-  kubectl delete deployment nginx
+  kubectl delete deployment nginx "${kube_flags[@]}"
+  kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}"
 ##############
 # Namespaces #
 ##############
@@ -1050,11 +1051,12 @@ __EOF__
   kubectl create -f examples/extensions/deployment.yaml "${kube_flags[@]}"
   kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx-deployment:'
   # autoscale 2~3 pods, default CPU utilization (80%)
-  kubectl autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3
+  kubectl-with-retry autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3
   kube::test::get_object_assert 'hpa nginx-deployment' "{{$hpa_min_field}} {{$hpa_max_field}} {{$hpa_cpu_field}}" '2 3 80'
   # Clean up
   kubectl delete hpa nginx-deployment "${kube_flags[@]}"
   kubectl delete deployment nginx-deployment "${kube_flags[@]}"
+  kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}"
 ######################
 # Multiple Resources #
 ######################
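The constructor change at the top of this patch wires s.ResyncPeriod into the deployment controller in place of the hard-coded relist periods it deletes below. A minimal sketch of what such a factory can look like, assuming controller.ResyncPeriodFunc is simply func() time.Duration; the jittering shown here is illustrative, not taken from this patch:

    package controller

    import (
        "math/rand"
        "time"
    )

    // ResyncPeriodFunc returns how often an informer should do a full relist,
    // after which required state is recomputed from the local caches.
    type ResyncPeriodFunc func() time.Duration

    // jitteredResyncPeriod spreads relists out around base so that all the
    // informers in a process do not hit the apiserver at the same instant.
    func jitteredResyncPeriod(base time.Duration) ResyncPeriodFunc {
        return func() time.Duration {
            factor := rand.Float64() + 1 // uniform in [1, 2)
            return time.Duration(float64(base) * factor)
        }
    }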
diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go
index 4307dd8d8a8..a7ee0c2e3b4 100644
--- a/pkg/client/cache/listers.go
+++ b/pkg/client/cache/listers.go
@@ -167,7 +167,7 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
 	var rc api.ReplicationController

 	if len(pod.Labels) == 0 {
-		err = fmt.Errorf("No controllers found for pod %v because it has no labels", pod.Name)
+		err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
 		return
 	}

@@ -186,7 +186,7 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
 		controllers = append(controllers, rc)
 	}
 	if len(controllers) == 0 {
-		err = fmt.Errorf("Could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+		err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
 	}
 	return
 }
@@ -220,7 +220,7 @@ func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationControl
 	var d extensions.Deployment

 	if len(rc.Labels) == 0 {
-		err = fmt.Errorf("No controllers found for replication controller %v because it has no labels", rc.Name)
+		err = fmt.Errorf("no deployments found for replication controller %v because it has no labels", rc.Name)
 		return
 	}

@@ -233,14 +233,14 @@ func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationControl
 		labelSet := labels.Set(d.Spec.Selector)
 		selector = labels.Set(d.Spec.Selector).AsSelector()
-		// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
+		// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
 		if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(rc.Labels)) {
 			continue
 		}
 		deployments = append(deployments, d)
 	}
 	if len(deployments) == 0 {
-		err = fmt.Errorf("Could not find deployments set for replication controller %s in namespace %s with labels: %v", rc.Name, rc.Namespace, rc.Labels)
+		err = fmt.Errorf("could not find deployments set for replication controller %s in namespace %s with labels: %v", rc.Name, rc.Namespace, rc.Labels)
 	}
 	return
 }
@@ -275,7 +275,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
 	var daemonSet extensions.DaemonSet

 	if len(pod.Labels) == 0 {
-		err = fmt.Errorf("No daemon sets found for pod %v because it has no labels", pod.Name)
+		err = fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name)
 		return
 	}

@@ -297,7 +297,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
 		daemonSets = append(daemonSets, daemonSet)
 	}
 	if len(daemonSets) == 0 {
-		err = fmt.Errorf("Could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+		err = fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
 	}
 	return
 }
@@ -337,7 +337,7 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Serv
 		}
 	}
 	if len(services) == 0 {
-		err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+		err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
 	}

 	return
@@ -364,7 +364,7 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E
 			return ep, nil
 		}
 	}
-	err = fmt.Errorf("Could not find endpoints for service: %v", svc.Name)
+	err = fmt.Errorf("could not find endpoints for service: %v", svc.Name)
 	return
 }
@@ -396,7 +396,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err
 	var job extensions.Job

 	if len(pod.Labels) == 0 {
-		err = fmt.Errorf("No jobs found for pod %v because it has no labels", pod.Name)
+		err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
 		return
 	}

@@ -413,7 +413,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err
 		jobs = append(jobs, job)
 	}
 	if len(jobs) == 0 {
-		err = fmt.Errorf("Could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+		err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
 	}
 	return
 }
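The GetDeploymentsForRC guard above encodes an easy-to-miss rule: a deployment whose selector is nil or empty must match nothing rather than everything. Restated as a self-contained helper (the function name is mine, not from this patch):

    package cache

    import "k8s.io/kubernetes/pkg/labels"

    // selectorMatchesRC mirrors the guard in GetDeploymentsForRC: an empty
    // deployment selector matches no RC instead of every RC.
    func selectorMatchesRC(deploymentSelector, rcLabels map[string]string) bool {
        sel := labels.Set(deploymentSelector).AsSelector()
        if sel.Empty() {
            return false // nil/empty selector: match nothing, not everything
        }
        return sel.Matches(labels.Set(rcLabels))
    }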
diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 0fde66772b1..a057ce7107a 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -223,34 +223,6 @@ func NewControllerExpectations() *ControllerExpectations {
 	return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
 }

-// RCControlInterface is an interface that knows how to add or delete
-// replication controllers, as well as increment or decrement them. It is used
-// by the deployment controller to ease testing of actions that it takes.
-// @TODO Decide if we want to use this or use the kube client directly.
-type RCControlInterface interface {
-	// CreateRC creates a new replication controller.
-	CreateRC(controller *api.ReplicationController) (*api.ReplicationController, error)
-	// ChangeReplicaCount increments or decrements a replication controller by
-	// a given amount. For decrementing, use negative numbers.
-	AdjustReplicaCount(controller *api.ReplicationController, count int) error
-}
-
-// RealPodControl is the default implementation of PodControllerInterface.
-type RealRCControl struct {
-	KubeClient client.Interface
-	Recorder   record.EventRecorder
-}
-
-func (r RealRCControl) CreateRC(controller *api.ReplicationController) (*api.ReplicationController, error) {
-	return r.KubeClient.ReplicationControllers(controller.Namespace).Create(controller)
-}
-
-func (r RealRCControl) AdjustReplicaCount(controller *api.ReplicationController, count int) error {
-	controller.Spec.Replicas += count
-	_, err := r.KubeClient.ReplicationControllers(controller.Namespace).Update(controller)
-	return err
-}
-
 // PodControlInterface is an interface that knows how to add or delete pods
 // created as an interface to allow testing.
 type PodControlInterface interface {
@@ -262,7 +234,7 @@
 	DeletePod(namespace string, podID string) error
 }

-// RealPodControl is the default implementation of PodControllerInterface.
+// RealPodControl is the default implementation of PodControlInterface.
 type RealPodControl struct {
 	KubeClient client.Interface
 	Recorder   record.EventRecorder
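The deletion above resolves the @TODO in the removed comment: the deployment controller now talks to the kube client directly instead of going through RCControlInterface. For reference, the client-direct equivalent of the deleted AdjustReplicaCount is just a read-modify-write on the RC; a sketch using the packages this file already imports:

    // adjustReplicaCount does what RealRCControl.AdjustReplicaCount did:
    // bump the replica count in place and write the RC back. Use a negative
    // delta to scale down.
    func adjustReplicaCount(c client.Interface, rc *api.ReplicationController, delta int) error {
        rc.Spec.Replicas += delta
        _, err := c.ReplicationControllers(rc.Namespace).Update(rc)
        return err
    }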
diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index 3a8e0f155c2..8c707cc4b17 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -24,13 +24,13 @@ import (
 	"github.com/golang/glog"

 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/record"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
-	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
@@ -40,27 +40,18 @@
 )

 const (
-	// We'll attempt to recompute the required replicas of all deployments
-	// that have fulfilled their expectations at least this often. This recomputation
-	// happens based on contents in the local caches.
+	// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
+	// of all deployments that have fulfilled their expectations at least this often.
+	// This recomputation happens based on contents in the local caches.
 	FullDeploymentResyncPeriod = 30 * time.Second
-
-	// We'll keep replication controller watches open up to this long. In the unlikely case
-	// that a watch misdelivers info about an RC, it'll take this long for
-	// that mistake to be rectified.
-	ControllerRelistPeriod = 5 * time.Minute
-
-	// We'll keep pod watches open up to this long. In the unlikely case
-	// that a watch misdelivers info about a pod, it'll take this long for
-	// that mistake to be rectified.
-	PodRelistPeriod = 5 * time.Minute
 )

+// DeploymentController is responsible for synchronizing Deployment objects stored
+// in the system with actual running rcs and pods.
 type DeploymentController struct {
 	client        client.Interface
 	expClient     client.ExtensionsInterface
 	eventRecorder record.EventRecorder
-	rcControl     controller.RCControlInterface

 	// To allow injection of syncDeployment for testing.
 	syncHandler func(dKey string) error
@@ -88,7 +79,8 @@
 	queue *workqueue.Type
 }

-func NewDeploymentController(client client.Interface) *DeploymentController {
+// NewDeploymentController creates a new DeploymentController.
+func NewDeploymentController(client client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(client.Events(""))
@@ -103,10 +95,10 @@ func NewDeploymentController(client client.Interface) *DeploymentController {
 	dc.dStore.Store, dc.dController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func() (runtime.Object, error) {
-				return dc.expClient.Deployments(api.NamespaceAll).List(labels.Everything(), fields.Everything())
+				return dc.expClient.Deployments(api.NamespaceAll).List(unversioned.ListOptions{})
 			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dc.expClient.Deployments(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
+			WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
+				return dc.expClient.Deployments(api.NamespaceAll).Watch(options)
 			},
 		},
 		&extensions.Deployment{},
@@ -118,8 +110,6 @@
 				dc.enqueueDeployment(cur)
 			},
 			// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
-			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
-			// way of achieving this is by performing a `stop` operation on the deployment.
 			DeleteFunc: dc.enqueueDeployment,
 		},
 	)
@@ -127,14 +117,14 @@
 	dc.rcStore.Store, dc.rcController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func() (runtime.Object, error) {
-				return dc.client.ReplicationControllers(api.NamespaceAll).List(labels.Everything(), fields.Everything())
+				return dc.client.ReplicationControllers(api.NamespaceAll).List(unversioned.ListOptions{})
 			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dc.client.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
+			WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
+				return dc.client.ReplicationControllers(api.NamespaceAll).Watch(options)
 			},
 		},
 		&api.ReplicationController{},
-		ControllerRelistPeriod,
+		resyncPeriod(),
 		framework.ResourceEventHandlerFuncs{
 			AddFunc:    dc.addRC,
 			UpdateFunc: dc.updateRC,
@@ -148,24 +138,44 @@
 	dc.podStore.Store, dc.podController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func() (runtime.Object, error) {
-				return dc.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
+				return dc.client.Pods(api.NamespaceAll).List(unversioned.ListOptions{})
 			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dc.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
+			WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
+				return dc.client.Pods(api.NamespaceAll).Watch(options)
 			},
 		},
 		&api.Pod{},
-		PodRelistPeriod,
-		framework.ResourceEventHandlerFuncs{},
+		resyncPeriod(),
+		framework.ResourceEventHandlerFuncs{
+			// When a pod is updated (e.g. becomes ready), enqueue the deployment that manages it.
+			UpdateFunc: dc.updatePod,
+		},
 	)

 	dc.syncHandler = dc.syncDeployment
+	dc.rcStoreSynced = dc.rcController.HasSynced
+	dc.podStoreSynced = dc.podController.HasSynced
 	return dc
 }

-// When an RC is created, enqueue the deployment that manages it.
+// Run begins watching and syncing.
+func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
+	defer util.HandleCrash()
+	go dc.dController.Run(stopCh)
+	go dc.rcController.Run(stopCh)
+	go dc.podController.Run(stopCh)
+	for i := 0; i < workers; i++ {
+		go util.Until(dc.worker, time.Second, stopCh)
+	}
+	<-stopCh
+	glog.Infof("Shutting down deployment controller")
+	dc.queue.ShutDown()
+}
+
+// addRC enqueues the deployment that manages an RC when the RC is created.
 func (dc *DeploymentController) addRC(obj interface{}) {
 	rc := obj.(*api.ReplicationController)
+	glog.V(4).Infof("Replication controller %s added.", rc.Name)
 	if d := dc.getDeploymentForRC(rc); d != nil {
 		dc.enqueueDeployment(d)
 	}
 }
@@ -175,8 +185,8 @@
 // TODO: Surface that we are ignoring multiple deployments for a given controller.
 func (dc *DeploymentController) getDeploymentForRC(rc *api.ReplicationController) *extensions.Deployment {
 	deployments, err := dc.dStore.GetDeploymentsForRC(rc)
-	if err != nil {
-		glog.V(4).Infof("No deployments found for replication controller %v, deployment controller will avoid syncing", rc.Name)
+	if err != nil || len(deployments) == 0 {
+		glog.V(4).Infof("Error: %v. No deployment found for replication controller %v, deployment controller will avoid syncing.", err, rc.Name)
 		return nil
 	}
 	// Because all RC's belonging to a deployment should have a unique label key,
@@ -187,9 +197,9 @@
 	return &deployments[0]
 }

-// When a controller is updated, figure out what deployment/s manage it and wake them
-// up. If the labels of the controller have changed we need to awaken both the old
-// and new deployments. old and cur must be *api.ReplicationController types.
+// updateRC figures out what deployment(s) manage an RC when the RC is updated and
+// wakes them up. If anything in the RC has changed, we need to awaken both
+// the old and new deployments. old and cur must be *api.ReplicationController types.
 func (dc *DeploymentController) updateRC(old, cur interface{}) {
 	if api.Semantic.DeepEqual(old, cur) {
 		// A periodic relist will send update events for all known controllers.
@@ -197,13 +207,13 @@
 	}
 	// TODO: Write a unittest for this case
 	curRC := cur.(*api.ReplicationController)
+	glog.V(4).Infof("Replication controller %s updated.", curRC.Name)
 	if d := dc.getDeploymentForRC(curRC); d != nil {
 		dc.enqueueDeployment(d)
 	}
 	// A number of things could affect the old deployment: labels changing,
 	// pod template changing, etc.
 	oldRC := old.(*api.ReplicationController)
-	// TODO: Is this the right way to check this, or is checking names sufficient?
 	if !api.Semantic.DeepEqual(oldRC, curRC) {
 		if oldD := dc.getDeploymentForRC(oldRC); oldD != nil {
 			dc.enqueueDeployment(oldD)
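The "unique label key" comment in getDeploymentForRC above is what justifies returning deployments[0]: each RC created by a deployment carries a hash of its pod template as a label (the deployment.kubernetes.io/podTemplateHash key that the test-cmd.sh cleanup selects on), so two deployments should never both match one RC. A sketch of deriving such a hash value; the helper name and the adler32/DeepHashObject details are assumptions, not taken from this patch:

    package deployment

    import (
        "hash/adler32"

        "k8s.io/kubernetes/pkg/api"
        "k8s.io/kubernetes/pkg/util"
    )

    // podTemplateSpecHash returns a stable hash of a pod template, usable as
    // the value of a podTemplateHash label on the RCs a deployment creates.
    func podTemplateSpecHash(template *api.PodTemplateSpec) uint32 {
        hasher := adler32.New()
        util.DeepHashObject(hasher, template) // hash the entire template spec
        return hasher.Sum32()
    }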
@@ -211,11 +221,12 @@
 		}
 	}
 }

-// When a controller is deleted, enqueue the deployment that manages it.
+// deleteRC enqueues the deployment that manages an RC when the RC is deleted.
 // obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown
 // marker item.
 func (dc *DeploymentController) deleteRC(obj interface{}) {
 	rc, ok := obj.(*api.ReplicationController)
+	if ok {
+		glog.V(4).Infof("Replication controller %s deleted.", rc.Name)
+	}
 	// When a delete is dropped, the relist will notice a pod in the store not
 	// in the list, leading to the insertion of a tombstone object which contains
@@ -238,6 +249,44 @@
 	}
 }

+// getDeploymentForPod returns the deployment managing the RC that manages the given Pod.
+// TODO: Surface that we are ignoring multiple deployments for a given Pod.
+func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
+	rcs, err := dc.rcStore.GetPodControllers(pod)
+	if err != nil {
+		glog.V(4).Infof("Error: %v. No replication controllers found for pod %v, deployment controller will avoid syncing.", err, pod.Name)
+		return nil
+	}
+	for _, rc := range rcs {
+		deployments, err := dc.dStore.GetDeploymentsForRC(&rc)
+		if err == nil && len(deployments) > 0 {
+			return &deployments[0]
+		}
+	}
+	glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name)
+	return nil
+}
+
+// updatePod figures out what deployment(s) manage the RC that manages the Pod when the Pod
+// is updated and wakes them up. If anything in the Pod has changed, we need to awaken both
+// the old and new deployments. old and cur must be *api.Pod types.
+func (dc *DeploymentController) updatePod(old, cur interface{}) {
+	if api.Semantic.DeepEqual(old, cur) {
+		return
+	}
+	curPod := cur.(*api.Pod)
+	glog.V(4).Infof("Pod %s updated.", curPod.Name)
+	if d := dc.getDeploymentForPod(curPod); d != nil {
+		dc.enqueueDeployment(d)
+	}
+	oldPod := old.(*api.Pod)
+	if !api.Semantic.DeepEqual(oldPod, curPod) {
+		if oldD := dc.getDeploymentForPod(oldPod); oldD != nil {
+			dc.enqueueDeployment(oldD)
+		}
+	}
+}
+
 // obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item.
 func (dc *DeploymentController) enqueueDeployment(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
@@ -255,19 +304,6 @@
 	dc.queue.Add(key)
 }

-func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
-	go dc.dController.Run(stopCh)
-	go dc.rcController.Run(stopCh)
-	go dc.podController.Run(stopCh)
-	for i := 0; i < workers; i++ {
-		go util.Until(dc.worker, time.Second, stopCh)
-	}
-	<-stopCh
-	glog.Infof("Shutting down deployment controller")
-	dc.queue.ShutDown()
-}
-
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (dc *DeploymentController) worker() {
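The worker body falls between the two hunks here, so the patch never shows it. For orientation, the standard workqueue loop that the comment above describes looks roughly like this (a reconstruction, not part of the diff):

    // worker pulls a key off the queue, syncs it, and marks it done; the
    // queue guarantees the same key is never processed by two workers at once.
    func (dc *DeploymentController) worker() {
        for {
            func() {
                key, quit := dc.queue.Get()
                if quit {
                    return
                }
                defer dc.queue.Done(key)
                if err := dc.syncHandler(key.(string)); err != nil {
                    glog.Errorf("Error syncing deployment %v: %v", key, err)
                }
            }()
        }
    }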
@@ -286,6 +322,8 @@
 	}
 }

+// syncDeployment will sync the deployment with the given key.
+// This function is not meant to be invoked concurrently with the same key.
 func (dc *DeploymentController) syncDeployment(key string) error {
 	startTime := time.Now()
 	defer func() {
@@ -293,15 +331,15 @@
 	}()

 	obj, exists, err := dc.dStore.Store.GetByKey(key)
-	if !exists {
-		glog.Infof("Deployment has been deleted %v", key)
-		return nil
-	}
 	if err != nil {
 		glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
 		dc.queue.Add(key)
 		return err
 	}
+	if !exists {
+		glog.Infof("Deployment has been deleted %v", key)
+		return nil
+	}
 	d := *obj.(*extensions.Deployment)
 	switch d.Spec.Strategy.Type {
 	case extensions.RecreateDeploymentStrategyType:
@@ -309,7 +347,7 @@
 	case extensions.RollingUpdateDeploymentStrategyType:
 		return dc.syncRollingUpdateDeployment(d)
 	}
-	return fmt.Errorf("Unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
+	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
 }

 func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
@@ -349,14 +387,36 @@
 		// Update DeploymentStatus
 		return dc.updateDeploymentStatus(allRCs, newRC, deployment)
 	}
+
+	// Sync deployment status
+	totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs)
+	updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
+	if deployment.Status.Replicas != totalReplicas || deployment.Status.UpdatedReplicas != updatedReplicas {
+		return dc.updateDeploymentStatus(allRCs, newRC, deployment)
+	}
+	// TODO: raise an event, neither scaled up nor down.
 	return nil
 }

+func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
+	return deploymentutil.GetOldRCsFromLists(deployment, dc.client,
+		func(namespace string, options unversioned.ListOptions) (*api.PodList, error) {
+			podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector))
+			return &podList, err
+		},
+		func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
+			return dc.rcStore.List()
+		})
+}
+
 // Returns an RC that matches the intent of the given deployment.
 // It creates a new RC if required.
 func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
-	existingNewRC, err := deploymentutil.GetNewRC(deployment, dc.client)
+	existingNewRC, err := deploymentutil.GetNewRCFromList(deployment, dc.client,
+		func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
+			return dc.rcStore.List()
+		})
 	if err != nil || existingNewRC != nil {
 		return existingNewRC, err
 	}
@@ -385,40 +445,6 @@
 	return createdRC, nil
 }

-func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
-	// TODO: (janet) HEAD >>> return deploymentutil.GetOldRCs(deployment, d.client)
-	namespace := deployment.ObjectMeta.Namespace
-	// 1. Find all pods whose labels match deployment.Spec.Selector
-	podList, err := dc.podStore.Pods(api.NamespaceAll).List(labels.SelectorFromSet(deployment.Spec.Selector))
-	if err != nil {
-		return nil, fmt.Errorf("error listing pods: %v", err)
-	}
-	// 2. Find the corresponding RCs for pods in podList.
-	oldRCs := map[string]api.ReplicationController{}
-	rcList, err := dc.rcStore.List()
-	if err != nil {
-		return nil, fmt.Errorf("error listing replication controllers: %v", err)
-	}
-	for _, pod := range podList.Items {
-		podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
-		for _, rc := range rcList {
-			rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
-			if rcLabelsSelector.Matches(podLabelsSelector) {
-				// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
-				if api.Semantic.DeepEqual(rc.Spec.Template, deploymentutil.GetNewRCTemplate(deployment)) {
-					continue
-				}
-				oldRCs[rc.ObjectMeta.Name] = rc
-			}
-		}
-	}
-	rcSlice := []*api.ReplicationController{}
-	for _, value := range oldRCs {
-		rcSlice = append(rcSlice, &value)
-	}
-	return rcSlice, nil
-}
-
 func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
 	if newRC.Spec.Replicas == deployment.Spec.Replicas {
 		// Scaling not required.
@@ -510,7 +536,7 @@
 		Replicas:        totalReplicas,
 		UpdatedReplicas: updatedReplicas,
 	}
-	_, err := dc.client.Extensions().Deployments(api.NamespaceAll).UpdateStatus(&newDeployment)
+	_, err := dc.expClient.Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
 	return err
 }

@@ -521,7 +547,7 @@
 	}
 	newRC, err := dc.scaleRC(rc, newScale)
 	if err == nil {
-		d.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
+		dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
 	}
 	return newRC, err
 }
@@ -534,5 +560,5 @@

 func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
 	// TODO: Using client for now, update to use store when it is ready.
-	return dc.client.Extensions().Deployments(api.NamespaceAll).Update(deployment)
+	return dc.expClient.Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
 }
diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go
index e583ce17ea2..9dd0e5a6086 100644
--- a/pkg/controller/deployment/deployment_controller_test.go
+++ b/pkg/controller/deployment/deployment_controller_test.go
@@ -364,7 +364,7 @@ func newFixture(t *testing.T) *fixture {

 func (f *fixture) run(deploymentName string) {
 	f.client = testclient.NewSimpleFake(f.objects)
-	c := NewDeploymentController(f.client)
+	c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
 	c.rcStoreSynced = alwaysReady
 	c.podStoreSynced = alwaysReady
 	for _, d := range f.dStore {
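The test fixture above passes controller.NoResyncPeriodFunc so informers never relist mid-test. Its definition is not part of this patch; presumably it is just the zero factory, along these lines:

    // NoResyncPeriodFunc (assumed definition) disables periodic relisting
    // entirely, which keeps unit tests deterministic.
    func NoResyncPeriodFunc() time.Duration {
        return 0
    }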
diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go
index 51f0762895a..2cfd5843ea0 100644
--- a/pkg/util/deployment/deployment.go
+++ b/pkg/util/deployment/deployment.go
@@ -22,33 +22,46 @@ import (
 	"time"

 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util"
 )

-// Returns the old RCs targetted by the given Deployment.
+// GetOldRCs returns the old RCs targeted by the given Deployment; get PodList and RCList from the client interface.
 func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, error) {
+	return GetOldRCsFromLists(deployment, c,
+		func(namespace string, options unversioned.ListOptions) (*api.PodList, error) {
+			return c.Pods(namespace).List(options)
+		},
+		func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
+			rcList, err := c.ReplicationControllers(namespace).List(options)
+			return rcList.Items, err
+		})
+}
+
+// GetOldRCsFromLists returns the old RCs targeted by the given Deployment; get PodList and RCList with the input functions.
+func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, unversioned.ListOptions) (*api.PodList, error), getRcList func(string, unversioned.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) {
 	namespace := deployment.ObjectMeta.Namespace
 	// 1. Find all pods whose labels match deployment.Spec.Selector
 	selector := labels.SelectorFromSet(deployment.Spec.Selector)
 	options := unversioned.ListOptions{LabelSelector: unversioned.LabelSelector{Selector: selector}}
-	podList, err := c.Pods(namespace).List(options)
+	podList, err := getPodList(namespace, options)
 	if err != nil {
 		return nil, fmt.Errorf("error listing pods: %v", err)
 	}
 	// 2. Find the corresponding RCs for pods in podList.
 	// TODO: Right now we list all RCs and then filter. We should add an API for this.
 	oldRCs := map[string]api.ReplicationController{}
-	rcList, err := c.ReplicationControllers(namespace).List(unversioned.ListOptions{})
+	rcList, err := getRcList(namespace, unversioned.ListOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("error listing replication controllers: %v", err)
 	}
 	newRCTemplate := GetNewRCTemplate(deployment)
 	for _, pod := range podList.Items {
 		podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
-		for _, rc := range rcList.Items {
+		for _, rc := range rcList {
 			rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
 			if rcLabelsSelector.Matches(podLabelsSelector) {
 				// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
@@ -70,20 +83,30 @@ func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.Rep
 	return requiredRCs, nil
 }

-// Returns an RC that matches the intent of the given deployment.
+// GetNewRC returns an RC that matches the intent of the given deployment; get RCList from the client interface.
 // Returns nil if the new RC doesn't exist yet.
 func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.ReplicationController, error) {
+	return GetNewRCFromList(deployment, c,
+		func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
+			rcList, err := c.ReplicationControllers(namespace).List(options)
+			return rcList.Items, err
+		})
+}
+
+// GetNewRCFromList returns an RC that matches the intent of the given deployment; get RCList with the input function.
+// Returns nil if the new RC doesn't exist yet.
+func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, unversioned.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) {
 	namespace := deployment.ObjectMeta.Namespace
-	rcList, err := c.ReplicationControllers(namespace).List(unversioned.ListOptions{})
+	rcList, err := getRcList(namespace, unversioned.ListOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("error listing replication controllers: %v", err)
 	}
 	newRCTemplate := GetNewRCTemplate(deployment)
-	for i := range rcList.Items {
-		if api.Semantic.DeepEqual(rcList.Items[i].Spec.Template, &newRCTemplate) {
+	for i := range rcList {
+		if api.Semantic.DeepEqual(rcList[i].Spec.Template, &newRCTemplate) {
 			// This is the new RC.
-			return &rcList.Items[i], nil
+			return &rcList[i], nil
 		}
 	}
 	// new RC does not exist.
diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go
index 3a9074b84ed..bdc20716ae2 100644
--- a/test/e2e/deployment.go
+++ b/test/e2e/deployment.go
@@ -72,8 +72,14 @@ func testNewDeployment(f *Framework) {
 	})
 	Expect(err).NotTo(HaveOccurred())
 	defer func() {
+		deployment, err := c.Deployments(ns).Get(deploymentName)
+		Expect(err).NotTo(HaveOccurred())
 		Logf("deleting deployment %s", deploymentName)
 		Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
+		// TODO: remove this once we can delete rcs with deployment
+		newRC, err := deploymentutil.GetNewRC(*deployment, c)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
 	}()
 	// Check that deployment is created fine.
 	deployment, err := c.Deployments(ns).Get(deploymentName)
@@ -166,8 +172,14 @@ func testRollingUpdateDeployment(f *Framework) {
 	_, err = c.Deployments(ns).Create(&newDeployment)
 	Expect(err).NotTo(HaveOccurred())
 	defer func() {
+		deployment, err := c.Deployments(ns).Get(deploymentName)
+		Expect(err).NotTo(HaveOccurred())
 		Logf("deleting deployment %s", deploymentName)
 		Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
+		// TODO: remove this once we can delete rcs with deployment
+		newRC, err := deploymentutil.GetNewRC(*deployment, c)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
 	}()

 	err = waitForDeploymentStatus(c, ns, deploymentName, 3, 2, 4, 0)
@@ -247,8 +259,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
 	_, err = c.Deployments(ns).Create(&newDeployment)
 	Expect(err).NotTo(HaveOccurred())
 	defer func() {
+		deployment, err := c.Deployments(ns).Get(deploymentName)
+		Expect(err).NotTo(HaveOccurred())
 		Logf("deleting deployment %s", deploymentName)
 		Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
+		// TODO: remove this once we can delete rcs with deployment
+		newRC, err := deploymentutil.GetNewRC(*deployment, c)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
 	}()

 	err = waitForDeploymentStatus(c, ns, deploymentName, 1, 0, 2, 0)
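The same Get/Delete cleanup defer appears three times in the e2e changes above. Until deployments can cascade-delete their RCs (the repeated TODO), a shared helper would keep these tests DRY; a sketch, assuming the e2e client is the usual *client.Client:

    // deleteDeploymentAndNewRC deletes a deployment and then the new RC it
    // created, which deployments cannot yet clean up themselves.
    func deleteDeploymentAndNewRC(c *client.Client, ns, deploymentName string) {
        deployment, err := c.Deployments(ns).Get(deploymentName)
        Expect(err).NotTo(HaveOccurred())
        Logf("deleting deployment %s", deploymentName)
        Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
        newRC, err := deploymentutil.GetNewRC(*deployment, c)
        Expect(err).NotTo(HaveOccurred())
        Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
    }

Each defer above would then shrink to: defer deleteDeploymentAndNewRC(c, ns, deploymentName).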