mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-02-22 07:03:28 +00:00
Merge pull request #134662 from liggitt/unexport-orphan-1.33
Manual partial cherry pick of #134654: Include relevant dimensions in pod controller indexing
This commit is contained in:
@@ -36,6 +36,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@@ -87,12 +88,9 @@ const (
|
||||
// PodNodeNameKeyIndex is the name of the index used by PodInformer to index pods by their node name.
|
||||
PodNodeNameKeyIndex = "spec.nodeName"
|
||||
|
||||
// OrphanPodIndexKey is used to index all Orphan pods to this key
|
||||
OrphanPodIndexKey = "_ORPHAN_POD"
|
||||
|
||||
// podControllerUIDIndex is the name for the Pod store's index function,
|
||||
// which is to index by pods's controllerUID.
|
||||
PodControllerUIDIndex = "podControllerUID"
|
||||
// PodControllerIndex is the name for the Pod store's index function,
|
||||
// which indexes by the key returned from PodControllerIndexKey.
|
||||
PodControllerIndex = "podController"
|
||||
)
|
||||
|
||||
var UpdateTaintBackoff = wait.Backoff{
|
||||
@@ -1083,35 +1081,74 @@ func AddPodNodeNameIndexer(podInformer cache.SharedIndexInformer) error {
|
||||
})
|
||||
}
|
||||
|
||||
// OrphanPodIndexKeyForNamespace returns the orphan pod index key for a specific namespace.
|
||||
func OrphanPodIndexKeyForNamespace(namespace string) string {
|
||||
return OrphanPodIndexKey + "/" + namespace
|
||||
// PodControllerIndexKey returns the index key to locate pods with the specified controller ownerReference.
|
||||
// If ownerReference is nil, the returned key locates pods in the namespace without a controller ownerReference.
|
||||
func PodControllerIndexKey(namespace string, ownerReference *metav1.OwnerReference) string {
|
||||
if ownerReference == nil {
|
||||
return namespace
|
||||
}
|
||||
return namespace + "/" + ownerReference.Kind + "/" + ownerReference.Name + "/" + string(ownerReference.UID)
|
||||
}
|
||||
|
||||
// AddPodControllerUIDIndexer adds an indexer for Pod's controllerRef.UID to the given PodInformer.
|
||||
// AddPodControllerIndexer adds an indexer for Pod's controllerRef.UID to the given PodInformer.
|
||||
// This indexer is used to efficiently look up pods by their ControllerRef.UID
|
||||
func AddPodControllerUIDIndexer(podInformer cache.SharedIndexInformer) error {
|
||||
if _, exists := podInformer.GetIndexer().GetIndexers()[PodControllerUIDIndex]; exists {
|
||||
func AddPodControllerIndexer(podInformer cache.SharedIndexInformer) error {
|
||||
if _, exists := podInformer.GetIndexer().GetIndexers()[PodControllerIndex]; exists {
|
||||
// indexer already exists, do nothing
|
||||
return nil
|
||||
}
|
||||
return podInformer.AddIndexers(cache.Indexers{
|
||||
PodControllerUIDIndex: func(obj interface{}) ([]string, error) {
|
||||
PodControllerIndex: func(obj interface{}) ([]string, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
// Get the ControllerRef of the Pod to check if it's managed by a controller
|
||||
if ref := metav1.GetControllerOf(pod); ref != nil {
|
||||
return []string{string(ref.UID)}, nil
|
||||
}
|
||||
// If the Pod has no controller (i.e., it's orphaned), index it with the OrphanPodIndexKeyForNamespace
|
||||
// This helps identify orphan pods for reconciliation and adoption by controllers
|
||||
return []string{OrphanPodIndexKeyForNamespace(pod.Namespace)}, nil
|
||||
// Get the ControllerRef of the Pod to check if it's managed by a controller.
|
||||
// Index with a non-nil controller (indicating an owned pod) or a nil controller (indicating an orphan pod).
|
||||
return []string{PodControllerIndexKey(pod.Namespace, metav1.GetControllerOf(pod))}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// FilterPodsByOwner gets the Pods managed by an owner or orphan Pods in the owner's namespace
|
||||
func FilterPodsByOwner(podIndexer cache.Indexer, owner *metav1.ObjectMeta, ownerKind string, includeOrphanedPods bool) ([]*v1.Pod, error) {
|
||||
result := []*v1.Pod{}
|
||||
|
||||
if len(owner.Namespace) == 0 {
|
||||
return nil, fmt.Errorf("no owner namespace provided")
|
||||
}
|
||||
if len(owner.Name) == 0 {
|
||||
return nil, fmt.Errorf("no owner name provided")
|
||||
}
|
||||
if len(owner.UID) == 0 {
|
||||
return nil, fmt.Errorf("no owner uid provided")
|
||||
}
|
||||
if len(ownerKind) == 0 {
|
||||
return nil, fmt.Errorf("no owner kind provided")
|
||||
}
|
||||
// Always include the owner key, which identifies Pods that are controlled by the owner
|
||||
keys := []string{PodControllerIndexKey(owner.Namespace, &metav1.OwnerReference{Name: owner.Name, Kind: ownerKind, UID: owner.UID})}
|
||||
if includeOrphanedPods {
|
||||
// Optionally include the unowned key, which identifies orphaned Pods in the owner's namespace and might be adopted by the owner later
|
||||
keys = append(keys, PodControllerIndexKey(owner.Namespace, nil))
|
||||
}
|
||||
for _, key := range keys {
|
||||
pods, err := podIndexer.ByIndex(PodControllerIndex, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range pods {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type in pod indexer: %v", obj))
|
||||
continue
|
||||
}
|
||||
result = append(result, pod)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// PodKey returns a key unique to the given pod within a cluster.
|
||||
// It's used so we consistently use the same key scheme in this module.
|
||||
// It does exactly what cache.MetaNamespaceKeyFunc would have done
|
||||
|
||||
@@ -39,6 +39,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
clientscheme "k8s.io/client-go/kubernetes/scheme"
|
||||
@@ -1359,3 +1360,164 @@ func TestAddOrUpdateTaintOnNode(t *testing.T) {
|
||||
test.name, test.requestCount, test.nodeHandler.RequestCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterPodsByOwner(t *testing.T) {
|
||||
newPod := func(name, ns string, owner *metav1.OwnerReference) *v1.Pod {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: ns,
|
||||
},
|
||||
}
|
||||
if owner != nil {
|
||||
pod.OwnerReferences = append(pod.OwnerReferences, *owner)
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
ownerKind := "OwnerKind"
|
||||
ownerName := "ownerName"
|
||||
cases := map[string]struct {
|
||||
owner *metav1.ObjectMeta
|
||||
ownedOnly bool
|
||||
allPods []*v1.Pod
|
||||
wantPodsKeys sets.Set[string]
|
||||
}{
|
||||
"multiple Pods, some are owned by the owner": {
|
||||
owner: &metav1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: ownerName,
|
||||
UID: "abc",
|
||||
},
|
||||
allPods: []*v1.Pod{
|
||||
newPod("a", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("b", "ns1", &metav1.OwnerReference{
|
||||
UID: "def",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("c", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
},
|
||||
wantPodsKeys: sets.New("ns1/a", "ns1/c"),
|
||||
},
|
||||
"orphan Pods in multiple namespaces": {
|
||||
owner: &metav1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: ownerName,
|
||||
UID: "abc",
|
||||
},
|
||||
allPods: []*v1.Pod{
|
||||
newPod("a", "ns1", nil),
|
||||
newPod("b", "ns2", nil),
|
||||
},
|
||||
wantPodsKeys: sets.New("ns1/a"),
|
||||
},
|
||||
"owned Pods and orphan Pods in the owner's namespace": {
|
||||
owner: &metav1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: ownerName,
|
||||
UID: "abc",
|
||||
},
|
||||
allPods: []*v1.Pod{
|
||||
newPod("a", "ns1", nil),
|
||||
newPod("b", "ns2", nil),
|
||||
newPod("c", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
},
|
||||
wantPodsKeys: sets.New("ns1/a", "ns1/c"),
|
||||
},
|
||||
"exclude orphan pods, pods in mismatched ns,uid,kind,name,controller": {
|
||||
owner: &metav1.ObjectMeta{
|
||||
Namespace: "ns1",
|
||||
Name: ownerName,
|
||||
UID: "abc",
|
||||
},
|
||||
allPods: []*v1.Pod{
|
||||
newPod("a", "ns1", nil),
|
||||
newPod("other-ns-orphan", "ns2", nil),
|
||||
newPod("other-ns-owned", "ns2", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("c", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("other-uid", "ns1", &metav1.OwnerReference{
|
||||
UID: "other-uid",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("other-kind", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: "OtherKind",
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("other-name", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: "otherName",
|
||||
Controller: ptr.To(true),
|
||||
}),
|
||||
newPod("non-controller", "ns1", &metav1.OwnerReference{
|
||||
UID: "abc",
|
||||
Kind: ownerKind,
|
||||
Name: ownerName,
|
||||
Controller: ptr.To(false),
|
||||
}),
|
||||
},
|
||||
ownedOnly: true,
|
||||
wantPodsKeys: sets.New("ns1/c"),
|
||||
},
|
||||
}
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
sharedInformers := informers.NewSharedInformerFactory(fakeClient, 0)
|
||||
podInformer := sharedInformers.Core().V1().Pods()
|
||||
|
||||
err := AddPodControllerIndexer(podInformer.Informer())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to register indexer: %v", err)
|
||||
}
|
||||
podIndexer := podInformer.Informer().GetIndexer()
|
||||
for _, pod := range tc.allPods {
|
||||
if err := podIndexer.Add(pod); err != nil {
|
||||
t.Fatalf("failed adding Pod to indexer: %v", err)
|
||||
}
|
||||
}
|
||||
gotPods, err := FilterPodsByOwner(podIndexer, tc.owner, ownerKind, !tc.ownedOnly)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
gotPodKeys := sets.New[string]()
|
||||
for _, pod := range gotPods {
|
||||
gotPodKeys.Insert(pod.Namespace + "/" + pod.Name)
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantPodsKeys, gotPodKeys); diff != "" {
|
||||
t.Errorf("unexpected pods returned, diff=%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,7 +217,7 @@ func NewDaemonSetsController(
|
||||
dsc.podLister = podInformer.Lister()
|
||||
dsc.podStoreSynced = podInformer.Informer().HasSynced
|
||||
controller.AddPodNodeNameIndexer(podInformer.Informer())
|
||||
controller.AddPodControllerUIDIndexer(podInformer.Informer())
|
||||
controller.AddPodControllerIndexer(podInformer.Informer())
|
||||
dsc.podIndexer = podInformer.Informer().GetIndexer()
|
||||
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@@ -689,30 +689,6 @@ func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interfa
|
||||
dsc.nodeUpdateQueue.Add(curNode.Name)
|
||||
}
|
||||
|
||||
// getPodsFromCache returns the Pods that a given DS should manage.
|
||||
func (dsc *DaemonSetsController) getDaemonPodsFromCache(ds *apps.DaemonSet) ([]*v1.Pod, error) {
|
||||
// Iterate over two keys:
|
||||
// The UID of the Daemonset, which identifies Pods that are controlled by the Daemonset.
|
||||
// The OrphanPodIndexKey, which helps identify orphaned Pods that are not currently managed by any controller,
|
||||
// but may be adopted later on if they have matching labels with the Daemonset.
|
||||
podsForDS := []*v1.Pod{}
|
||||
for _, key := range []string{string(ds.UID), controller.OrphanPodIndexKeyForNamespace(ds.Namespace)} {
|
||||
podObjs, err := dsc.podIndexer.ByIndex(controller.PodControllerUIDIndex, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range podObjs {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type in pod indexer: %v", obj))
|
||||
continue
|
||||
}
|
||||
podsForDS = append(podsForDS, pod)
|
||||
}
|
||||
}
|
||||
return podsForDS, nil
|
||||
}
|
||||
|
||||
// getDaemonPods returns daemon pods owned by the given ds.
|
||||
// This also reconciles ControllerRef by adopting/orphaning.
|
||||
// Note that returned Pods are pointers to objects in the cache.
|
||||
@@ -723,7 +699,7 @@ func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.Dae
|
||||
return nil, err
|
||||
}
|
||||
// List all pods indexed to DS UID and Orphan pods
|
||||
pods, err := dsc.getDaemonPodsFromCache(ds)
|
||||
pods, err := controller.FilterPodsByOwner(dsc.podIndexer, &ds.ObjectMeta, "DaemonSet", true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user