mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #30807 from caesarxuchao/change_pod_lister_api
Automatic merge from submit-queue Continue on #30774: Change podNamespacer API continue on #30774, credit to @wojtek-t, Ref #30759 I just fixed a test and converted IsActivePod to operate on *Pod.
This commit is contained in:
commit
e2f39fca86
@ -292,8 +292,9 @@ func (e *endpointController) syncService(key string) {
|
||||
|
||||
subsets := []api.EndpointSubset{}
|
||||
containerPortAnnotations := map[string]string{} // by <HostIP>:<Port>
|
||||
for i := range pods.Items {
|
||||
pod := &pods.Items[i]
|
||||
for i := range pods {
|
||||
// TODO: Do we need to copy here?
|
||||
pod := &(*pods[i])
|
||||
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
|
20
pkg/client/cache/listers.go
vendored
20
pkg/client/cache/listers.go
vendored
@ -49,13 +49,9 @@ type StoreToPodLister struct {
|
||||
// Please note that selector is filtering among the pods that have gotten into
|
||||
// the store; there may have been some filtering that already happened before
|
||||
// that.
|
||||
//
|
||||
// TODO: converge on the interface in pkg/client.
|
||||
// We explicitly don't return api.PodList, to avoid expensive allocations, which
|
||||
// in most cases are unnecessary.
|
||||
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
|
||||
// TODO: it'd be great to just call
|
||||
// s.Pods(api.NamespaceAll).List(selector), however then we'd have to
|
||||
// remake the list.Items as a []*api.Pod. So leave this separate for
|
||||
// now.
|
||||
for _, m := range s.Indexer.List() {
|
||||
pod := m.(*api.Pod)
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
@ -78,14 +74,14 @@ type storePodsNamespacer struct {
|
||||
// Please note that selector is filtering among the pods that have gotten into
|
||||
// the store; there may have been some filtering that already happened before
|
||||
// that.
|
||||
func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) {
|
||||
pods := api.PodList{}
|
||||
|
||||
// We explicitly don't return api.PodList, to avoid expensive allocations, which
|
||||
// in most cases are unnecessary.
|
||||
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
|
||||
if s.namespace == api.NamespaceAll {
|
||||
for _, m := range s.indexer.List() {
|
||||
pod := m.(*api.Pod)
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
pods.Items = append(pods.Items, *pod)
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
}
|
||||
return pods, nil
|
||||
@ -99,7 +95,7 @@ func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error)
|
||||
for _, m := range s.indexer.List() {
|
||||
pod := m.(*api.Pod)
|
||||
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
|
||||
pods.Items = append(pods.Items, *pod)
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
}
|
||||
return pods, nil
|
||||
@ -107,7 +103,7 @@ func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error)
|
||||
for _, m := range items {
|
||||
pod := m.(*api.Pod)
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
pods.Items = append(pods.Items, *pod)
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
}
|
||||
return pods, nil
|
||||
|
4
pkg/client/cache/listers_test.go
vendored
4
pkg/client/cache/listers_test.go
vendored
@ -718,9 +718,9 @@ func TestStoreToPodLister(t *testing.T) {
|
||||
defaultPods, err := spl.Pods(api.NamespaceDefault).List(labels.Set{}.AsSelector())
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
} else if e, a := 1, len(defaultPods.Items); e != a {
|
||||
} else if e, a := 1, len(defaultPods); e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
} else if e, a := "quux", defaultPods.Items[0].Name; e != a {
|
||||
} else if e, a := "quux", defaultPods[0].Name; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func NewPodControllerRefManager(
|
||||
// controllerRef pointing to other object are ignored) 3. controlledDoesNotMatch
|
||||
// are the pods that have a controllerRef pointing to the controller, but their
|
||||
// labels no longer match the selector.
|
||||
func (m *PodControllerRefManager) Classify(pods []api.Pod) (
|
||||
func (m *PodControllerRefManager) Classify(pods []*api.Pod) (
|
||||
matchesAndControlled []*api.Pod,
|
||||
matchesNeedsController []*api.Pod,
|
||||
controlledDoesNotMatch []*api.Pod) {
|
||||
@ -69,9 +69,9 @@ func (m *PodControllerRefManager) Classify(pods []api.Pod) (
|
||||
if controllerRef.UID == m.controllerObject.UID {
|
||||
// already controlled
|
||||
if m.controllerSelector.Matches(labels.Set(pod.Labels)) {
|
||||
matchesAndControlled = append(matchesAndControlled, &pod)
|
||||
matchesAndControlled = append(matchesAndControlled, pod)
|
||||
} else {
|
||||
controlledDoesNotMatch = append(controlledDoesNotMatch, &pod)
|
||||
controlledDoesNotMatch = append(controlledDoesNotMatch, pod)
|
||||
}
|
||||
} else {
|
||||
// ignoring the pod controlled by other controller
|
||||
@ -83,7 +83,7 @@ func (m *PodControllerRefManager) Classify(pods []api.Pod) (
|
||||
if !m.controllerSelector.Matches(labels.Set(pod.Labels)) {
|
||||
continue
|
||||
}
|
||||
matchesNeedsController = append(matchesNeedsController, &pod)
|
||||
matchesNeedsController = append(matchesNeedsController, pod)
|
||||
}
|
||||
}
|
||||
return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
|
||||
|
@ -683,12 +683,11 @@ func maxContainerRestarts(pod *api.Pod) int {
|
||||
}
|
||||
|
||||
// FilterActivePods returns pods that have not terminated.
|
||||
func FilterActivePods(pods []api.Pod) []*api.Pod {
|
||||
func FilterActivePods(pods []*api.Pod) []*api.Pod {
|
||||
var result []*api.Pod
|
||||
for i := range pods {
|
||||
p := pods[i]
|
||||
for _, p := range pods {
|
||||
if IsPodActive(p) {
|
||||
result = append(result, &p)
|
||||
result = append(result, p)
|
||||
} else {
|
||||
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
|
||||
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
|
||||
@ -697,7 +696,7 @@ func FilterActivePods(pods []api.Pod) []*api.Pod {
|
||||
return result
|
||||
}
|
||||
|
||||
func IsPodActive(p api.Pod) bool {
|
||||
func IsPodActive(p *api.Pod) bool {
|
||||
return api.PodSucceeded != p.Status.Phase &&
|
||||
api.PodFailed != p.Status.Phase &&
|
||||
p.DeletionTimestamp == nil
|
||||
|
@ -287,7 +287,11 @@ func TestActivePodFiltering(t *testing.T) {
|
||||
expectedNames.Insert(pod.Name)
|
||||
}
|
||||
|
||||
got := FilterActivePods(podList.Items)
|
||||
var podPointers []*api.Pod
|
||||
for i := range podList.Items {
|
||||
podPointers = append(podPointers, &podList.Items[i])
|
||||
}
|
||||
got := FilterActivePods(podPointers)
|
||||
gotNames := sets.NewString()
|
||||
for _, pod := range got {
|
||||
gotNames.Insert(pod.Name)
|
||||
|
@ -458,9 +458,11 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
|
||||
if err != nil {
|
||||
return nodeToDaemonPods, err
|
||||
}
|
||||
for i := range daemonPods.Items {
|
||||
nodeName := daemonPods.Items[i].Spec.NodeName
|
||||
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i])
|
||||
for i := range daemonPods {
|
||||
// TODO: Do we need to copy here?
|
||||
daemonPod := &(*daemonPods[i])
|
||||
nodeName := daemonPod.Spec.NodeName
|
||||
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod)
|
||||
}
|
||||
return nodeToDaemonPods, nil
|
||||
}
|
||||
|
@ -174,10 +174,14 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
|
||||
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
|
||||
}
|
||||
options := api.ListOptions{LabelSelector: selector}
|
||||
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
|
||||
}
|
||||
podList := api.PodList{Items: make([]api.Pod, 0, len(pods))}
|
||||
for i := range pods {
|
||||
podList.Items = append(podList.Items, *pods[i])
|
||||
}
|
||||
allPodsLabeled := false
|
||||
if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
|
||||
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
|
||||
@ -224,8 +228,12 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
|
||||
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
|
||||
return deploymentutil.ListPods(deployment,
|
||||
func(namespace string, options api.ListOptions) (*api.PodList, error) {
|
||||
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
return &podList, err
|
||||
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
result := api.PodList{Items: make([]api.Pod, 0, len(pods))}
|
||||
for i := range pods {
|
||||
result.Items = append(result.Items, *pods[i])
|
||||
}
|
||||
return &result, err
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -633,7 +633,7 @@ func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
|
||||
|
||||
// IsPodAvailable return true if the pod is available.
|
||||
func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool {
|
||||
if !controller.IsPodActive(*pod) {
|
||||
if !controller.IsPodActive(pod) {
|
||||
return false
|
||||
}
|
||||
// Check if we've passed minReadySeconds since LastTransitionTime
|
||||
|
@ -377,16 +377,16 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) (
|
||||
if err != nil {
|
||||
return []*api.Pod{}, err
|
||||
}
|
||||
podList, err := dc.podLister.Pods(pdb.Namespace).List(sel)
|
||||
pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
|
||||
if err != nil {
|
||||
return []*api.Pod{}, err
|
||||
}
|
||||
pods := []*api.Pod{}
|
||||
for i := range podList.Items {
|
||||
pod := podList.Items[i]
|
||||
pods = append(pods, &pod)
|
||||
// TODO: Do we need to copy here?
|
||||
result := make([]*api.Pod, 0, len(pods))
|
||||
for i := range pods {
|
||||
result = append(result, &(*pods[i]))
|
||||
}
|
||||
return pods, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (dc *DisruptionController) worker() {
|
||||
|
@ -380,8 +380,9 @@ func (e *EndpointController) syncService(key string) {
|
||||
}
|
||||
}
|
||||
|
||||
for i := range pods.Items {
|
||||
pod := &pods.Items[i]
|
||||
for i := range pods {
|
||||
// TODO: Do we need to copy here?
|
||||
pod := &(*pods[i])
|
||||
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
|
@ -332,16 +332,16 @@ func (jm *JobController) syncJob(key string) error {
|
||||
}
|
||||
jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
|
||||
selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
|
||||
podList, err := jm.podStore.Pods(job.Namespace).List(selector)
|
||||
pods, err := jm.podStore.Pods(job.Namespace).List(selector)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pods for job %q: %v", key, err)
|
||||
jm.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
|
||||
activePods := controller.FilterActivePods(podList.Items)
|
||||
activePods := controller.FilterActivePods(pods)
|
||||
active := int32(len(activePods))
|
||||
succeeded, failed := getStatus(podList.Items)
|
||||
succeeded, failed := getStatus(pods)
|
||||
conditions := len(job.Status.Conditions)
|
||||
if job.Status.StartTime == nil {
|
||||
now := unversioned.Now()
|
||||
@ -450,7 +450,7 @@ func newCondition(conditionType batch.JobConditionType, reason, message string)
|
||||
}
|
||||
|
||||
// getStatus returns no of succeeded and failed pods running a job
|
||||
func getStatus(pods []api.Pod) (succeeded, failed int32) {
|
||||
func getStatus(pods []*api.Pod) (succeeded, failed int32) {
|
||||
succeeded = int32(filterPods(pods, api.PodSucceeded))
|
||||
failed = int32(filterPods(pods, api.PodFailed))
|
||||
return
|
||||
@ -458,6 +458,7 @@ func getStatus(pods []api.Pod) (succeeded, failed int32) {
|
||||
|
||||
// manageJob is the core method responsible for managing the number of running
|
||||
// pods according to what is specified in the job.Spec.
|
||||
// Does NOT modify <activePods>.
|
||||
func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *batch.Job) int32 {
|
||||
var activeLock sync.Mutex
|
||||
active := int32(len(activePods))
|
||||
@ -550,7 +551,7 @@ func (jm *JobController) updateJobStatus(job *batch.Job) error {
|
||||
}
|
||||
|
||||
// filterPods returns pods based on their phase.
|
||||
func filterPods(pods []api.Pod, phase api.PodPhase) int {
|
||||
func filterPods(pods []*api.Pod, phase api.PodPhase) int {
|
||||
result := 0
|
||||
for i := range pods {
|
||||
if phase == pods[i].Status.Phase {
|
||||
|
@ -218,15 +218,16 @@ func (psc *PetSetController) getPodsForPetSet(ps *apps.PetSet) ([]*api.Pod, erro
|
||||
if err != nil {
|
||||
return []*api.Pod{}, err
|
||||
}
|
||||
petList, err := psc.podStore.Pods(ps.Namespace).List(sel)
|
||||
pods, err := psc.podStore.Pods(ps.Namespace).List(sel)
|
||||
if err != nil {
|
||||
return []*api.Pod{}, err
|
||||
}
|
||||
pods := []*api.Pod{}
|
||||
for _, p := range petList.Items {
|
||||
pods = append(pods, &p)
|
||||
// TODO: Do we need to copy?
|
||||
result := make([]*api.Pod, 0, len(pods))
|
||||
for i := range pods {
|
||||
result = append(result, &(*pods[i]))
|
||||
}
|
||||
return pods, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// getPetSetForPod returns the pet set managing the given pod.
|
||||
|
@ -458,6 +458,7 @@ func (rsc *ReplicaSetController) worker() {
|
||||
}
|
||||
|
||||
// manageReplicas checks and updates replicas for the given ReplicaSet.
|
||||
// Does NOT modify <filteredPods>.
|
||||
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) {
|
||||
diff := len(filteredPods) - int(rs.Spec.Replicas)
|
||||
rsKey, err := controller.KeyFunc(rs)
|
||||
@ -593,19 +594,21 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// NOTE: filteredPods are pointing to objects from cache - if you need to
|
||||
// modify them, you need to copy it first.
|
||||
// TODO: Do the List and Filter in a single pass, or use an index.
|
||||
var filteredPods []*api.Pod
|
||||
if rsc.garbageCollectorEnabled {
|
||||
// list all pods to include the pods that don't match the rs`s selector
|
||||
// anymore but has the stale controller ref.
|
||||
podList, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
|
||||
pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pods for rs %q: %v", key, err)
|
||||
rsc.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind())
|
||||
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items)
|
||||
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
|
||||
for _, pod := range matchesNeedsController {
|
||||
err := cm.AdoptPod(pod)
|
||||
// continue to next pod if adoption fails.
|
||||
@ -636,13 +639,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
return aggregate
|
||||
}
|
||||
} else {
|
||||
podList, err := rsc.podStore.Pods(rs.Namespace).List(selector)
|
||||
pods, err := rsc.podStore.Pods(rs.Namespace).List(selector)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pods for rs %q: %v", key, err)
|
||||
rsc.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
filteredPods = controller.FilterActivePods(podList.Items)
|
||||
filteredPods = controller.FilterActivePods(pods)
|
||||
}
|
||||
|
||||
if rsNeedsSync && rs.DeletionTimestamp == nil {
|
||||
|
@ -479,6 +479,7 @@ func (rm *ReplicationManager) worker() {
|
||||
}
|
||||
|
||||
// manageReplicas checks and updates replicas for the given replication controller.
|
||||
// Does NOT modify <filteredPods>.
|
||||
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
|
||||
diff := len(filteredPods) - int(rc.Spec.Replicas)
|
||||
rcKey, err := controller.KeyFunc(rc)
|
||||
@ -617,19 +618,21 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
||||
rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
|
||||
trace.Step("Expectations restored")
|
||||
|
||||
// NOTE: filteredPods are pointing to objects from cache - if you need to
|
||||
// modify them, you need to copy it first.
|
||||
// TODO: Do the List and Filter in a single pass, or use an index.
|
||||
var filteredPods []*api.Pod
|
||||
if rm.garbageCollectorEnabled {
|
||||
// list all pods to include the pods that don't match the rc's selector
|
||||
// anymore but has the stale controller ref.
|
||||
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
|
||||
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pods for rc %q: %v", key, err)
|
||||
rm.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelector(), getRCKind())
|
||||
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items)
|
||||
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
|
||||
for _, pod := range matchesNeedsController {
|
||||
err := cm.AdoptPod(pod)
|
||||
// continue to next pod if adoption fails.
|
||||
@ -660,13 +663,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
||||
return aggregate
|
||||
}
|
||||
} else {
|
||||
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
|
||||
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pods for rc %q: %v", key, err)
|
||||
rm.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
filteredPods = controller.FilterActivePods(podList.Items)
|
||||
filteredPods = controller.FilterActivePods(pods)
|
||||
}
|
||||
|
||||
if rcNeedsSync && rc.DeletionTimestamp == nil {
|
||||
|
@ -3166,7 +3166,7 @@ func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error {
|
||||
return wait.PollImmediate(interval, timeout, func() (bool, error) {
|
||||
pods := ps.List()
|
||||
for _, pod := range pods {
|
||||
if controller.IsPodActive(*pod) {
|
||||
if controller.IsPodActive(pod) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user