Change podNamespacer API

This commit is contained in:
Wojciech Tyczynski 2016-08-17 16:16:01 +02:00
parent c0e79d8da7
commit 331083727f
13 changed files with 73 additions and 58 deletions

View File

@ -292,8 +292,9 @@ func (e *endpointController) syncService(key string) {
subsets := []api.EndpointSubset{}
containerPortAnnotations := map[string]string{} // by <HostIP>:<Port>
for i := range pods.Items {
pod := &pods.Items[i]
for i := range pods {
// TODO: Do we need to copy here?
pod := &(*pods[i])
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]

View File

@ -49,13 +49,9 @@ type StoreToPodLister struct {
// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
//
// TODO: converge on the interface in pkg/client.
// We explicitly don't return api.PodList, to avoid expensive allocations, which
// in most cases are unnecessary.
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
// TODO: it'd be great to just call
// s.Pods(api.NamespaceAll).List(selector), however then we'd have to
// remake the list.Items as a []*api.Pod. So leave this separate for
// now.
for _, m := range s.Indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
@ -78,14 +74,14 @@ type storePodsNamespacer struct {
// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) {
pods := api.PodList{}
// We explicitly don't return api.PodList, to avoid expensive allocations, which
// in most cases are unnecessary.
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods.Items = append(pods.Items, *pod)
pods = append(pods, pod)
}
}
return pods, nil
@ -99,7 +95,7 @@ func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error)
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
pods.Items = append(pods.Items, *pod)
pods = append(pods, pod)
}
}
return pods, nil
@ -107,7 +103,7 @@ func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error)
for _, m := range items {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods.Items = append(pods.Items, *pod)
pods = append(pods, pod)
}
}
return pods, nil

View File

