mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 23:17:50 +00:00
Refactor ControllerRefManager
To prepare for implementing ControllerRef across all controllers, this pushes the common adopt/orphan logic into ControllerRefManager so each controller doesn't have to duplicate it. This also shares the adopt/orphan logic between Pods and ReplicaSets, so it lives in only one place.
This commit is contained in:
@@ -30,7 +30,6 @@ go_library(
|
||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||
"//vendor:k8s.io/apimachinery/pkg/labels",
|
||||
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/errors",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||
"//vendor:k8s.io/apiserver/pkg/util/trace",
|
||||
|
||||
@@ -30,7 +30,6 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
||||
@@ -620,39 +619,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
||||
rm.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
|
||||
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
|
||||
// Adopt pods only if this replication controller is not going to be deleted.
|
||||
if rc.DeletionTimestamp == nil {
|
||||
for _, pod := range matchesNeedsController {
|
||||
err := cm.AdoptPod(pod)
|
||||
// continue to next pod if adoption fails.
|
||||
if err != nil {
|
||||
// If the pod no longer exists, don't even log the error.
|
||||
if !errors.IsNotFound(err) {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
} else {
|
||||
matchesAndControlled = append(matchesAndControlled, pod)
|
||||
}
|
||||
}
|
||||
}
|
||||
filteredPods = matchesAndControlled
|
||||
// remove the controllerRef for the pods that no longer have matching labels
|
||||
var errlist []error
|
||||
for _, pod := range controlledDoesNotMatch {
|
||||
err := cm.ReleasePod(pod)
|
||||
if err != nil {
|
||||
errlist = append(errlist, err)
|
||||
}
|
||||
}
|
||||
if len(errlist) != 0 {
|
||||
aggregate := utilerrors.NewAggregate(errlist)
|
||||
// push the RC into work queue again. We need to try to free the
|
||||
// pods again otherwise they will stuck with the stale
|
||||
// controllerRef.
|
||||
cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
|
||||
filteredPods, err = cm.ClaimPods(pods)
|
||||
if err != nil {
|
||||
// Something went wrong with adoption or release.
|
||||
// Requeue and try again so we don't leave orphans sitting around.
|
||||
rm.queue.Add(key)
|
||||
return aggregate
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
|
||||
|
||||
@@ -1190,14 +1190,20 @@ func TestPatchPodFails(t *testing.T) {
|
||||
podInformer.Informer().GetIndexer().Add(newPod("pod1", rc, v1.PodRunning, nil))
|
||||
podInformer.Informer().GetIndexer().Add(newPod("pod2", rc, v1.PodRunning, nil))
|
||||
// let both patches fail. The rc manager will assume it fails to take
|
||||
// control of the pods and create new ones.
|
||||
// control of the pods and requeue to try again.
|
||||
fakePodControl.Err = fmt.Errorf("Fake Error")
|
||||
err := manager.syncReplicationController(getKey(rc, t))
|
||||
if err == nil || err.Error() != "Fake Error" {
|
||||
rcKey := getKey(rc, t)
|
||||
err := manager.syncReplicationController(rcKey)
|
||||
if err == nil || !strings.Contains(err.Error(), "Fake Error") {
|
||||
t.Fatalf("expected Fake Error, got %v", err)
|
||||
}
|
||||
// 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
|
||||
validateSyncReplication(t, fakePodControl, 2, 0, 2)
|
||||
// 2 patches to take control of pod1 and pod2 (both fail).
|
||||
validateSyncReplication(t, fakePodControl, 0, 0, 2)
|
||||
// RC should requeue itself.
|
||||
queueRC, _ := manager.queue.Get()
|
||||
if queueRC != rcKey {
|
||||
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchExtraPodsThenDelete(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user