diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index eec5d2c8b13..ab7a6ac4ad5 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -74,6 +74,7 @@ type CMServer struct { ConcurrentDSCSyncs int ConcurrentJobSyncs int ConcurrentResourceQuotaSyncs int + ConcurrentDeploymentSyncs int ServiceSyncPeriod time.Duration NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -116,6 +117,7 @@ func NewCMServer() *CMServer { ConcurrentDSCSyncs: 2, ConcurrentJobSyncs: 5, ConcurrentResourceQuotaSyncs: 5, + ConcurrentDeploymentSyncs: 5, ServiceSyncPeriod: 5 * time.Minute, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 5 * time.Minute, @@ -189,6 +191,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load") fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load") fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+ @@ -383,8 +386,8 @@ func (s *CMServer) Run(_ []string) error { if containsResource(resources, "deployments") { glog.Infof("Starting deployment controller") - deployment.New(clientForUserAgentOrDie(*kubeconfig, "deployment-controller")). - Run(s.DeploymentControllerSyncPeriod) + go deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller"), s.ResyncPeriod). + Run(s.ConcurrentDeploymentSyncs, util.NeverStop) } } diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md index 27446f7e657..f1b177b5283 100644 --- a/docs/admin/kube-controller-manager.md +++ b/docs/admin/kube-controller-manager.md @@ -60,6 +60,7 @@ kube-controller-manager --cloud-provider="": The provider for cloud services. Empty string for no provider. --cluster-cidr=: CIDR Range for Pods in cluster. --cluster-name="kubernetes": The instance prefix for the cluster + --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 75067beb4a8..d5dab5594a9 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -640,7 +640,7 @@ runTests() { # Post-Condition: hpa "frontend" has configuration annotation [[ "$(kubectl get hpa frontend -o yaml "${kube_flags[@]}" | grep kubectl.kubernetes.io/last-applied-configuration)" ]] # Clean up - kubectl delete rc,hpa frontend + kubectl delete rc,hpa frontend "${kube_flags[@]}" ## kubectl apply should create the resource that doesn't exist yet # Pre-Condition: no POD is running @@ -658,19 +658,20 @@ runTests() { # Pre-Condition: no Job is running kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" '' # Command - kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)' + kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)' "${kube_flags[@]}" # Post-Condition: Job "pi" is created kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" 'pi:' # Clean up - kubectl delete jobs pi + kubectl delete jobs pi "${kube_flags[@]}" # Pre-Condition: no Deployment is running kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" '' # Command - kubectl run nginx --image=nginx --generator=deployment/v1beta1 + kubectl run nginx --image=nginx --generator=deployment/v1beta1 "${kube_flags[@]}" # Post-Condition: Deployment "nginx" is created kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx:' # Clean up - kubectl delete deployment nginx + kubectl delete deployment nginx "${kube_flags[@]}" + kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}" ############## # Namespaces # @@ -1050,11 +1051,12 @@ __EOF__ kubectl create -f examples/extensions/deployment.yaml "${kube_flags[@]}" kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx-deployment:' # autoscale 2~3 pods, default CPU utilization (80%) - kubectl autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3 + kubectl-with-retry autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3 kube::test::get_object_assert 'hpa nginx-deployment' "{{$hpa_min_field}} {{$hpa_max_field}} {{$hpa_cpu_field}}" '2 3 80' # Clean up kubectl delete hpa nginx-deployment "${kube_flags[@]}" kubectl delete deployment nginx-deployment "${kube_flags[@]}" + kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}" ###################### # Multiple Resources # diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 45809340583..66976d54d87 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -45,6 +45,7 @@ cluster-dns cluster-domain cluster-name cluster-tag +concurrent-deployment-syncs concurrent-endpoint-syncs concurrent-resource-quota-syncs config-sync-period diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 721db581ccb..a7ee0c2e3b4 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -167,7 +167,7 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co var rc api.ReplicationController if len(pod.Labels) == 0 { - err = fmt.Errorf("No controllers found for pod %v because it has no labels", pod.Name) + err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) return } @@ -186,7 +186,61 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co controllers = append(controllers, rc) } if len(controllers) == 0 { - err = fmt.Errorf("Could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. +type StoreToDeploymentLister struct { + Store +} + +// Exists checks if the given deployment exists in the store. +func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) { + _, exists, err := s.Store.Get(deployment) + if err != nil { + return false, err + } + return exists, nil +} + +// StoreToDeploymentLister lists all deployments in the store. +// TODO: converge on the interface in pkg/client +func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) { + for _, c := range s.Store.List() { + deployments = append(deployments, *(c.(*extensions.Deployment))) + } + return deployments, nil +} + +// GetDeploymentsForRC returns a list of deployments managing a replication controller. Returns an error only if no matching deployments are found. +func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationController) (deployments []extensions.Deployment, err error) { + var selector labels.Selector + var d extensions.Deployment + + if len(rc.Labels) == 0 { + err = fmt.Errorf("no deployments found for replication controller %v because it has no labels", rc.Name) + return + } + + // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label + for _, m := range s.Store.List() { + d = *m.(*extensions.Deployment) + if d.Namespace != rc.Namespace { + continue + } + labelSet := labels.Set(d.Spec.Selector) + selector = labels.Set(d.Spec.Selector).AsSelector() + + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. + if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(rc.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for replication controller %s in namespace %s with labels: %v", rc.Name, rc.Namespace, rc.Labels) } return } @@ -221,7 +275,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex var daemonSet extensions.DaemonSet if len(pod.Labels) == 0 { - err = fmt.Errorf("No daemon sets found for pod %v because it has no labels", pod.Name) + err = fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name) return } @@ -243,7 +297,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex daemonSets = append(daemonSets, daemonSet) } if len(daemonSets) == 0 { - err = fmt.Errorf("Could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) } return } @@ -283,7 +337,7 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Serv } } if len(services) == 0 { - err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) } return @@ -310,7 +364,7 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E return ep, nil } } - err = fmt.Errorf("Could not find endpoints for service: %v", svc.Name) + err = fmt.Errorf("could not find endpoints for service: %v", svc.Name) return } @@ -342,7 +396,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err var job extensions.Job if len(pod.Labels) == 0 { - err = fmt.Errorf("No jobs found for pod %v because it has no labels", pod.Name) + err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) return } @@ -359,7 +413,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err jobs = append(jobs, job) } if len(jobs) == 0 { - err = fmt.Errorf("Could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) } return } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 6b1267d27c7..a057ce7107a 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -234,7 +234,7 @@ type PodControlInterface interface { DeletePod(namespace string, podID string) error } -// RealPodControl is the default implementation of PodControllerInterface. +// RealPodControl is the default implementation of PodControlInterface. type RealPodControl struct { KubeClient client.Interface Recorder record.EventRecorder diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 854e2637c1e..590d0054a65 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -22,77 +22,380 @@ import ( "time" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" ) +const ( + // FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas + // of all deployments that have fulfilled their expectations at least this often. + // This recomputation happens based on contents in the local caches. + FullDeploymentResyncPeriod = 30 * time.Second +) + +// DeploymentController is responsible for synchronizing Deployment objects stored +// in the system with actual running rcs and pods. type DeploymentController struct { client client.Interface expClient client.ExtensionsInterface eventRecorder record.EventRecorder + + // To allow injection of syncDeployment for testing. + syncHandler func(dKey string) error + + // A store of deployments, populated by the dController + dStore cache.StoreToDeploymentLister + // Watches changes to all deployments + dController *framework.Controller + // A store of replication controllers, populated by the rcController + rcStore cache.StoreToReplicationControllerLister + // Watches changes to all replication controllers + rcController *framework.Controller + // rcStoreSynced returns true if the RC store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + rcStoreSynced func() bool + // A store of pods, populated by the podController + podStore cache.StoreToPodLister + // Watches changes to all pods + podController *framework.Controller + // podStoreSynced returns true if the pod store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + podStoreSynced func() bool + + // A TTLCache of pod creates/deletes each deployment expects to see + expectations controller.ControllerExpectationsInterface + + // Deployments that need to be synced + queue *workqueue.Type } -func New(client client.Interface) *DeploymentController { +// NewDeploymentController creates a new DeploymentController. +func NewDeploymentController(client client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(client.Events("")) - return &DeploymentController{ + dc := &DeploymentController{ client: client, expClient: client.Extensions(), eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}), + queue: workqueue.New(), + expectations: controller.NewControllerExpectations(), + } + + dc.dStore.Store, dc.dController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.expClient.Deployments(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.expClient.Deployments(api.NamespaceAll).Watch(options) + }, + }, + &extensions.Deployment{}, + FullDeploymentResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: dc.enqueueDeployment, + UpdateFunc: func(old, cur interface{}) { + // Resync on deployment object relist. + dc.enqueueDeployment(cur) + }, + // This will enter the sync loop and no-op, because the deployment has been deleted from the store. + DeleteFunc: dc.enqueueDeployment, + }, + ) + + dc.rcStore.Store, dc.rcController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.client.ReplicationControllers(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.client.ReplicationControllers(api.NamespaceAll).Watch(options) + }, + }, + &api.ReplicationController{}, + resyncPeriod(), + framework.ResourceEventHandlerFuncs{ + AddFunc: dc.addRC, + UpdateFunc: dc.updateRC, + DeleteFunc: dc.deleteRC, + }, + ) + + dc.podStore.Store, dc.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.client.Pods(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.client.Pods(api.NamespaceAll).Watch(options) + }, + }, + &api.Pod{}, + resyncPeriod(), + framework.ResourceEventHandlerFuncs{ + // When pod updates (becomes ready), we need to enqueue deployment + UpdateFunc: dc.updatePod, + // When pod is deleted, we need to update deployment's expectations + DeleteFunc: dc.deletePod, + }, + ) + + dc.syncHandler = dc.syncDeployment + dc.rcStoreSynced = dc.rcController.HasSynced + dc.podStoreSynced = dc.podController.HasSynced + return dc +} + +// Run begins watching and syncing. +func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { + defer util.HandleCrash() + go dc.dController.Run(stopCh) + go dc.rcController.Run(stopCh) + go dc.podController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(dc.worker, time.Second, stopCh) + } + <-stopCh + glog.Infof("Shutting down deployment controller") + dc.queue.ShutDown() +} + +// addRC enqueues the deployment that manages an RC when the RC is created. +func (dc *DeploymentController) addRC(obj interface{}) { + rc := obj.(*api.ReplicationController) + glog.V(4).Infof("Replication controller %s added.", rc.Name) + if d := dc.getDeploymentForRC(rc); rc != nil { + dc.enqueueDeployment(d) } } -func (d *DeploymentController) Run(syncPeriod time.Duration) { - go util.Until(func() { - errs := d.reconcileDeployments() - for _, err := range errs { - glog.Errorf("Failed to reconcile: %v", err) - } - }, syncPeriod, util.NeverStop) +// getDeploymentForRC returns the deployment managing the given RC. +// TODO: Surface that we are ignoring multiple deployments for a given controller. +func (dc *DeploymentController) getDeploymentForRC(rc *api.ReplicationController) *extensions.Deployment { + deployments, err := dc.dStore.GetDeploymentsForRC(rc) + if err != nil || len(deployments) == 0 { + glog.V(4).Infof("Error: %v. No deployment found for replication controller %v, deployment controller will avoid syncing.", err, rc.Name) + return nil + } + // Because all RC's belonging to a deployment should have a unique label key, + // there should never be more than one deployment returned by the above method. + // If that happens we should probably dynamically repair the situation by ultimately + // trying to clean up one of the controllers, for now we just return one of the two, + // likely randomly. + return &deployments[0] } -func (d *DeploymentController) reconcileDeployments() []error { - list, err := d.expClient.Deployments(api.NamespaceAll).List(api.ListOptions{}) +// updateRC figures out what deployment(s) manage an RC when the RC is updated and +// wake them up. If the anything of the RCs have changed, we need to awaken both +// the old and new deployments. old and cur must be *api.ReplicationController types. +func (dc *DeploymentController) updateRC(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + // A periodic relist will send update events for all known controllers. + return + } + // TODO: Write a unittest for this case + curRC := cur.(*api.ReplicationController) + glog.V(4).Infof("Replication controller %s updated.", curRC.Name) + if d := dc.getDeploymentForRC(curRC); d != nil { + dc.enqueueDeployment(d) + } + // A number of things could affect the old deployment: labels changing, + // pod template changing, etc. + oldRC := old.(*api.ReplicationController) + if !api.Semantic.DeepEqual(oldRC, curRC) { + if oldD := dc.getDeploymentForRC(oldRC); oldD != nil { + dc.enqueueDeployment(oldD) + } + } +} + +// deleteRC enqueues the deployment that manages an RC when the RC is deleted. +// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown +// marker item. +func (dc *DeploymentController) deleteRC(obj interface{}) { + rc, ok := obj.(*api.ReplicationController) + + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the RC + // changed labels the new deployment will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a deployment recreates/updates controllers", obj, FullDeploymentResyncPeriod) + return + } + rc, ok = tombstone.Obj.(*api.ReplicationController) + if !ok { + glog.Errorf("Tombstone contained object that is not an rc %+v, could take up to %v before a deployment recreates/updates controllers", obj, FullDeploymentResyncPeriod) + return + } + } + glog.V(4).Infof("Replication controller %s deleted.", rc.Name) + if d := dc.getDeploymentForRC(rc); d != nil { + dc.enqueueDeployment(d) + } +} + +// getDeploymentForPod returns the deployment managing the RC that manages the given Pod. +// TODO: Surface that we are ignoring multiple deployments for a given Pod. +func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment { + rcs, err := dc.rcStore.GetPodControllers(pod) if err != nil { - return []error{fmt.Errorf("error listing deployments: %v", err)} + glog.V(4).Infof("Error: %v. No replication controllers found for pod %v, deployment controller will avoid syncing.", err, pod.Name) + return nil } - errs := []error{} - for _, deployment := range list.Items { - if err := d.reconcileDeployment(&deployment); err != nil { - errs = append(errs, fmt.Errorf("error in reconciling deployment %s: %v", deployment.Name, err)) + for _, rc := range rcs { + deployments, err := dc.dStore.GetDeploymentsForRC(&rc) + if err == nil && len(deployments) > 0 { + return &deployments[0] } } - return errs + glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name) + return nil } -func (d *DeploymentController) reconcileDeployment(deployment *extensions.Deployment) error { - switch deployment.Spec.Strategy.Type { - case extensions.RecreateDeploymentStrategyType: - return d.reconcileRecreateDeployment(*deployment) - case extensions.RollingUpdateDeploymentStrategyType: - return d.reconcileRollingUpdateDeployment(*deployment) +// updatePod figures out what deployment(s) manage the RC that manages the Pod when the Pod +// is updated and wake them up. If anything of the Pods have changed, we need to awaken both +// the old and new deployments. old and cur must be *api.Pod types. +func (dc *DeploymentController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + return + } + curPod := cur.(*api.Pod) + glog.V(4).Infof("Pod %s updated.", curPod.Name) + if d := dc.getDeploymentForPod(curPod); d != nil { + dc.enqueueDeployment(d) + } + oldPod := old.(*api.Pod) + if !api.Semantic.DeepEqual(oldPod, curPod) { + if oldD := dc.getDeploymentForPod(oldPod); oldD != nil { + dc.enqueueDeployment(oldD) + } } - return fmt.Errorf("unexpected deployment strategy type: %s", deployment.Spec.Strategy.Type) } -func (d *DeploymentController) reconcileRecreateDeployment(deployment extensions.Deployment) error { +// When a pod is deleted, update expectations of the controller that manages the pod. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. +func (dc *DeploymentController) deletePod(obj interface{}) { + pod, ok := obj.(*api.Pod) + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new rc will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout) + return + } + pod, ok = tombstone.Obj.(*api.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout) + return + } + } + glog.V(4).Infof("Pod %s deleted.", pod.Name) + if d := dc.getDeploymentForPod(pod); d != nil { + dKey, err := controller.KeyFunc(d) + if err != nil { + glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err) + return + } + dc.expectations.DeletionObserved(dKey) + } +} + +// obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item. +func (dc *DeploymentController) enqueueDeployment(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + + // TODO: Handle overlapping deployments better. Either disallow them at admission time or + // deterministically avoid syncing deployments that fight over RC's. Currently, we only + // ensure that the same deployment is synced for a given RC. When we periodically relist + // all deployments there will still be some RC instability. One way to handle this is + // by querying the store for all deployments that this deployment overlaps, as well as all + // deployments that overlap this deployments, and sorting them. + dc.queue.Add(key) +} + +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (dc *DeploymentController) worker() { + for { + func() { + key, quit := dc.queue.Get() + if quit { + return + } + defer dc.queue.Done(key) + err := dc.syncHandler(key.(string)) + if err != nil { + glog.Errorf("Error syncing deployment: %v", err) + } + }() + } +} + +// syncDeployment will sync the deployment with the given key. +// This function is not meant to be invoked concurrently with the same key. +func (dc *DeploymentController) syncDeployment(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime)) + }() + + obj, exists, err := dc.dStore.Store.GetByKey(key) + if err != nil { + glog.Infof("Unable to retrieve deployment %v from store: %v", key, err) + dc.queue.Add(key) + return err + } + if !exists { + glog.Infof("Deployment has been deleted %v", key) + dc.expectations.DeleteExpectations(key) + return nil + } + d := *obj.(*extensions.Deployment) + switch d.Spec.Strategy.Type { + case extensions.RecreateDeploymentStrategyType: + return dc.syncRecreateDeployment(d) + case extensions.RollingUpdateDeploymentStrategyType: + return dc.syncRollingUpdateDeployment(d) + } + return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) +} + +func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error { // TODO: implement me. return nil } -func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment extensions.Deployment) error { - newRC, err := d.getNewRC(deployment) +func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error { + newRC, err := dc.getNewRC(deployment) if err != nil { return err } - oldRCs, err := d.getOldRCs(deployment) + oldRCs, err := dc.getOldRCs(deployment) if err != nil { return err } @@ -100,36 +403,54 @@ func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment exten allRCs := append(oldRCs, newRC) // Scale up, if we can. - scaledUp, err := d.reconcileNewRC(allRCs, newRC, deployment) + scaledUp, err := dc.reconcileNewRC(allRCs, newRC, deployment) if err != nil { return err } if scaledUp { // Update DeploymentStatus - return d.updateDeploymentStatus(allRCs, newRC, deployment) + return dc.updateDeploymentStatus(allRCs, newRC, deployment) } // Scale down, if we can. - scaledDown, err := d.reconcileOldRCs(allRCs, oldRCs, newRC, deployment) + scaledDown, err := dc.reconcileOldRCs(allRCs, oldRCs, newRC, deployment, true) if err != nil { return err } if scaledDown { // Update DeploymentStatus - return d.updateDeploymentStatus(allRCs, newRC, deployment) + return dc.updateDeploymentStatus(allRCs, newRC, deployment) } + + // Sync deployment status + totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs) + updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC}) + if deployment.Status.Replicas != totalReplicas || deployment.Status.UpdatedReplicas != updatedReplicas { + return dc.updateDeploymentStatus(allRCs, newRC, deployment) + } + // TODO: raise an event, neither scaled up nor down. return nil } -func (d *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) { - return deploymentutil.GetOldRCs(deployment, d.client) +func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) { + return deploymentutil.GetOldRCsFromLists(deployment, dc.client, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector)) + return &podList, err + }, + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { + return dc.rcStore.List() + }) } // Returns an RC that matches the intent of the given deployment. // It creates a new RC if required. -func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) { - existingNewRC, err := deploymentutil.GetNewRC(deployment, d.client) +func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) { + existingNewRC, err := deploymentutil.GetNewRCFromList(deployment, dc.client, + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { + return dc.rcStore.List() + }) if err != nil || existingNewRC != nil { return existingNewRC, err } @@ -151,21 +472,21 @@ func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api. Template: &newRCTemplate, }, } - createdRC, err := d.client.ReplicationControllers(namespace).Create(&newRC) + createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC) if err != nil { return nil, fmt.Errorf("error creating replication controller: %v", err) } return createdRC, nil } -func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) { +func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) { if newRC.Spec.Replicas == deployment.Spec.Replicas { // Scaling not required. return false, nil } if newRC.Spec.Replicas > deployment.Spec.Replicas { // Scale down. - _, err := d.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment) + _, err := dc.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment) return true, err } // Check if we can scale up. @@ -188,11 +509,12 @@ func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationControlle // Do not exceed the number of desired replicas. scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas))) newReplicasCount := newRC.Spec.Replicas + scaleUpCount - _, err = d.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment) + _, err = dc.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment) return true, err } -func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) { +// Set expectationsCheck to false to bypass expectations check when testing +func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment, expectationsCheck bool) (bool, error) { oldPodsCount := deploymentutil.GetReplicaCountForRCs(oldRCs) if oldPodsCount == 0 { // Cant scale down further @@ -208,8 +530,17 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll // Check if we can scale down. minAvailable := deployment.Spec.Replicas - maxUnavailable minReadySeconds := deployment.Spec.Strategy.RollingUpdate.MinReadySeconds + // Check the expectations of deployment before counting available pods + dKey, err := controller.KeyFunc(&deployment) + if err != nil { + return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err) + } + if expectationsCheck && !dc.expectations.SatisfiedExpectations(dKey) { + fmt.Printf("Expectations not met yet before reconciling old RCs\n") + return false, nil + } // Find the number of ready pods. - readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(d.client, allRCs, minReadySeconds) + readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, allRCs, minReadySeconds) if err != nil { return false, fmt.Errorf("could not find available pods: %v", err) } @@ -231,16 +562,23 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll // Scale down. scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount))) newReplicasCount := targetRC.Spec.Replicas - scaleDownCount - _, err = d.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment) + _, err = dc.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment) if err != nil { return false, err } totalScaleDownCount -= scaleDownCount + dKey, err := controller.KeyFunc(&deployment) + if err != nil { + return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err) + } + if expectationsCheck { + dc.expectations.ExpectDeletions(dKey, scaleDownCount) + } } return true, err } -func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error { +func (dc *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error { totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs) updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC}) newDeployment := deployment @@ -249,29 +587,29 @@ func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationC Replicas: totalReplicas, UpdatedReplicas: updatedReplicas, } - _, err := d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment) + _, err := dc.expClient.Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment) return err } -func (d *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) { +func (dc *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) { scalingOperation := "down" if rc.Spec.Replicas < newScale { scalingOperation = "up" } - newRC, err := d.scaleRC(rc, newScale) + newRC, err := dc.scaleRC(rc, newScale) if err == nil { - d.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale) + dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale) } return newRC, err } -func (d *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) { +func (dc *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) { // TODO: Using client for now, update to use store when it is ready. rc.Spec.Replicas = newScale - return d.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc) + return dc.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc) } -func (d *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) { +func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) { // TODO: Using client for now, update to use store when it is ready. - return d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) + return dc.expClient.Deployments(deployment.ObjectMeta.Namespace).Update(deployment) } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 94d00a32522..df8403c2d82 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -21,10 +21,14 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" exp "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" ) @@ -190,7 +194,7 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) { client: fake, eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment) + scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment, false) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -258,3 +262,159 @@ func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOr }, } } + +var alwaysReady = func() bool { return true } + +func newDeployment(replicas int) *exp.Deployment { + d := exp.Deployment{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: exp.DeploymentSpec{ + Strategy: exp.DeploymentStrategy{ + Type: exp.RollingUpdateDeploymentStrategyType, + RollingUpdate: &exp.RollingUpdateDeployment{}, + }, + Replicas: replicas, + Selector: map[string]string{"foo": "bar"}, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "name": "foo", + "type": "production", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo/bar", + }, + }, + }, + }, + }, + } + return &d +} + +func getKey(d *exp.Deployment, t *testing.T) string { + if key, err := controller.KeyFunc(d); err != nil { + t.Errorf("Unexpected error getting key for deployment %v: %v", d.Name, err) + return "" + } else { + return key + } +} + +func newReplicationController(d *exp.Deployment, name string, replicas int) *api.ReplicationController { + return &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: api.NamespaceDefault, + }, + Spec: api.ReplicationControllerSpec{ + Replicas: 0, + Template: &d.Spec.Template, + }, + } + +} + +type fixture struct { + t *testing.T + + client *testclient.Fake + + // Objects to put in the store. + dStore []*exp.Deployment + rcStore []*api.ReplicationController + podStore []*api.Pod + + // Actions expected to happen on the client. Objects from here are also + // preloaded into NewSimpleFake. + actions []testclient.Action + objects *api.List +} + +func (f *fixture) expectUpdateDeploymentAction(d *exp.Deployment) { + f.actions = append(f.actions, testclient.NewUpdateAction("deployments", d.Namespace, d)) + f.objects.Items = append(f.objects.Items, d) +} + +func (f *fixture) expectCreateRCAction(rc *api.ReplicationController) { + f.actions = append(f.actions, testclient.NewCreateAction("replicationcontrollers", rc.Namespace, rc)) + f.objects.Items = append(f.objects.Items, rc) +} + +func (f *fixture) expectUpdateRCAction(rc *api.ReplicationController) { + f.actions = append(f.actions, testclient.NewUpdateAction("replicationcontrollers", rc.Namespace, rc)) + f.objects.Items = append(f.objects.Items, rc) +} + +func newFixture(t *testing.T) *fixture { + f := &fixture{} + f.t = t + f.objects = &api.List{} + return f +} + +func (f *fixture) run(deploymentName string) { + f.client = testclient.NewSimpleFake(f.objects) + c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc) + c.rcStoreSynced = alwaysReady + c.podStoreSynced = alwaysReady + for _, d := range f.dStore { + c.dStore.Store.Add(d) + } + for _, rc := range f.rcStore { + c.rcStore.Store.Add(rc) + } + for _, pod := range f.podStore { + c.podStore.Store.Add(pod) + } + + err := c.syncDeployment(deploymentName) + if err != nil { + f.t.Errorf("error syncing deployment: %v", err) + } + + actions := f.client.Actions() + for i, action := range actions { + if len(f.actions) < i+1 { + f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:]) + break + } + + expectedAction := f.actions[i] + if !expectedAction.Matches(action.GetVerb(), action.GetResource()) { + f.t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expectedAction, action) + continue + } + } + + if len(f.actions) > len(actions) { + f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):]) + } +} + +func TestSyncDeploymentCreatesRC(t *testing.T) { + f := newFixture(t) + + d := newDeployment(1) + f.dStore = append(f.dStore, d) + + // expect that one rc with zero replicas is created + // then is updated to 1 replica + rc := newReplicationController(d, "deploymentrc-4186632231", 0) + updatedRC := newReplicationController(d, "deploymentrc-4186632231", 1) + + f.expectCreateRCAction(rc) + f.expectUpdateRCAction(updatedRC) + f.expectUpdateDeploymentAction(d) + + f.run(getKey(d, t)) +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 873f9af17fa..6ea68d16c19 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -210,7 +210,7 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon // overlap, sort by creation timestamp, subsort by name, then pick // the first. glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels) - sort.Sort(overlappingControllers(controllers)) + sort.Sort(OverlappingControllers(controllers)) } return &controllers[0] } diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go index 5033c29011c..b300e092205 100644 --- a/pkg/controller/replication/replication_controller_utils.go +++ b/pkg/controller/replication/replication_controller_utils.go @@ -57,12 +57,12 @@ func updateReplicaCount(rcClient client.ReplicationControllerInterface, controll } // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. -type overlappingControllers []api.ReplicationController +type OverlappingControllers []api.ReplicationController -func (o overlappingControllers) Len() int { return len(o) } -func (o overlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o OverlappingControllers) Len() int { return len(o) } +func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } -func (o overlappingControllers) Less(i, j int) bool { +func (o OverlappingControllers) Less(i, j int) bool { if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { return o[i].Name < o[j].Name } diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 73dc7459978..5885339ea51 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -28,27 +28,39 @@ import ( "k8s.io/kubernetes/pkg/util" ) -// Returns the old RCs targetted by the given Deployment. +// GetOldRCs returns the old RCs targeted by the given Deployment; get PodList and RCList from client interface. func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, error) { + return GetOldRCsFromLists(deployment, c, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + return c.Pods(namespace).List(options) + }, + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { + rcList, err := c.ReplicationControllers(namespace).List(options) + return rcList.Items, err + }) +} + +// GetOldRCsFromLists returns the old RCs targeted by the given Deployment; get PodList and RCList with input functions. +func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) { namespace := deployment.ObjectMeta.Namespace // 1. Find all pods whose labels match deployment.Spec.Selector selector := labels.SelectorFromSet(deployment.Spec.Selector) options := api.ListOptions{LabelSelector: selector} - podList, err := c.Pods(namespace).List(options) + podList, err := getPodList(namespace, options) if err != nil { return nil, fmt.Errorf("error listing pods: %v", err) } // 2. Find the corresponding RCs for pods in podList. // TODO: Right now we list all RCs and then filter. We should add an API for this. oldRCs := map[string]api.ReplicationController{} - rcList, err := c.ReplicationControllers(namespace).List(api.ListOptions{}) + rcList, err := getRcList(namespace, api.ListOptions{}) if err != nil { return nil, fmt.Errorf("error listing replication controllers: %v", err) } newRCTemplate := GetNewRCTemplate(deployment) for _, pod := range podList.Items { podLabelsSelector := labels.Set(pod.ObjectMeta.Labels) - for _, rc := range rcList.Items { + for _, rc := range rcList { rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector) if rcLabelsSelector.Matches(podLabelsSelector) { // Filter out RC that has the same pod template spec as the deployment - that is the new RC. @@ -67,20 +79,30 @@ func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.Rep return requiredRCs, nil } -// Returns an RC that matches the intent of the given deployment. +// GetNewRC returns an RC that matches the intent of the given deployment; get RCList from client interface. // Returns nil if the new RC doesnt exist yet. func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.ReplicationController, error) { + return GetNewRCFromList(deployment, c, + func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) { + rcList, err := c.ReplicationControllers(namespace).List(options) + return rcList.Items, err + }) +} + +// GetNewRCFromList returns an RC that matches the intent of the given deployment; get RCList with the input function. +// Returns nil if the new RC doesnt exist yet. +func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) { namespace := deployment.ObjectMeta.Namespace - rcList, err := c.ReplicationControllers(namespace).List(api.ListOptions{}) + rcList, err := getRcList(namespace, api.ListOptions{}) if err != nil { return nil, fmt.Errorf("error listing replication controllers: %v", err) } newRCTemplate := GetNewRCTemplate(deployment) - for i := range rcList.Items { - if api.Semantic.DeepEqual(rcList.Items[i].Spec.Template, &newRCTemplate) { + for i := range rcList { + if api.Semantic.DeepEqual(rcList[i].Spec.Template, &newRCTemplate) { // This is the new RC. - return &rcList.Items[i], nil + return &rcList[i], nil } } // new RC does not exist. diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index 3a9074b84ed..ffbf6f3dbf9 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -72,8 +72,14 @@ func testNewDeployment(f *Framework) { }) Expect(err).NotTo(HaveOccurred()) defer func() { + deployment, err := c.Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) Logf("deleting deployment %s", deploymentName) Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred()) + // TODO: remove this once we can delete rcs with deployment + newRC, err := deploymentutil.GetNewRC(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred()) }() // Check that deployment is created fine. deployment, err := c.Deployments(ns).Get(deploymentName) @@ -166,8 +172,14 @@ func testRollingUpdateDeployment(f *Framework) { _, err = c.Deployments(ns).Create(&newDeployment) Expect(err).NotTo(HaveOccurred()) defer func() { + deployment, err := c.Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) Logf("deleting deployment %s", deploymentName) Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred()) + // TODO: remove this once we can delete rcs with deployment + newRC, err := deploymentutil.GetNewRC(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred()) }() err = waitForDeploymentStatus(c, ns, deploymentName, 3, 2, 4, 0) @@ -178,9 +190,9 @@ func testRollingUpdateDeploymentEvents(f *Framework) { ns := f.Namespace.Name c := f.Client // Create nginx pods. - deploymentPodLabels := map[string]string{"name": "sample-pod"} + deploymentPodLabels := map[string]string{"name": "sample-pod-2"} rcPodLabels := map[string]string{ - "name": "sample-pod", + "name": "sample-pod-2", "pod": "nginx", } rcName := "nginx-controller" @@ -212,14 +224,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) { Expect(c.ReplicationControllers(ns).Delete(rcName)).NotTo(HaveOccurred()) }() // Verify that the required pods have come up. - err = verifyPods(c, ns, "sample-pod", false, 1) + err = verifyPods(c, ns, "sample-pod-2", false, 1) if err != nil { Logf("error in waiting for pods to come up: %s", err) Expect(err).NotTo(HaveOccurred()) } // Create a deployment to delete nginx pods and instead bring up redis pods. - deploymentName := "redis-deployment" + deploymentName := "redis-deployment-2" Logf("Creating deployment %s", deploymentName) newDeployment := extensions.Deployment{ ObjectMeta: api.ObjectMeta{ @@ -247,8 +259,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) { _, err = c.Deployments(ns).Create(&newDeployment) Expect(err).NotTo(HaveOccurred()) defer func() { + deployment, err := c.Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) Logf("deleting deployment %s", deploymentName) Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred()) + // TODO: remove this once we can delete rcs with deployment + newRC, err := deploymentutil.GetNewRC(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred()) }() err = waitForDeploymentStatus(c, ns, deploymentName, 1, 0, 2, 0)