@ -718,9 +718,9 @@ func TestStoreToPodLister(t *testing.T) {
defaultPods, err := spl.Pods(api.NamespaceDefault).List(labels.Set{}.AsSelector())
if err != nil {
t.Errorf("Unexpected error: %v", err)
} else if e, a := 1, len(defaultPods.Items); e != a {
} else if e, a := 1, len(defaultPods); e != a {
t.Errorf("Expected %v, got %v", e, a)
} else if e, a := "quux", defaultPods.Items[0].Name; e != a {
} else if e, a := "quux", defaultPods[0].Name; e != a {
t.Errorf("Expected %v, got %v", e, a)
}

View File

@ -53,13 +53,13 @@ func NewPodControllerRefManager(
// controllerRef pointing to other object are ignored) 3. controlledDoesNotMatch
// are the pods that have a controllerRef pointing to the controller, but their
// labels no longer match the selector.
func (m *PodControllerRefManager) Classify(pods []api.Pod) (
func (m *PodControllerRefManager) Classify(pods []*api.Pod) (
matchesAndControlled []*api.Pod,
matchesNeedsController []*api.Pod,
controlledDoesNotMatch []*api.Pod) {
for i := range pods {
pod := pods[i]
if !IsPodActive(pod) {
if !IsPodActive(*pod) {
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
continue
@ -69,9 +69,9 @@ func (m *PodControllerRefManager) Classify(pods []api.Pod) (
if controllerRef.UID == m.controllerObject.UID {
// already controlled
if m.controllerSelector.Matches(labels.Set(pod.Labels)) {
matchesAndControlled = append(matchesAndControlled, &pod)
matchesAndControlled = append(matchesAndControlled, pod)
} else {
controlledDoesNotMatch = append(controlledDoesNotMatch, &pod)
controlledDoesNotMatch = append(controlledDoesNotMatch, pod)
}
} else {
// ignoring the pod controlled by other controller
@ -83,7 +83,7 @@ func (m *PodControllerRefManager) Classify(pods []api.Pod) (
if !m.controllerSelector.Matches(labels.Set(pod.Labels)) {
continue
}
matchesNeedsController = append(matchesNeedsController, &pod)
matchesNeedsController = append(matchesNeedsController, pod)
}
}
return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch

View File

@ -683,12 +683,11 @@ func maxContainerRestarts(pod *api.Pod) int {
}
// FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []api.Pod) []*api.Pod {
func FilterActivePods(pods []*api.Pod) []*api.Pod {
var result []*api.Pod
for i := range pods {
p := pods[i]
if IsPodActive(p) {
result = append(result, &p)
for _, p := range pods {
if IsPodActive(*p) {
result = append(result, p)
} else {
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)

View File

@ -458,9 +458,11 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
if err != nil {
return nodeToDaemonPods, err
}
for i := range daemonPods.Items {
nodeName := daemonPods.Items[i].Spec.NodeName
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i])
for i := range daemonPods {
// TODO: Do we need to copy here?
daemonPod := &(*daemonPods[i])
nodeName := daemonPod.Spec.NodeName
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod)
}
return nodeToDaemonPods, nil
}

View File

@ -174,10 +174,14 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
}
options := api.ListOptions{LabelSelector: selector}
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
}
podList := api.PodList{Items: make([]api.Pod, 0, len(pods))}
for i := range pods {
podList.Items = append(podList.Items, *pods[i])
}
allPodsLabeled := false
if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
@ -224,8 +228,12 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
return deploymentutil.ListPods(deployment,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
return &podList, err
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
result := api.PodList{Items: make([]api.Pod, 0, len(pods))}
for i := range pods {
result.Items = append(result.Items, *pods[i])
}
return &result, err
})
}

View File

@ -377,16 +377,16 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) (
if err != nil {
return []*api.Pod{}, err
}
podList, err := dc.podLister.Pods(pdb.Namespace).List(sel)
pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
if err != nil {
return []*api.Pod{}, err
}
pods := []*api.Pod{}
for i := range podList.Items {
pod := podList.Items[i]
pods = append(pods, &pod)
// TODO: Do we need to copy here?
result := make([]*api.Pod, 0, len(pods))
for i := range pods {
result = append(result, &(*pods[i]))
}
return pods, nil
return result, nil
}
func (dc *DisruptionController) worker() {

View File

@ -380,8 +380,9 @@ func (e *EndpointController) syncService(key string) {
}
}
for i := range pods.Items {
pod := &pods.Items[i]
for i := range pods {
// TODO: Do we need to copy here?
pod := &(*pods[i])
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]

View File

@ -332,16 +332,16 @@ func (jm *JobController) syncJob(key string) error {
}
jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
podList, err := jm.podStore.Pods(job.Namespace).List(selector)
pods, err := jm.podStore.Pods(job.Namespace).List(selector)
if err != nil {
glog.Errorf("Error getting pods for job %q: %v", key, err)
jm.queue.Add(key)
return err
}
activePods := controller.FilterActivePods(podList.Items)
activePods := controller.FilterActivePods(pods)
active := int32(len(activePods))
succeeded, failed := getStatus(podList.Items)
succeeded, failed := getStatus(pods)
conditions := len(job.Status.Conditions)
if job.Status.StartTime == nil {
now := unversioned.Now()
@ -450,7 +450,7 @@ func newCondition(conditionType batch.JobConditionType, reason, message string)
}
// getStatus returns no of succeeded and failed pods running a job
func getStatus(pods []api.Pod) (succeeded, failed int32) {
func getStatus(pods []*api.Pod) (succeeded, failed int32) {
succeeded = int32(filterPods(pods, api.PodSucceeded))
failed = int32(filterPods(pods, api.PodFailed))
return
@ -458,6 +458,7 @@ func getStatus(pods []api.Pod) (succeeded, failed int32) {
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *batch.Job) int32 {
var activeLock sync.Mutex
active := int32(len(activePods))
@ -550,7 +551,7 @@ func (jm *JobController) updateJobStatus(job *batch.Job) error {
}
// filterPods returns pods based on their phase.
func filterPods(pods []api.Pod, phase api.PodPhase) int {
func filterPods(pods []*api.Pod, phase api.PodPhase) int {
result := 0
for i := range pods {
if phase == pods[i].Status.Phase {

View File

@ -218,15 +218,16 @@ func (psc *PetSetController) getPodsForPetSet(ps *apps.PetSet) ([]*api.Pod, erro
if err != nil {
return []*api.Pod{}, err
}
petList, err := psc.podStore.Pods(ps.Namespace).List(sel)
pods, err := psc.podStore.Pods(ps.Namespace).List(sel)
if err != nil {
return []*api.Pod{}, err
}
pods := []*api.Pod{}
for _, p := range petList.Items {
pods = append(pods, &p)
// TODO: Do we need to copy?
result := make([]*api.Pod, 0, len(pods))
for i := range pods {
result = append(result, &(*pods[i]))
}
return pods, nil
return result, nil
}
// getPetSetForPod returns the pet set managing the given pod.

View File

@ -458,6 +458,7 @@ func (rsc *ReplicaSetController) worker() {
}
// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) {
diff := len(filteredPods) - int(rs.Spec.Replicas)
rsKey, err := controller.KeyFunc(rs)
@ -593,19 +594,21 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
return err
}
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
// TODO: Do the List and Filter in a single pass, or use an index.
var filteredPods []*api.Pod
if rsc.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
podList, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for rs %q: %v", key, err)
rsc.queue.Add(key)
return err
}
cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items)
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
for _, pod := range matchesNeedsController {
err := cm.AdoptPod(pod)
// continue to next pod if adoption fails.
@ -636,13 +639,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
return aggregate
}
} else {
podList, err := rsc.podStore.Pods(rs.Namespace).List(selector)
pods, err := rsc.podStore.Pods(rs.Namespace).List(selector)
if err != nil {
glog.Errorf("Error getting pods for rs %q: %v", key, err)
rsc.queue.Add(key)
return err
}
filteredPods = controller.FilterActivePods(podList.Items)
filteredPods = controller.FilterActivePods(pods)
}
if rsNeedsSync && rs.DeletionTimestamp == nil {

View File

@ -479,6 +479,7 @@ func (rm *ReplicationManager) worker() {
}
// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
diff := len(filteredPods) - int(rc.Spec.Replicas)
rcKey, err := controller.KeyFunc(rc)
@ -617,19 +618,21 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
trace.Step("Expectations restored")
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
// TODO: Do the List and Filter in a single pass, or use an index.
var filteredPods []*api.Pod
if rm.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rc's selector
// anymore but has the stale controller ref.
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelector(), getRCKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items)
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
for _, pod := range matchesNeedsController {
err := cm.AdoptPod(pod)
// continue to next pod if adoption fails.
@ -660,13 +663,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
return aggregate
}
} else {
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
filteredPods = controller.FilterActivePods(podList.Items)
filteredPods = controller.FilterActivePods(pods)
}
if rcNeedsSync && rc.DeletionTimestamp == nil {