From 331083727f97b428025d44729afd5ad1681b3e8e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 17 Aug 2016 16:16:01 +0200 Subject: [PATCH 1/2] Change podNamespacer API --- .../mesos/pkg/service/endpoints_controller.go | 5 +++-- pkg/client/cache/listers.go | 20 ++++++++----------- pkg/client/cache/listers_test.go | 4 ++-- pkg/controller/controller_ref_manager.go | 10 +++++----- pkg/controller/controller_utils.go | 9 ++++----- pkg/controller/daemon/daemoncontroller.go | 8 +++++--- pkg/controller/deployment/sync.go | 14 ++++++++++--- pkg/controller/disruption/disruption.go | 12 +++++------ .../endpoint/endpoints_controller.go | 5 +++-- pkg/controller/job/jobcontroller.go | 11 +++++----- pkg/controller/petset/pet_set.go | 11 +++++----- pkg/controller/replicaset/replica_set.go | 11 ++++++---- .../replication/replication_controller.go | 11 ++++++---- 13 files changed, 73 insertions(+), 58 deletions(-) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 199e0051a7b..56bcafb7644 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -292,8 +292,9 @@ func (e *endpointController) syncService(key string) { subsets := []api.EndpointSubset{} containerPortAnnotations := map[string]string{} // by : - for i := range pods.Items { - pod := &pods.Items[i] + for i := range pods { + // TODO: Do we need to copy here? + pod := &(*pods[i]) for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 8abde96c4ac..9de3f099feb 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -49,13 +49,9 @@ type StoreToPodLister struct { // Please note that selector is filtering among the pods that have gotten into // the store; there may have been some filtering that already happened before // that. -// -// TODO: converge on the interface in pkg/client. +// We explicitly don't return api.PodList, to avoid expensive allocations, which +// in most cases are unnecessary. func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { - // TODO: it'd be great to just call - // s.Pods(api.NamespaceAll).List(selector), however then we'd have to - // remake the list.Items as a []*api.Pod. So leave this separate for - // now. for _, m := range s.Indexer.List() { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { @@ -78,14 +74,14 @@ type storePodsNamespacer struct { // Please note that selector is filtering among the pods that have gotten into // the store; there may have been some filtering that already happened before // that. -func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) { - pods := api.PodList{} - +// We explicitly don't return api.PodList, to avoid expensive allocations, which +// in most cases are unnecessary. +func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) { if s.namespace == api.NamespaceAll { for _, m := range s.indexer.List() { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { - pods.Items = append(pods.Items, *pod) + pods = append(pods, pod) } } return pods, nil @@ -99,7 +95,7 @@ func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) for _, m := range s.indexer.List() { pod := m.(*api.Pod) if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) { - pods.Items = append(pods.Items, *pod) + pods = append(pods, pod) } } return pods, nil @@ -107,7 +103,7 @@ func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) for _, m := range items { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { - pods.Items = append(pods.Items, *pod) + pods = append(pods, pod) } } return pods, nil diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index d6e89e82b67..a63f06ee93e 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -718,9 +718,9 @@ func TestStoreToPodLister(t *testing.T) { defaultPods, err := spl.Pods(api.NamespaceDefault).List(labels.Set{}.AsSelector()) if err != nil { t.Errorf("Unexpected error: %v", err) - } else if e, a := 1, len(defaultPods.Items); e != a { + } else if e, a := 1, len(defaultPods); e != a { t.Errorf("Expected %v, got %v", e, a) - } else if e, a := "quux", defaultPods.Items[0].Name; e != a { + } else if e, a := "quux", defaultPods[0].Name; e != a { t.Errorf("Expected %v, got %v", e, a) } diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index 6c530e47f37..f2dcae5fd73 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -53,13 +53,13 @@ func NewPodControllerRefManager( // controllerRef pointing to other object are ignored) 3. controlledDoesNotMatch // are the pods that have a controllerRef pointing to the controller, but their // labels no longer match the selector. -func (m *PodControllerRefManager) Classify(pods []api.Pod) ( +func (m *PodControllerRefManager) Classify(pods []*api.Pod) ( matchesAndControlled []*api.Pod, matchesNeedsController []*api.Pod, controlledDoesNotMatch []*api.Pod) { for i := range pods { pod := pods[i] - if !IsPodActive(pod) { + if !IsPodActive(*pod) { glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp) continue @@ -69,9 +69,9 @@ func (m *PodControllerRefManager) Classify(pods []api.Pod) ( if controllerRef.UID == m.controllerObject.UID { // already controlled if m.controllerSelector.Matches(labels.Set(pod.Labels)) { - matchesAndControlled = append(matchesAndControlled, &pod) + matchesAndControlled = append(matchesAndControlled, pod) } else { - controlledDoesNotMatch = append(controlledDoesNotMatch, &pod) + controlledDoesNotMatch = append(controlledDoesNotMatch, pod) } } else { // ignoring the pod controlled by other controller @@ -83,7 +83,7 @@ func (m *PodControllerRefManager) Classify(pods []api.Pod) ( if !m.controllerSelector.Matches(labels.Set(pod.Labels)) { continue } - matchesNeedsController = append(matchesNeedsController, &pod) + matchesNeedsController = append(matchesNeedsController, pod) } } return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 38cb99feb3d..2ccccfb07de 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -683,12 +683,11 @@ func maxContainerRestarts(pod *api.Pod) int { } // FilterActivePods returns pods that have not terminated. -func FilterActivePods(pods []api.Pod) []*api.Pod { +func FilterActivePods(pods []*api.Pod) []*api.Pod { var result []*api.Pod - for i := range pods { - p := pods[i] - if IsPodActive(p) { - result = append(result, &p) + for _, p := range pods { + if IsPodActive(*p) { + result = append(result, p) } else { glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp) diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 1b5d34c94b2..02247f3b8ba 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -458,9 +458,11 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) if err != nil { return nodeToDaemonPods, err } - for i := range daemonPods.Items { - nodeName := daemonPods.Items[i].Spec.NodeName - nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i]) + for i := range daemonPods { + // TODO: Do we need to copy here? + daemonPod := &(*daemonPods[i]) + nodeName := daemonPod.Spec.NodeName + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod) } return nodeToDaemonPods, nil } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index e6c82af1b90..1479d02f701 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -174,10 +174,14 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) } options := api.ListOptions{LabelSelector: selector} - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector) if err != nil { return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) } + podList := api.PodList{Items: make([]api.Pod, 0, len(pods))} + for i := range pods { + podList.Items = append(podList.Items, *pods[i]) + } allPodsLabeled := false if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil { return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) @@ -224,8 +228,12 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) { return deploymentutil.ListPods(deployment, func(namespace string, options api.ListOptions) (*api.PodList, error) { - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) - return &podList, err + pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + result := api.PodList{Items: make([]api.Pod, 0, len(pods))} + for i := range pods { + result.Items = append(result.Items, *pods[i]) + } + return &result, err }) } diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 8b80a9bcc69..704ee38af2d 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -377,16 +377,16 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ( if err != nil { return []*api.Pod{}, err } - podList, err := dc.podLister.Pods(pdb.Namespace).List(sel) + pods, err := dc.podLister.Pods(pdb.Namespace).List(sel) if err != nil { return []*api.Pod{}, err } - pods := []*api.Pod{} - for i := range podList.Items { - pod := podList.Items[i] - pods = append(pods, &pod) + // TODO: Do we need to copy here? + result := make([]*api.Pod, 0, len(pods)) + for i := range pods { + result = append(result, &(*pods[i])) } - return pods, nil + return result, nil } func (dc *DisruptionController) worker() { diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index e61c771c38c..fbc53141a20 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -380,8 +380,9 @@ func (e *EndpointController) syncService(key string) { } } - for i := range pods.Items { - pod := &pods.Items[i] + for i := range pods { + // TODO: Do we need to copy here? + pod := &(*pods[i]) for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 360e07332ca..cfb353915d9 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -332,16 +332,16 @@ func (jm *JobController) syncJob(key string) error { } jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey) selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector) - podList, err := jm.podStore.Pods(job.Namespace).List(selector) + pods, err := jm.podStore.Pods(job.Namespace).List(selector) if err != nil { glog.Errorf("Error getting pods for job %q: %v", key, err) jm.queue.Add(key) return err } - activePods := controller.FilterActivePods(podList.Items) + activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) - succeeded, failed := getStatus(podList.Items) + succeeded, failed := getStatus(pods) conditions := len(job.Status.Conditions) if job.Status.StartTime == nil { now := unversioned.Now() @@ -450,7 +450,7 @@ func newCondition(conditionType batch.JobConditionType, reason, message string) } // getStatus returns no of succeeded and failed pods running a job -func getStatus(pods []api.Pod) (succeeded, failed int32) { +func getStatus(pods []*api.Pod) (succeeded, failed int32) { succeeded = int32(filterPods(pods, api.PodSucceeded)) failed = int32(filterPods(pods, api.PodFailed)) return @@ -458,6 +458,7 @@ func getStatus(pods []api.Pod) (succeeded, failed int32) { // manageJob is the core method responsible for managing the number of running // pods according to what is specified in the job.Spec. +// Does NOT modify . func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *batch.Job) int32 { var activeLock sync.Mutex active := int32(len(activePods)) @@ -550,7 +551,7 @@ func (jm *JobController) updateJobStatus(job *batch.Job) error { } // filterPods returns pods based on their phase. -func filterPods(pods []api.Pod, phase api.PodPhase) int { +func filterPods(pods []*api.Pod, phase api.PodPhase) int { result := 0 for i := range pods { if phase == pods[i].Status.Phase { diff --git a/pkg/controller/petset/pet_set.go b/pkg/controller/petset/pet_set.go index d15688d6ccd..bc77dd02004 100644 --- a/pkg/controller/petset/pet_set.go +++ b/pkg/controller/petset/pet_set.go @@ -218,15 +218,16 @@ func (psc *PetSetController) getPodsForPetSet(ps *apps.PetSet) ([]*api.Pod, erro if err != nil { return []*api.Pod{}, err } - petList, err := psc.podStore.Pods(ps.Namespace).List(sel) + pods, err := psc.podStore.Pods(ps.Namespace).List(sel) if err != nil { return []*api.Pod{}, err } - pods := []*api.Pod{} - for _, p := range petList.Items { - pods = append(pods, &p) + // TODO: Do we need to copy? + result := make([]*api.Pod, 0, len(pods)) + for i := range pods { + result = append(result, &(*pods[i])) } - return pods, nil + return result, nil } // getPetSetForPod returns the pet set managing the given pod. diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 5cc31366b45..e2d44df2f4e 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -458,6 +458,7 @@ func (rsc *ReplicaSetController) worker() { } // manageReplicas checks and updates replicas for the given ReplicaSet. +// Does NOT modify . func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) { diff := len(filteredPods) - int(rs.Spec.Replicas) rsKey, err := controller.KeyFunc(rs) @@ -593,19 +594,21 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { return err } + // NOTE: filteredPods are pointing to objects from cache - if you need to + // modify them, you need to copy it first. // TODO: Do the List and Filter in a single pass, or use an index. var filteredPods []*api.Pod if rsc.garbageCollectorEnabled { // list all pods to include the pods that don't match the rs`s selector // anymore but has the stale controller ref. - podList, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything()) + pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything()) if err != nil { glog.Errorf("Error getting pods for rs %q: %v", key, err) rsc.queue.Add(key) return err } cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind()) - matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items) + matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods) for _, pod := range matchesNeedsController { err := cm.AdoptPod(pod) // continue to next pod if adoption fails. @@ -636,13 +639,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { return aggregate } } else { - podList, err := rsc.podStore.Pods(rs.Namespace).List(selector) + pods, err := rsc.podStore.Pods(rs.Namespace).List(selector) if err != nil { glog.Errorf("Error getting pods for rs %q: %v", key, err) rsc.queue.Add(key) return err } - filteredPods = controller.FilterActivePods(podList.Items) + filteredPods = controller.FilterActivePods(pods) } if rsNeedsSync && rs.DeletionTimestamp == nil { diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 5ef0b486ccd..ecb1e9a34df 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -479,6 +479,7 @@ func (rm *ReplicationManager) worker() { } // manageReplicas checks and updates replicas for the given replication controller. +// Does NOT modify . func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) { diff := len(filteredPods) - int(rc.Spec.Replicas) rcKey, err := controller.KeyFunc(rc) @@ -617,19 +618,21 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey) trace.Step("Expectations restored") + // NOTE: filteredPods are pointing to objects from cache - if you need to + // modify them, you need to copy it first. // TODO: Do the List and Filter in a single pass, or use an index. var filteredPods []*api.Pod if rm.garbageCollectorEnabled { // list all pods to include the pods that don't match the rc's selector // anymore but has the stale controller ref. - podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything()) + pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything()) if err != nil { glog.Errorf("Error getting pods for rc %q: %v", key, err) rm.queue.Add(key) return err } cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelector(), getRCKind()) - matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items) + matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods) for _, pod := range matchesNeedsController { err := cm.AdoptPod(pod) // continue to next pod if adoption fails. @@ -660,13 +663,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { return aggregate } } else { - podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector()) + pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector()) if err != nil { glog.Errorf("Error getting pods for rc %q: %v", key, err) rm.queue.Add(key) return err } - filteredPods = controller.FilterActivePods(podList.Items) + filteredPods = controller.FilterActivePods(pods) } if rcNeedsSync && rc.DeletionTimestamp == nil { From 594234d61c4e6c7dd29be1a575d88aa899478cc6 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Wed, 17 Aug 2016 13:05:37 -0700 Subject: [PATCH 2/2] fix tests; convert IsPodActive to operate on *Pod --- pkg/controller/controller_ref_manager.go | 2 +- pkg/controller/controller_utils.go | 4 ++-- pkg/controller/controller_utils_test.go | 6 +++++- pkg/controller/deployment/util/deployment_util.go | 2 +- test/e2e/framework/util.go | 2 +- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index f2dcae5fd73..52a8667f209 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -59,7 +59,7 @@ func (m *PodControllerRefManager) Classify(pods []*api.Pod) ( controlledDoesNotMatch []*api.Pod) { for i := range pods { pod := pods[i] - if !IsPodActive(*pod) { + if !IsPodActive(pod) { glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp) continue diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 2ccccfb07de..81930422eba 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -686,7 +686,7 @@ func maxContainerRestarts(pod *api.Pod) int { func FilterActivePods(pods []*api.Pod) []*api.Pod { var result []*api.Pod for _, p := range pods { - if IsPodActive(*p) { + if IsPodActive(p) { result = append(result, p) } else { glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", @@ -696,7 +696,7 @@ func FilterActivePods(pods []*api.Pod) []*api.Pod { return result } -func IsPodActive(p api.Pod) bool { +func IsPodActive(p *api.Pod) bool { return api.PodSucceeded != p.Status.Phase && api.PodFailed != p.Status.Phase && p.DeletionTimestamp == nil diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index a605fb658f9..92bf9b496c3 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -287,7 +287,11 @@ func TestActivePodFiltering(t *testing.T) { expectedNames.Insert(pod.Name) } - got := FilterActivePods(podList.Items) + var podPointers []*api.Pod + for i := range podList.Items { + podPointers = append(podPointers, &podList.Items[i]) + } + got := FilterActivePods(podPointers) gotNames := sets.NewString() for _, pod := range got { gotNames.Insert(pod.Name) diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 083f8aef1ba..d7e43508f33 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -633,7 +633,7 @@ func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 { // IsPodAvailable return true if the pod is available. func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool { - if !controller.IsPodActive(*pod) { + if !controller.IsPodActive(pod) { return false } // Check if we've passed minReadySeconds since LastTransitionTime diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 6dc37d74ef6..1931c92c219 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -3166,7 +3166,7 @@ func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error { return wait.PollImmediate(interval, timeout, func() (bool, error) { pods := ps.List() for _, pod := range pods { - if controller.IsPodActive(*pod) { + if controller.IsPodActive(pod) { return false, nil } }